Source code for simple_ostinato.port

"""
This module implement a class that represents a remote port, controlled by a
:class:`Drone` instance.
"""
from ostinato.core import ost_pb
from .stream import Stream
from . import utils


[docs]class Port(object): """ Represent a remote port. This class provides simple methods to add/remove streams, and send/capture traffic. Args: drone (:class:`Drone`): an object that wraps the underlying protocol buffer calls. port_id (int): id of the port. Attributes: streams (dict): a dictionnary with all the streams configured on this port. It can be refreshed with :meth:`fetch_streams()`. port_id (int): id of the port """ def __init__(self, drone, port_id): self._drone = drone._drone self.port_id = port_id self.streams = [] self.fetch()
[docs] def get_stream(self, stream_id): """ Return a the :class:`Stream` object corresponding to the given stream ID (:class:`int`) """ for stream in self.streams: if stream.stream_id == stream_id: return stream
[docs] def get_streams_by_name(self, name): """ Return a list of :class:`Stream` s that have the given name (:class:str). Since most often names are unique, it is common to get a stream doing: >>> my_stream_foo = my_port.get_streams_by_name('stream_foo')[0] """ streams = [] for stream in self.streams: if stream.name == name: streams.append(stream) return streams
def _fetch(self): o_ports = self._drone.getPortConfig(self._get_o_port_id_list()) return o_ports
[docs] def save(self): """ Save the current port configuration on the remote drone instance. """ o_ports = self._fetch() o_port = o_ports.port[0] o_port.name = self._name o_port.is_enabled = self._is_enabled o_port.transmit_mode = self._transmit_mode o_port.user_name = self._user_name for stream in self.streams: stream.save() self._drone.modifyPort(o_ports)
[docs] def fetch(self): """ Fetch the current port configuration from the remote drone instance. """ o_port = self._fetch().port[0] self._name = o_port.name self._is_enabled = o_port.is_enabled self._transmit_mode = o_port.transmit_mode self._user_name = o_port.user_name self._is_exclusive_control = o_port.is_exclusive_control
def _fetch_stream_ids(self): o_port_ids = ost_pb.PortIdList() o_port_id = o_port_ids.port_id.add() o_port_id.id = self.port_id return self._drone.getStreamIdList(o_port_id) def _fetch_streams(self): o_stream_ids = self._fetch_stream_ids() return self._drone.getStreamConfig(o_stream_ids)
[docs] def fetch_streams(self): """ Fetch the streams configured on this port, from the remote drone instance. The streams are stored in :attr:`streams`. """ o_streams = self._fetch_streams() for o_stream in o_streams.stream: stream_id = o_stream.stream_id.id stream = self.get_stream(stream_id) if stream is None: self.streams.append(Stream(self, stream_id)) else: stream.fetch()
@property def port_id(self): """ ID of the port. This is a read-only attribute. """ return self._port_id @port_id.setter def port_id(self, value): self._port_id = value @property def name(self): """ Name of the port. This is a read-only attribute. """ return getattr(self, '_name', None) @name.setter def name(self, value): raise ValueError('Read-only attribute') @property def is_enabled(self): """ If ``True`` the port is enabled. Otherwise, it is disabled. """ return self._is_enabled @is_enabled.setter def is_enabled(self, value): raise ValueError('Read-only attribute') @property def is_exclusive_control(self): """ """ return self._is_exclusive_control @is_exclusive_control.setter def is_exclusive_control(self, value): raise ValueError('Read-only attribute') class _TransmitMode(utils.Enum): SEQUENTIAL = ost_pb.kSequentialTransmit INTERLEAVED = ost_pb.kInterleavedTransmit @property def transmit_mode(self): """ Can be ``SEQUENTIAL`` or ``INTERLEAVED``. """ return self._TransmitMode.get_key(self._transmit_mode) @transmit_mode.setter def transmit_mode(self, value): self._transmit_mode = self._TransmitMode.get_value(value) @property def user_name(self): """ Name of the port user. """ return self._user_name @user_name.setter def user_name(self, value): self._user_name = str(value) def _get_new_stream_id(self): new_id = 0 while self.get_stream(new_id) is not None: new_id += 1 return new_id
[docs] def add_stream(self, *layers): """ Create a new stream, on the remote drone instance, and return the corresponding Stream object. The object is also added to :attr:`streams`. Layers must be instances of :class:`simple_ostinato.protocols.Protocol` >>> from simple_ostinato import protocols >>> my_port.add_stream(protocols.Mac(), protocols.Ethernet()) """ o_stream_ids = ost_pb.StreamIdList() o_stream_ids.port_id.id = self.port_id stream_id = self._get_new_stream_id() o_stream_ids.stream_id.add().id = stream_id self._drone.addStream(o_stream_ids) new_stream = Stream(self, stream_id) self.streams.append(new_stream) new_stream.layers = list(layers) return new_stream
[docs] def del_stream(self, stream_id): """ Delete the stream provided as argument. Args: stream_id (int): id of the stream to delete from the port. """ o_stream_ids = ost_pb.StreamIdList() o_stream_ids.port_id.id = self.port_id o_stream_ids.stream_id.add().id = stream_id self._drone.deleteStream(o_stream_ids) self.streams.remove(self.get_stream(stream_id))
[docs] def start_send(self): """ Start transmitting the streams that are enabled on this port. """ self._drone.startTransmit(self._get_o_port_id_list())
[docs] def stop_send(self): """ Stop sending """ self._drone.stopTransmit(self._get_o_port_id_list())
[docs] def start_capture(self): """ Start capturing. By default, this method is non-blocking and returns immediately, and :meth:`stop_send()` must be called to stop the capture. """ self._drone.startCapture(self._get_o_port_id_list())
[docs] def stop_capture(self): """ Stop the current capture """ self._drone.stopCapture(self._get_o_port_id_list())
[docs] def clear_stats(self): """ Clear the port statistics """ self._drone.clearStats(self._get_o_port_id_list())
[docs] def to_dict(self): stream_dicts = [] for stream in self.streams: stream_dicts.append(stream.to_dict()) return {'name': self.name, 'transmit_mode': self.transmit_mode, 'is_enabled': self.is_enabled, 'is_exclusive_control': self.is_exclusive_control, 'user_name': self.user_name, 'streams': stream_dicts}
[docs] def from_dict(self, values): read_only = ['name', 'is_exclusive_control', 'user_name', 'is_enabled'] for key, value in values.iteritems(): if key in read_only: pass elif key == 'streams': while self.streams: self.del_stream(self.streams[0].stream_id) for stream_dict in value: stream = self.add_stream() stream.from_dict(stream_dict) else: setattr(self, key, value)
[docs] def get_stats(self): """ Fetch the port statistics, and return them as a dictionary. """ o_stats = self._drone.getStats(self._get_o_port_id_list()) o_stats = o_stats.port_stats[0] return { 'rx_bps': o_stats.rx_bps, 'rx_bytes': o_stats.rx_bytes, 'rx_bytes_nic': o_stats.rx_bytes_nic, 'rx_drops': o_stats.rx_drops, 'rx_errors': o_stats.rx_errors, 'rx_fifo_errors': o_stats.rx_fifo_errors, 'rx_frame_errors': o_stats.rx_frame_errors, 'rx_pkts': o_stats.rx_pkts, 'rx_pkts_nic': o_stats.rx_pkts_nic, 'rx_pps': o_stats.rx_pps, 'tx_bps': o_stats.tx_bps, 'tx_bytes': o_stats.tx_bytes, 'tx_bytes_nic': o_stats.tx_bytes_nic, 'tx_pkts': o_stats.tx_pkts, 'tx_pkts_nic': o_stats.tx_pkts_nic, 'tx_pps': o_stats.tx_pps, }
[docs] def get_capture(self, save_as=None): """ Get the lastest capture and return is as a string. Args: save_as (str): if provided, the capture will also be saved as a \ pcap file at the specified location `on the host that runs \ drone`. """ o_port_id = self._fetch().port[0].port_id o_buff = self._drone.getCaptureBuffer(o_port_id) if save_as: self.save_capture(o_buff, save_as) return o_buff
[docs] def save_capture(self, o_capture_buffer, path): self._drone.saveCaptureBuffer(o_capture_buffer, path)
def _get_o_port_id_list(self): o_port_ids = ost_pb.PortIdList() o_port_ids.port_id.add().id = self.port_id return o_port_ids def __str__(self): if not self.name: return 'port[{}]'.format(self.port_id) return 'port[{}:{}]'.format(self.port_id, self.name)