"""
This module provides a class to manipulate streams.
"""
from ostinato.protocols.protocol_pb2 import StreamControl, StreamCore
from ostinato.core import ost_pb
import time
from . import utils
from . import protocols
from . import constants
# Symbolic names for the values of the ``StreamControl.SendMode`` protobuf
# enum. ``Stream.mode`` converts between these names and the raw values
# through ``get_key``/``get_value``.
class _SendMode(utils.Enum):
    FIXED = StreamControl.SendMode.Value('e_sm_fixed')
    CONTINUOUS = StreamControl.SendMode.Value('e_sm_continuous')
# Symbolic names for the values of the ``StreamControl.SendUnit`` protobuf
# enum. ``Stream.unit`` converts between these names and the raw values
# through ``get_key``/``get_value``.
class _SendUnit(utils.Enum):
    PACKETS = StreamControl.SendUnit.Value('e_su_packets')
    BURSTS = StreamControl.SendUnit.Value('e_su_bursts')
# Symbolic names for the values of the ``StreamControl.NextWhat`` protobuf
# enum. ``Stream.next`` converts between these names and the raw values
# through ``get_key``/``get_value``.
class _SendNext(utils.Enum):
    STOP = StreamControl.NextWhat.Value('e_nw_stop')
    GOTO_NEXT = StreamControl.NextWhat.Value('e_nw_goto_next')
    GOTO_ID = StreamControl.NextWhat.Value('e_nw_goto_id')
# Symbolic names for the values of the ``StreamCore.FrameLengthMode``
# protobuf enum. ``Stream.len_mode`` converts between these names and the
# raw values through ``get_key``/``get_value``.
class _FrameLengthMode(utils.Enum):
    FIXED = StreamCore.FrameLengthMode.Value('e_fl_fixed')
    INC = StreamCore.FrameLengthMode.Value('e_fl_inc')
    DEC = StreamCore.FrameLengthMode.Value('e_fl_dec')
    RANDOM = StreamCore.FrameLengthMode.Value('e_fl_random')
class Stream(object):
    """
    Represent a stream configured on a port. Besides all the stream
    configuration parameters, a stream class has `layers` which define the
    packets to be sent.

    Args:

        port (:class:`Port`): the port instance on which the stream
            is defined.
        stream_id (int): the stream ID.
        layers (list): optional layers, appended to the layers fetched from
            the drone instance when the object is created.

    """

    def __init__(self, port, stream_id, layers=None):
        self.last_check = time.time()
        self.port_id = port.port_id
        self._drone = port._drone
        self.stream_id = stream_id
        # Populate every private attribute (and ``layers``) from the drone
        # before the caller-supplied layers are appended.
        self.fetch()
        if layers:
            self.layers.extend(layers)

    @property
    def layers(self):
        """
        List of all the layers configured for this stream.
        """
        return self._layers

    @layers.setter
    def layers(self, value):
        self._layers = value

    def _fetch_layers(self, o_stream):
        """
        Rebuild :attr:`layers` from the protocols found in ``o_stream``, one
        layer object per protocol, in the same order.
        """
        self.layers = [
            _protocol_factory(o_protocol.protocol_id.id, o_protocol)
            for o_protocol in o_stream.protocol
        ]

    def _save_layers(self):
        """
        Replace the protocols of the stream held by the drone instance with
        the content of :attr:`layers`.
        """
        o_streams = self._fetch()
        o_stream = o_streams.stream[0]
        # remove the existing layers
        del o_stream.protocol[:]
        # add the layers from self.layers
        for layer in self.layers:
            o_protocol = o_stream.protocol.add()
            o_protocol.protocol_id.id = layer._protocol_id
            layer._save(o_protocol)
        # apply the changes
        self._drone.modifyStream(o_streams)

    def save(self):
        """
        Save the current stream configuration (including the protocols).

        The core and control settings are sent to the drone instance first;
        the layers are then sent by a second request (see
        ``_save_layers``).
        """
        o_streams = self._fetch()
        o_stream = o_streams.stream[0]

        core = o_stream.core
        core.is_enabled = self._is_enabled
        core.name = self._name
        core.len_mode = self._len_mode
        core.frame_len = self._frame_len
        core.frame_len_min = self._frame_len_min
        core.frame_len_max = self._frame_len_max

        control = o_stream.control
        control.unit = self._unit
        control.mode = self._mode
        control.num_bursts = self._num_bursts
        control.num_packets = self._num_packets
        control.packets_per_burst = self._packets_per_burst
        control.next = self._next
        control.bursts_per_sec = self._bursts_per_sec
        control.packets_per_sec = self._packets_per_sec

        self._drone.modifyStream(o_streams)
        self._save_layers()

    def fetch(self):
        """
        Fetch the stream configuration on the remote drone instance (including
        all the layers).

        Any local change that has not been saved is overwritten.
        """
        o_stream = self._fetch().stream[0]

        core = o_stream.core
        self._name = core.name
        self._is_enabled = core.is_enabled
        self._len_mode = core.len_mode
        self._frame_len = core.frame_len
        self._frame_len_min = core.frame_len_min
        self._frame_len_max = core.frame_len_max

        control = o_stream.control
        self._unit = control.unit
        self._mode = control.mode
        self._num_bursts = control.num_bursts
        self._num_packets = control.num_packets
        self._packets_per_burst = control.packets_per_burst
        self._next = control.next
        self._bursts_per_sec = control.bursts_per_sec
        self._packets_per_sec = control.packets_per_sec

        self._fetch_layers(o_stream)

    def _fetch(self):
        """
        Ask the drone instance for the configuration of this stream and
        return its answer, unmodified. The request is a ``StreamIdList``
        holding the port ID and this single stream ID.
        """
        o_stream_ids = ost_pb.StreamIdList()
        o_stream_ids.port_id.id = self.port_id
        o_stream_id = o_stream_ids.stream_id.add()
        o_stream_id.id = self.stream_id
        return self._drone.getStreamConfig(o_stream_ids)

    @property
    def name(self):
        """
        Name of the stream (optional)
        """
        # ``_name`` only exists once the stream has been fetched or named.
        if not hasattr(self, '_name'):
            return ''
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @property
    def len_mode(self):
        """
        Length mode. It must be either ``FIXED`` (the default), ``INC``,
        ``DEC`` or ``RANDOM``
        """
        return _FrameLengthMode.get_key(self._len_mode)

    @len_mode.setter
    def len_mode(self, mode):
        self._len_mode = _FrameLengthMode.get_value(mode)

    @property
    def frame_len(self):
        """
        Frame length, stored in the ``frame_len`` field of the stream core
        configuration.
        """
        return self._frame_len

    @frame_len.setter
    def frame_len(self, value):
        self._frame_len = value

    @property
    def frame_len_min(self):
        """
        Minimum frame length, stored in the ``frame_len_min`` field of the
        stream core configuration.
        """
        return self._frame_len_min

    @frame_len_min.setter
    def frame_len_min(self, value):
        self._frame_len_min = value

    @property
    def frame_len_max(self):
        """
        Maximum frame length, stored in the ``frame_len_max`` field of the
        stream core configuration.
        """
        return self._frame_len_max

    @frame_len_max.setter
    def frame_len_max(self, value):
        self._frame_len_max = value

    def enable(self):
        """
        Enable the stream. It is equivalent to setting :attr:`is_enabled` to
        ``True``.
        """
        self._is_enabled = True

    def disable(self):
        """
        Disable the stream. It is equivalent to setting :attr:`is_enabled` to
        ``False``.
        """
        self._is_enabled = False

    @property
    def is_enabled(self):
        """
        Return ``True`` if the stream is enabled, ``False`` otherwise. By
        default, streams are not enabled.

        Raises:

            TypeError: when set to a value that is not a boolean.

        """
        return self._is_enabled

    @is_enabled.setter
    def is_enabled(self, value):
        if not isinstance(value, bool):
            raise TypeError('expected boolean value')
        self._is_enabled = value

    @property
    def unit(self):
        """
        Unit to send. It must be either ``PACKETS`` (the default) or
        ``BURSTS``.
        """
        return _SendUnit.get_key(self._unit)

    @unit.setter
    def unit(self, unit):
        self._unit = _SendUnit.get_value(unit)

    @property
    def mode(self):
        """
        Sending mode. It must be either ``FIXED`` (the default) or
        ``CONTINUOUS``.

        If set to ``FIXED``, a fixed number of packets or bursts is sent. If
        :attr:`unit` is set to ``PACKETS``, then :attr:`num_packets` packets
        are sent. If it is set to ``BURSTS`` then :attr:`num_bursts` bursts
        are sent.

        If set to ``CONTINUOUS``, packets or bursts are sent continuously until
        the port stops transmitting.
        """
        return _SendMode.get_key(self._mode)

    @mode.setter
    def mode(self, mode):
        self._mode = _SendMode.get_value(mode)

    @property
    def num_packets(self):
        """
        Number of packets to send. This is ignored if :attr:`mode` is set to
        ``CONTINUOUS`` or if :attr:`unit` is set to ``BURSTS``.
        """
        return self._num_packets

    @num_packets.setter
    def num_packets(self, value):
        self._num_packets = int(value)

    @property
    def num_bursts(self):
        """
        Number of bursts to send. This is ignored if :attr:`mode` is set to
        ``CONTINUOUS`` or if :attr:`unit` is set to ``PACKETS``.
        """
        return self._num_bursts

    @num_bursts.setter
    def num_bursts(self, value):
        self._num_bursts = int(value)

    @property
    def packets_per_burst(self):
        """
        Number of packets per burst. This is ignored if :attr:`mode` is set to
        ``CONTINUOUS`` or if :attr:`unit` is set to ``PACKETS``
        """
        return self._packets_per_burst

    @packets_per_burst.setter
    def packets_per_burst(self, value):
        self._packets_per_burst = value

    @property
    def next(self):
        """
        What to do after the current stream finishes. It is ignored if
        :attr:`mode` is set to ``CONTINUOUS``.

        - ``STOP``: stop after this stream
        - ``GOTO_NEXT``: send the next enabled stream
        - ``GOTO_ID``: send a stream with a given ID.
        """
        return _SendNext.get_key(self._next)

    @next.setter
    def next(self, value):
        self._next = _SendNext.get_value(value)

    @property
    def bursts_per_sec(self):
        """
        Number of bursts to send per second.
        """
        return self._bursts_per_sec

    @bursts_per_sec.setter
    def bursts_per_sec(self, value):
        self._bursts_per_sec = int(value)

    @property
    def packets_per_sec(self):
        """
        Number of packets to send per second.
        """
        return self._packets_per_sec

    @packets_per_sec.setter
    def packets_per_sec(self, value):
        self._packets_per_sec = int(value)

    def __str__(self):
        if not self.name:
            return 'stream[{}]'.format(self.stream_id)
        return 'stream[{}:{}]'.format(self.stream_id, self.name)

    def to_dict(self):
        """
        Return the stream configuration as a dictionary.

        Each layer is stored under the ``'layers'`` key as a
        ``[protocol_id, layer_dict]`` pair. The frame length settings
        (:attr:`len_mode`, :attr:`frame_len`, :attr:`frame_len_min` and
        :attr:`frame_len_max`) are not part of the dictionary.
        """
        layers = [
            [layer._protocol_id, layer.to_dict()] for layer in self.layers
        ]
        return {
            'name': self.name,
            'is_enabled': self.is_enabled,
            'unit': self.unit,
            'mode': self.mode,
            'num_bursts': self.num_bursts,
            'num_packets': self.num_packets,
            'packets_per_burst': self.packets_per_burst,
            'next': self.next,
            'bursts_per_sec': self.bursts_per_sec,
            'packets_per_sec': self.packets_per_sec,
            'layers': layers,
        }

    def from_dict(self, dictionary):
        """
        Configure the stream from a dictionary such as the one returned by
        :meth:`to_dict`.

        The ``'layers'`` entry must be an iterable of
        ``(protocol_id, layer_dict)`` pairs and replaces :attr:`layers`.
        Every other key is assigned to the attribute of the same name, so it
        goes through the matching property setter when there is one. Nothing
        is sent to the drone instance; call :meth:`save` for that.
        """
        # ``items()`` rather than ``iteritems()``: the latter only exists on
        # Python 2 dictionaries.
        for key, value in dictionary.items():
            if key == 'layers':
                layers = []
                for protocol_id, layer_dict in value:
                    layer = _protocol_factory(protocol_id)
                    layer.from_dict(layer_dict)
                    layers.append(layer)
                self.layers = layers
            else:
                setattr(self, key, value)
def _protocol_factory(protocol_id, o_protocol=None):
    """
    Build the layer object matching ``protocol_id``.

    The layer class is looked up in a fixed table keyed by the
    ``constants._Protocols`` identifiers; an identifier missing from the
    table raises ``KeyError``. When ``o_protocol`` is given and truthy, the
    new layer is populated from it through its ``_fetch`` method.
    """
    ids = constants._Protocols
    layer_classes = {
        ids.MAC: protocols.Mac,
        ids.ETHERNET_II: protocols.Ethernet,
        ids.IP4: protocols.IPv4,
        ids.TCP: protocols.Tcp,
        ids.UDP: protocols.Udp,
        ids.PAYLOAD: protocols.Payload,
    }
    layer = layer_classes[protocol_id]()
    if not o_protocol:
        return layer
    layer._fetch(o_protocol)
    return layer