"""
This module defines the ``Drone`` class, which wraps all the protocol
buffer methods. It is usually the object to create when using
``ostinato-simple``.
"""
from ostinato.core import DroneProxy
from .port import Port
# Module-level debug switch, off by default. Nothing in this part of the file
# reads it; presumably it toggles protocol buffer debug output elsewhere --
# TODO confirm against its readers.
# (A ``global`` statement is a no-op at module scope, so none is needed here.)
_DEBUG_PROTOBUF = False
class Drone(object):
    """Wrapper for ``ostinato.core.DroneProxy``.

    All the protocol buffer related methods are prefixed with ``_o_`` and are
    for internal use only.

    Args:
        host (str): ip address or hostname of the host running the drone
            instance you want to connect to.
        connect (bool): if true, attempt to connect to the remote instance
            when the object is initialized. Otherwise, it can be done
            manually later with :meth:`connect()`

    Attributes:
        ports (list): the port objects known to this drone. It starts empty
            and is filled by :meth:`fetch_ports`.
    """

    def __init__(self, host, connect=True):
        self._drone = DroneProxy(host)
        # Create the port list before touching the network so the attribute
        # always exists by the time ``connect()`` runs.
        self.ports = []
        # Plain truthiness test: any truthy value requests a connection, not
        # only the ``True`` singleton.
        if connect:
            self.connect()

    def connect(self):
        """
        Connect to the remote drone instance. By default, it is already called
        when the object is created.
        """
        self._drone.connect()

    def disconnect(self):
        """
        Disconnect from the remote drone instance.
        """
        self._drone.disconnect()

    def reconnect(self):
        """
        Reconnect to the remote drone instance: disconnect, then connect
        again.
        """
        self._drone.disconnect()
        self._drone.connect()

    def fetch_ports(self):
        """
        Get the list of all the ports on the remote host. They are stored in
        the :attr:`ports` list.

        Ports that are already in :attr:`ports` (matched by port id) are
        refreshed through their ``fetch()`` method; ports seen for the first
        time are wrapped in a new :class:`Port` and appended. Entries already
        in :attr:`ports` are never removed by this method.
        """
        o_ports = self._drone.getPortConfig(self._drone.getPortIdList())
        for o_port in o_ports.port:
            port_id = o_port.port_id.id
            port = self.get_port_by_id(port_id)
            if port is None:
                self.ports.append(Port(self, port_id))
            else:
                port.fetch()

    def get_port_by_id(self, port_id):
        """
        Get a port from :attr:`ports` by its port id. If several ports match,
        the first one is returned. If the port is not found, ``None`` is
        returned.
        """
        for port in self.ports:
            if port.port_id == port_id:
                return port
        return None

    def get_port(self, name):
        """
        Get a port from :attr:`ports` by name. If several ports match, the
        first one is returned. If the port is not found, ``None`` is
        returned.
        """
        for port in self.ports:
            if port.name == name:
                return port
        return None

    def __str__(self):
        return 'drone({})'.format(self._drone.host)