On Thu, Mar 14, 2013 at 07:16:07PM +0900, YAMADA Hideki wrote:
> event.py: add link events.
> switches.py: add link discovery (only of1.0).
> dumper.py: add handler for link events.
>
> TODO: support other OpenFlow version.
>
> Signed-off-by: YAMADA Hideki <[email protected]>
> ---
> bin/ryu-manager | 1 +
> ryu/topology/dumper.py | 79 +++++-
> ryu/topology/event.py | 42 ++++
> ryu/topology/switches.py | 630
> ++++++++++++++++++++++++++++++++++++++++++++--
> 4 files changed, 726 insertions(+), 26 deletions(-)
>
> diff --git a/bin/ryu-manager b/bin/ryu-manager
> index 03b1b79..2924bd4 100755
> --- a/bin/ryu-manager
> +++ b/bin/ryu-manager
> @@ -41,6 +41,7 @@ from ryu import version
> from ryu.app import wsgi
> from ryu.base.app_manager import AppManager
> from ryu.controller import controller
> +from ryu.topology import switches
>
>
> CONF = cfg.CONF
> diff --git a/ryu/topology/dumper.py b/ryu/topology/dumper.py
> index 426bcb5..b48d06f 100644
> --- a/ryu/topology/dumper.py
> +++ b/ryu/topology/dumper.py
> @@ -21,7 +21,7 @@ import time
>
> from ryu.base import app_manager
> from ryu.controller.handler import set_ev_handler
> -from ryu.topology import switches, event
> +from ryu.topology import event
>
> LOG = logging.getLogger(__name__)
>
> @@ -34,8 +34,15 @@ class DiscoveryEventDumper(app_manager.RyuApp):
> super(DiscoveryEventDumper, self).__init__()
>
> # For testing when sync and async request.
> -# self.threads.append(gevent.spawn_later(0, self._request_sync, 5))
> - self.threads.append(gevent.spawn_later(0, self._request_async, 10))
> +# self.threads.append(
> +# gevent.spawn_later(0, self._switch_request_sync, 5))
> +# self.threads.append(
> +# gevent.spawn_later(0, self._switch_request_async, 10))
> +#
> +# self.threads.append(
> +# gevent.spawn_later(0, self._link_request_sync, 5))
> +# self.threads.append(
> +# gevent.spawn_later(0, self._link_request_async, 10))
>
> self.is_active = True
>
> @@ -59,22 +66,30 @@ class DiscoveryEventDumper(app_manager.RyuApp):
> def port_modify_handler(self, ev):
> LOG.debug(ev)
>
> - def _request_sync(self, interval):
> + @set_ev_handler(event.EventLinkAdd)
> + def link_add_handler(self, ev):
> + LOG.debug(ev)
> +
> + @set_ev_handler(event.EventLinkDelete)
> + def link_del_handler(self, ev):
> + LOG.debug(ev)
> +
> + def _switch_request_sync(self, interval):
> while self.is_active:
> request = event.EventSwitchRequest()
> - LOG.debug('request sync %s thread(%s)',
> + LOG.debug('switch_request sync %s thread(%s)',
> request, id(gevent.getcurrent()))
> reply = self.send_request(request)
> - LOG.debug('reply sync %s', reply)
> + LOG.debug('switch_reply sync %s', reply)
> if len(reply.switches) > 0:
> for sw in reply.switches:
> LOG.debug(' %s', sw)
> gevent.sleep(interval)
>
> - def _request_async(self, interval):
> + def _switch_request_async(self, interval):
> while self.is_active:
> request = event.EventSwitchRequest()
> - LOG.debug('request async %s thread(%s)',
> + LOG.debug('switch_request async %s thread(%s)',
> request, id(gevent.getcurrent()))
> self.send_event(request.dst, request)
>
> @@ -86,7 +101,7 @@ class DiscoveryEventDumper(app_manager.RyuApp):
> i += 1
> LOG.debug(' thread is busy... %s/%s thread(%s)',
> i, busy, id(gevent.getcurrent()))
> - LOG.debug(' thread yield to reply handler. thread(%s)',
> + LOG.debug(' thread yield to switch_reply handler. thread(%s)',
> id(gevent.getcurrent()))
>
> # yield
> @@ -98,7 +113,51 @@ class DiscoveryEventDumper(app_manager.RyuApp):
>
> @set_ev_handler(event.EventSwitchReply)
> def switch_reply_handler(self, reply):
> - LOG.debug('reply async %s', reply)
> + LOG.debug('switch_reply async %s', reply)
> if len(reply.switches) > 0:
> for sw in reply.switches:
> LOG.debug(' %s', sw)
> +
> + def _link_request_sync(self, interval):
> + while self.is_active:
> + request = event.EventLinkRequest()
> + LOG.debug('link_request sync %s thread(%s)',
> + request, id(gevent.getcurrent()))
> + reply = self.send_request(request)
> + LOG.debug('link_reply sync %s', reply)
> + if len(reply.links) > 0:
> + for link in reply.links:
> + LOG.debug(' %s', link)
> + gevent.sleep(interval)
> +
> + def _link_request_async(self, interval):
> + while self.is_active:
> + request = event.EventLinkRequest()
> + LOG.debug('link_request async %s thread(%s)',
> + request, id(gevent.getcurrent()))
> + self.send_event(request.dst, request)
> +
> + start = time.time()
> + busy = interval / 2
> + i = 0
> + while i < busy:
> + if time.time() > start + i:
> + i += 1
> + LOG.debug(' thread is busy... %s/%s thread(%s)',
> + i, busy, id(gevent.getcurrent()))
> + LOG.debug(' thread yield to link_reply handler. thread(%s)',
> + id(gevent.getcurrent()))
> +
> + # yield
> + gevent.sleep(0)
> +
> + LOG.debug(' thread get back. thread(%s)',
> + id(gevent.getcurrent()))
> + gevent.sleep(interval - busy)
> +
> + @set_ev_handler(event.EventLinkReply)
> + def link_reply_handler(self, reply):
> + LOG.debug('link_reply async %s', reply)
> + if len(reply.links) > 0:
> + for link in reply.links:
> + LOG.debug(' %s', link)
> diff --git a/ryu/topology/event.py b/ryu/topology/event.py
> index 0c9ce7c..bd87ab0 100644
> --- a/ryu/topology/event.py
> +++ b/ryu/topology/event.py
> @@ -84,3 +84,45 @@ class EventSwitchReply(event.EventReplyBase):
> def __str__(self):
> return 'EventSwitchReply<dst=%s, %s>' % \
> (self.dst, self.switches)
> +
> +
> +class EventLinkBase(event.EventBase):
> + def __init__(self, link):
> + super(EventLinkBase, self).__init__()
> + self.link = link
class Link should be defined in this file.
> +
> + def __str__(self):
> + return '%s<%s>' % (self.__class__.__name__, self.link)
> +
> +
> +class EventLinkAdd(EventLinkBase):
> + def __init__(self, link):
> + super(EventLinkAdd, self).__init__(link)
> +
> +
> +class EventLinkDelete(EventLinkBase):
> + def __init__(self, link):
> + super(EventLinkDelete, self).__init__(link)
> +
> +
> +class EventLinkRequest(event.EventRequestBase):
> + # If dpid is None, reply all list
> + def __init__(self, dpid=None):
> + super(EventLinkRequest, self).__init__()
> + self.dst = 'switches'
> + self.dpid = dpid
> +
> + def __str__(self):
> + return 'EventLinkRequest<src=%s, dpid=%s>' % \
> + (self.src, self.dpid)
> +
> +
> +class EventLinkReply(event.EventReplyBase):
> + def __init__(self, dst, dpid, links):
> + super(EventLinkReply, self).__init__(dst)
> + self.dpid = dpid
> + self.links = links
> +
> + def __str__(self):
> + return 'EventLinkReply<dst=%s, dpid=%s, links=%s>' % \
> + (self.dst, self.dpid, len(self.links))
> diff --git a/ryu/topology/switches.py b/ryu/topology/switches.py
> index 6cf9af7..6b69a00 100644
> --- a/ryu/topology/switches.py
> +++ b/ryu/topology/switches.py
> @@ -14,16 +14,43 @@
> # limitations under the License.
>
> import logging
> +import gevent
> +import struct
> +import time
> +from oslo.config import cfg
>
> from ryu.topology import event
> from ryu.base import app_manager
> from ryu.controller import ofp_event
> from ryu.controller.handler import set_ev_cls
> from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
> +from ryu.exception import RyuException
> +from ryu.lib.mac import DONTCARE
> +from ryu.lib.dpid import dpid_to_str, str_to_dpid
> +from ryu.lib.packet import packet, ethernet, lldp
> +from ryu.ofproto.ether import ETH_TYPE_LLDP
> +from ryu.ofproto import ofproto_v1_0
> +from ryu.ofproto import nx_match
> +from ryu.ofproto import ofproto_v1_2
> +from ryu.ofproto import ofproto_v1_3
> +
>
> LOG = logging.getLogger(__name__)
>
>
> +CONF = cfg.CONF
> +
> +CONF.register_cli_opts([
> + cfg.BoolOpt('observe-links', default=False,
> + help='observe link discovery events.'),
> + cfg.BoolOpt('install-lldp-flow', default=True,
> + help='link discovery: explicitly install flow entry '
> + 'to send lldp packet to controller'),
> + cfg.BoolOpt('explicit-drop', default=True,
> + help='link discovery: explicitly drop lldp packet in')
> +])
> +
> +
> class Port(object):
> # This is data class passed by EventPortXXX
> def __init__(self, dpid, ofproto, ofpport):
> @@ -91,6 +118,27 @@ class Switch(object):
> return msg
>
>
> +class Link(object):
> + # This is data class passed by EventLinkXXX
> + def __init__(self, src, dst):
> + super(Link, self).__init__()
> + self.src = src
> + self.dst = dst
> +
> + # this type is used for key value of LinkState
> + def __eq__(self, other):
> + return self.src == other.src and self.dst == other.dst
> +
> + def __ne__(self, other):
> + return not self.__eq__(other)
> +
> + def __hash__(self):
> + return hash((self.src, self.dst))
> +
> + def __str__(self):
> + return 'Link: %s to %s' % (self.src, self.dst)
> +
> +
> class PortState(dict):
> # dict: int port_no -> OFPPort port
> # OFPPort is defined in ryu.ofproto.ofproto_v1_X_parser
> @@ -107,17 +155,297 @@ class PortState(dict):
> self[port_no] = port
>
>
> +class PortData(object):
> + def __init__(self, is_down, lldp_data):
> + super(PortData, self).__init__()
> + self.is_down = is_down
> + self.lldp_data = lldp_data
> + self.timestamp = None
> + self.sent = 0
> +
> + def lldp_sent(self):
> + self.timestamp = time.time()
> + self.sent += 1
> +
> + def lldp_received(self):
> + self.sent = 0
> +
> + def lldp_dropped(self):
> + return self.sent
> +
> + def clear_timestamp(self):
> + self.timestamp = None
> +
> + def set_down(self, is_down):
> + self.is_down = is_down
> +
> + def __str__(self):
> + return 'PortData<live=%s, timestamp=%s, sent=%d>' \
> + % (not self.is_down, self.timestamp, self.sent)
> +
> +
> +class PortDataState(dict):
> + # dict: Port class -> PortData class
> + # slimed down version of OrderedDict as python 2.6 doesn't support it.
> + _PREV = 0
> + _NEXT = 1
> + _KEY = 2
> +
> + def __init__(self):
> + super(PortDataState, self).__init__()
> + self._root = root = [] # sentinel node
> + root[:] = [root, root, None] # [_PREV, _NEXT, _KEY]
> + # doubly linked list
> + self._map = {}
> +
> + def _remove_key(self, key):
> + link_prev, link_next, key = self._map.pop(key)
> + link_prev[self._NEXT] = link_next
> + link_next[self._PREV] = link_prev
> +
> + def _append_key(self, key):
> + root = self._root
> + last = root[self._PREV]
> + last[self._NEXT] = root[self._PREV] = self._map[key] = [last, root,
> + key]
> +
> + def _prepend_key(self, key):
> + root = self._root
> + first = root[self._NEXT]
> + first[self._PREV] = root[self._NEXT] = self._map[key] = [root, first,
> + key]
> +
> + def _move_last_key(self, key):
> + self._remove_key(key)
> + self._append_key(key)
> +
> + def _move_front_key(self, key):
> + self._remove_key(key)
> + self._prepend_key(key)
> +
> + def add_port(self, port, lldp_data):
> + if port not in self:
> + self._prepend_key(port)
> + self[port] = PortData(port.is_down(), lldp_data)
> + else:
> + self[port].is_down = port.is_down()
> +
> + def lldp_sent(self, port):
> + port_data = self[port]
> + port_data.lldp_sent()
> + self._move_last_key(port)
> + return port_data
> +
> + def lldp_received(self, port):
> + self[port].lldp_received()
> +
> + def move_front(self, port):
> + port_data = self.get(port, None)
> + if port_data is not None:
> + port_data.clear_timestamp()
> + self._move_front_key(port)
> +
> + def set_down(self, port):
> + is_down = port.is_down()
> + port_data = self[port]
> + port_data.set_down(is_down)
> + port_data.clear_timestamp()
> + if not is_down:
> + self._move_front_key(port)
> + return is_down
> +
> + def get_port(self, port):
> + return self[port]
> +
> + def del_port(self, port):
> + del self[port]
> + self._remove_key(port)
> +
> + def __iter__(self):
> + root = self._root
> + curr = root[self._NEXT]
> + while curr is not root:
> + yield curr[self._KEY]
> + curr = curr[self._NEXT]
> +
> + def clear(self):
> + for node in self._map.itervalues():
> + del node[:]
> + root = self._root
> + root[:] = [root, root, None]
> + self._map.clear()
> + dict.clear(self)
> +
> + def items(self):
> + 'od.items() -> list of (key, value) pairs in od'
> + return [(key, self[key]) for key in self]
> +
> + def iteritems(self):
> + 'od.iteritems -> an iterator over the (key, value) pairs in od'
> + for k in self:
> + yield (k, self[k])
> +
> +
> +class LinkState(dict):
> + # dict: Link class -> timestamp
> + def __init__(self):
> + super(LinkState, self).__init__()
> + self._map = {}
> +
> + def get_peer(self, src):
> + return self._map.get(src, None)
> +
> + def update_link(self, src, dst):
> + link = Link(src, dst)
> +
> + self[link] = time.time()
> + self._map[src] = dst
> +
> + # return if the reverse link is also up or not
> + rev_link = Link(dst, src)
> + return rev_link in self
> +
> + def link_down(self, link):
> + del self[link]
> + del self._map[link.src]
> +
> + def rev_link_set_timestamp(self, rev_link, timestamp):
> + # rev_link may or may not in LinkSet
> + if rev_link in self:
> + self[rev_link] = timestamp
> +
> + def port_deleted(self, src):
> + dst = self.get_peer(src)
> + if dst is None:
> + raise KeyError()
> +
> + link = Link(src, dst)
> + rev_link = Link(dst, src)
> + del self[link]
> + del self._map[src]
> + # reverse link might not exist
> + self.pop(rev_link, None)
> + rev_link_dst = self._map.pop(dst, None)
> +
> + return dst, rev_link_dst
> +
> +
> +class LLDPPacket(object):
> + # make a LLDP packet for link discovery.
> +
> + CHASSIS_ID_PREFIX = 'dpid:'
> + CHASSIS_ID_PREFIX_LEN = len(CHASSIS_ID_PREFIX)
> + CHASSIS_ID_FMT = CHASSIS_ID_PREFIX + '%s'
> +
> + PORT_ID_STR = '!I' # uint32_t
> + PORT_ID_SIZE = 4
> +
> + class LLDPUnknownFormat(RyuException):
> + message = '%(msg)s'
> +
> + @staticmethod
> + def lldp_packet(dpid, port_no, dl_addr, ttl):
> + pkt = packet.Packet()
> +
> + dst = lldp.LLDP_MAC_NEAREST_BRIDGE
> + src = dl_addr
> + ethertype = ETH_TYPE_LLDP
> + eth_pkt = ethernet.ethernet(dst, src, ethertype)
> + pkt.add_protocol(eth_pkt)
> +
> + tlv_chassis_id = lldp.ChassisID(
> + subtype=lldp.ChassisID.SUB_LOCALLY_ASSIGNED,
> + chassis_id=LLDPPacket.CHASSIS_ID_FMT %
> + dpid_to_str(dpid))
> +
> + tlv_port_id = lldp.PortID(subtype=lldp.PortID.SUB_PORT_COMPONENT,
> + port_id=struct.pack(
> + LLDPPacket.PORT_ID_STR,
> + port_no))
> +
> + tlv_ttl = lldp.TTL(ttl=ttl)
> + tlv_end = lldp.End()
> +
> + tlvs = (tlv_chassis_id, tlv_port_id, tlv_ttl, tlv_end)
> + lldp_pkt = lldp.lldp(tlvs)
> + pkt.add_protocol(lldp_pkt)
> +
> + pkt.serialize()
> + return pkt.data
> +
> + @staticmethod
> + def lldp_parse(data):
> + pkt = packet.Packet(data)
> + eth_pkt = pkt.next()
> + assert type(eth_pkt) == ethernet.ethernet
> +
> + lldp_pkt = pkt.next()
> + if type(lldp_pkt) != lldp.lldp:
> + raise LLDPPacket.LLDPUnknownFormat()
> +
> + tlv_chassis_id = lldp_pkt.tlvs[0]
> + if tlv_chassis_id.subtype != lldp.ChassisID.SUB_LOCALLY_ASSIGNED:
> + raise LLDPPacket.LLDPUnknownFormat(
> + msg='unknown chassis id subtype %d' % tlv_chassis_id.subtype)
> + chassis_id = tlv_chassis_id.chassis_id
> + if not chassis_id.startswith(LLDPPacket.CHASSIS_ID_PREFIX):
> + raise LLDPPacket.LLDPUnknownFormat(
> + msg='unknown chassis id format %s' % chassis_id)
> + src_dpid = str_to_dpid(chassis_id[LLDPPacket.CHASSIS_ID_PREFIX_LEN:])
> +
> + tlv_port_id = lldp_pkt.tlvs[1]
> + if tlv_port_id.subtype != lldp.PortID.SUB_PORT_COMPONENT:
> + raise LLDPPacket.LLDPUnknownFormat(
> + msg='unknown port id subtype %d' % tlv_port_id.subtype)
> + port_id = tlv_port_id.port_id
> + if len(port_id) != LLDPPacket.PORT_ID_SIZE:
> + raise LLDPPacket.LLDPUnknownFormat(
> + msg='unknown port id %d' % port_id)
> + (src_port_no, ) = struct.unpack(LLDPPacket.PORT_ID_STR, port_id)
> +
> + return src_dpid, src_port_no
> +
> +
> class Switches(app_manager.RyuApp):
> _EVENTS = [event.EventSwitchEnter, event.EventSwitchLeave,
> event.EventPortAdd, event.EventPortDelete,
> - event.EventPortModify]
> + event.EventPortModify,
> + event.EventLinkAdd, event.EventLinkDelete]
> +
> + DEFAULT_TTL = 120 # unused. ignored.
> + LLDP_PACKET_LEN = len(LLDPPacket.lldp_packet(0, 0, DONTCARE, 0))
> +
> + LLDP_SEND_GUARD = .05
> + LLDP_SEND_PERIOD_PER_PORT = .9
> + TIMEOUT_CHECK_PERIOD = 5.
> + LINK_TIMEOUT = TIMEOUT_CHECK_PERIOD * 2
> + LINK_LLDP_DROP = 5
>
> def __init__(self):
> super(Switches, self).__init__()
>
> self.name = 'switches'
> - self.dps = {} # datapath_id => class Datapath
> - self.port_state = {} # datapath_id => ports
> + self.dps = {} # datapath_id => Datapath class
> + self.port_state = {} # datapath_id => ports
> + self.ports = PortDataState() # Port class -> PortData class
> + self.links = LinkState() # Link class -> timestamp
> + self.is_active = True
> +
> + self.link_discovery = CONF.observe_links
> + if self.link_discovery:
> + self.install_flow = CONF.install_lldp_flow
> + self.explicit_drop = CONF.explicit_drop
> + self.lldp_event = gevent.event.Event()
> + self.link_event = gevent.event.Event()
> + self.threads.append(gevent.spawn_later(0, self.lldp_loop))
> + self.threads.append(gevent.spawn_later(0, self.link_loop))
> +
> + def close(self):
> + self.is_active = False
> + if self.link_discovery:
> + self.lldp_event.set()
> + self.link_event.set()
> + gevent.joinall(self.threads)
>
> def _register(self, dp):
> assert dp.id is not None
> @@ -133,11 +461,40 @@ class Switches(app_manager.RyuApp):
> del self.dps[dp.id]
> del self.port_state[dp.id]
>
> - def _get_switch(self, dp):
> - switch = Switch(dp)
> - for ofpport in self.port_state[dp.id].itervalues():
> - switch.add_port(ofpport)
> - return switch
> + def _get_switch(self, dpid):
> + if dpid in self.dps:
> + switch = Switch(self.dps[dpid])
> + for ofpport in self.port_state[dpid].itervalues():
> + switch.add_port(ofpport)
> + return switch
> +
> + def _get_port(self, dpid, port_no):
> + switch = self._get_switch(dpid)
> + if switch:
> + for p in switch.ports:
> + if p.port_no == port_no:
> + return p
> +
> + def _port_added(self, port):
> + lldp_data = LLDPPacket.lldp_packet(
> + port.dpid, port.port_no, port.hw_addr, self.DEFAULT_TTL)
> + self.ports.add_port(port, lldp_data)
> + # LOG.debug('_port_added dpid=%s, port_no=%s, live=%s',
> + # port.dpid, port.port_no, port.is_live())
> +
> + def _link_down(self, port):
> + try:
> + dst, rev_link_dst = self.links.port_deleted(port)
> + except KeyError:
> + # LOG.debug('key error. src=%s, dst=%s',
> + # port, self.links.get_peer(port))
> + return
> + link = Link(port, dst)
> + self.send_event_to_observers(event.EventLinkDelete(link))
> + if rev_link_dst:
> + rev_link = Link(dst, rev_link_dst)
> + self.send_event_to_observers(event.EventLinkDelete(rev_link))
> + self.ports.move_front(dst)
>
> @set_ev_cls(ofp_event.EventOFPStateChange,
> [MAIN_DISPATCHER, DEAD_DISPATCHER])
> @@ -148,19 +505,54 @@ class Switches(app_manager.RyuApp):
>
> if ev.state == MAIN_DISPATCHER:
> self._register(dp)
> - switch = self._get_switch(dp)
> + switch = self._get_switch(dp.id)
> LOG.debug('register %s', switch)
> self.send_event_to_observers(event.EventSwitchEnter(switch))
>
> + if not self.link_discovery:
> + return
> +
> + if self.install_flow:
> + ofproto = dp.ofproto
> + ofproto_parser = dp.ofproto_parser
> +
> + # TODO:XXX need other versions
> + if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
> + rule = nx_match.ClsRule()
> + rule.set_dl_dst(lldp.LLDP_MAC_NEAREST_BRIDGE)
> + rule.set_dl_type(ETH_TYPE_LLDP)
> + actions = [ofproto_parser.OFPActionOutput(
> + ofproto.OFPP_CONTROLLER, self.LLDP_PACKET_LEN)]
> + dp.send_flow_mod(
> + rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
> + idle_timeout=0, hard_timeout=0, actions=actions)
> + else:
> + LOG.error('cannot install flow. unsupported version. %x',
> + dp.ofproto.OFP_VERSION)
> +
> + for port in switch.ports:
> + if not port.is_reserved():
> + self._port_added(port)
> + self.lldp_event.set()
> +
> elif ev.state == DEAD_DISPATCHER:
> # dp.id is None when datapath dies before handshake
> if dp.id is None:
> return
> - switch = self._get_switch(dp)
> + switch = self._get_switch(dp.id)
> self._unregister(dp)
> LOG.debug('unregister %s', switch)
> self.send_event_to_observers(event.EventSwitchLeave(switch))
>
> + if not self.link_discovery:
> + return
> +
> + for port in switch.ports:
> + if not port.is_reserved():
> + self.ports.del_port(port)
> + self._link_down(port)
> + self.lldp_event.set()
> +
> @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)
> def port_status_handler(self, ev):
> msg = ev.msg
> @@ -176,6 +568,14 @@ class Switches(app_manager.RyuApp):
> self.send_event_to_observers(
> event.EventPortAdd(Port(dp.id, dp.ofproto, ofpport)))
>
> + if not self.link_discovery:
> + return
> +
> + port = self._get_port(dp.id, ofpport.port_no)
> + if port and not port.is_reserved():
> + self._port_added(port)
> + self.lldp_event.set()
> +
> elif reason == dp.ofproto.OFPPR_DELETE:
> #LOG.debug('A port was deleted.' +
> # '(datapath id = %s, port number = %s)',
> @@ -184,6 +584,15 @@ class Switches(app_manager.RyuApp):
> self.send_event_to_observers(
> event.EventPortDelete(Port(dp.id, dp.ofproto, ofpport)))
>
> + if not self.link_discovery:
> + return
> +
> + port = self._get_port(dp.id, ofpport.port_no)
> + if port and not port.is_reserved():
> + self.ports.del_port(port)
> + self._link_down(port)
> + self.lldp_event.set()
> +
> else:
> assert reason == dp.ofproto.OFPPR_MODIFY
> #LOG.debug('A port was modified.' +
> @@ -193,18 +602,184 @@ class Switches(app_manager.RyuApp):
> self.send_event_to_observers(
> event.EventPortModify(Port(dp.id, dp.ofproto, ofpport)))
>
> + if not self.link_discovery:
> + return
> +
> + port = self._get_port(dp.id, ofpport.port_no)
> + if port and not port.is_reserved():
> + if self.ports.set_down(port):
> + self._link_down(port)
> + self.lldp_event.set()
> +
> + @staticmethod
> + def _drop_packet(msg):
> + if msg.buffer_id == 0xffffffff:
> + return # TODO:use constant instead of -1
> +
> + dp = msg.datapath
> + # TODO:XXX
> + if dp.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
> + dp.send_packet_out(dp.id, msg.in_port, [])
> + else:
> + LOG.error('cannot drop_packet. unsupported version. %x',
> + dp.ofproto.OFP_VERSION)
> +
> + @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
> + def packet_in_handler(self, ev):
> + if not self.link_discovery:
> + return
> +
> + msg = ev.msg
> + try:
> + src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data)
> + except LLDPPacket.LLDPUnknownFormat as e:
> + # This handler can receive all the packtes which can be
> + # not-LLDP packet. Ignore it silently
> + return
> + else:
> + dst_dpid = msg.datapath.id
> + dst_port_no = msg.in_port
> +
> + src = self._get_port(src_dpid, src_port_no)
> + if not src or src.dpid == dst_dpid:
> + return
> +
> + dst = self._get_port(dst_dpid, dst_port_no)
> + if not dst:
> + return
> +
> + old_peer = self.links.get_peer(src)
> + # LOG.debug("Packet-In")
> + # LOG.debug(" src=%s", src)
> + # LOG.debug(" dst=%s", dst)
> + # LOG.debug(" old_peer=%s", old_peer)
> + if old_peer and old_peer != dst:
> + old_link = Link(src, old_peer)
> + self.send_event_to_observers(event.EventLinkDelete(old_link))
> +
> + link = Link(src, dst)
> + if not link in self.links:
> + self.send_event_to_observers(event.EventLinkAdd(link))
> +
> + if not self.links.update_link(src, dst):
> + # reverse link is not detected yet.
> + # So schedule the check early because it's very likely it's
> up
> + try:
> + self.ports.lldp_received(dst)
> + except KeyError as e:
> + # There are races between EventOFPPacketIn and
> + # EventDPPortAdd. So packet-in event can happend before
> + # port add event. In that case key error can happend.
> + # LOG.debug('lldp_received: KeyError %s', e)
> + pass
> + else:
> + self.ports.move_front(dst)
> + self.lldp_event.set()
> + if self.explicit_drop:
> + self._drop_packet(msg)
> +
> + def send_lldp_packet(self, port):
> + try:
> + port_data = self.ports.lldp_sent(port)
> + except KeyError as e:
> + # ports can be modified during our sleep in self.lldp_loop()
> + # LOG.debug('send_lldp: KeyError %s', e)
> + return
> + if port_data.is_down:
> + return
> +
> + dp = self.dps.get(port.dpid, None)
> + if dp is None:
> + # datapath was already deleted
> + return
> +
> + # LOG.debug('lldp sent dpid=%s, port_no=%d', dp.id, port.port_no)
> + # TODO:XXX
> + if dp.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
> + actions = [dp.ofproto_parser.OFPActionOutput(port.port_no)]
> + dp.send_packet_out(actions=actions, data=port_data.lldp_data)
> + else:
> + LOG.error('cannot send lldp packet. unsupported version. %x',
> + dp.ofproto.OFP_VERSION)
> +
> + def lldp_loop(self):
> + while self.is_active:
> + self.lldp_event.clear()
> +
> + now = time.time()
> + timeout = None
> + ports_now = []
> + ports = []
> + for (key, data) in self.ports.items():
> + if data.timestamp is None:
> + ports_now.append(key)
> + continue
> +
> + expire = data.timestamp + self.LLDP_SEND_PERIOD_PER_PORT
> + if expire <= now:
> + ports.append(key)
> + continue
> +
> + timeout = expire - now
> + break
> +
> + for port in ports_now:
> + self.send_lldp_packet(port)
> + for port in ports:
> + self.send_lldp_packet(port)
> + gevent.sleep(self.LLDP_SEND_GUARD) # don't burst
> +
> + if timeout is not None and ports:
> + timeout = 0 # We have already slept
> + # LOG.debug('lldp sleep %s', timeout)
> + self.lldp_event.wait(timeout=timeout)
> +
> + def link_loop(self):
> + while self.is_active:
> + self.link_event.clear()
> +
> + now = time.time()
> + deleted = []
> + for (link, timestamp) in self.links.items():
> + # LOG.debug('%s timestamp %d (now %d)', link, timestamp, now)
> + if timestamp + self.LINK_TIMEOUT < now:
> + src = link.src
> + if src in self.ports:
> + port_data = self.ports.get_port(src)
> + # LOG.debug('port_data %s', port_data)
> + if port_data.lldp_dropped() > self.LINK_LLDP_DROP:
> + deleted.append(link)
> +
> + for link in deleted:
> + self.links.link_down(link)
> + # LOG.debug('delete %s', link)
> + self.send_event_to_observers(event.EventLinkDelete(link))
> +
> + dst = link.dst
> + rev_link = Link(dst, link.src)
> + if rev_link not in deleted:
> + # It is very likely that the reverse link is also
> + # disconnected. Check it early.
> + expire = now - self.LINK_TIMEOUT
> + self.links.rev_link_set_timestamp(rev_link, expire)
> + if dst in self.ports:
> + self.ports.move_front(dst)
> + self.lldp_event.set()
> +
> + self.link_event.wait(timeout=self.TIMEOUT_CHECK_PERIOD)
> +
> @set_ev_cls(event.EventSwitchRequest)
> def switch_request_handler(self, req):
> - LOG.debug(req)
> + # LOG.debug(req)
> dpid = req.dpid
>
> switches = []
> if dpid is None:
> # reply all list
> for dp in self.dps.itervalues():
> - switches.append(self._get_switch(dp))
> + switches.append(self._get_switch(dp.id))
> elif dpid in self.dps:
> - switches.append(self._get_switch(self.dps[dpid]))
> + switches.append(self._get_switch(dpid))
>
> rep = event.EventSwitchReply(req.src, switches)
> if req.sync:
> @@ -212,10 +787,33 @@ class Switches(app_manager.RyuApp):
> else:
> self.send_event(req.src, rep)
>
> + @set_ev_cls(event.EventLinkRequest)
> + def link_request_handler(self, req):
> + # LOG.debug(req)
> + dpid = req.dpid
> +
> + if dpid is None:
> + links = self.links
> + else:
> + links = [link for link in self.links if link.src.dpid == dpid]
> + rep = event.EventLinkReply(req.src, dpid, links)
> + if req.sync:
> + self.send_reply(rep)
> + else:
> + self.send_event(req.src, rep)
> +
The logic of discovery seems to be separated from switch/port management
class. Can it be separated into independent class?
If it would be difficult, how about mixin class or inheritance?
Then it would clarify the layering.
>
> -def get(app, dpid=None):
> +def get_switch(app, dpid=None):
> return app.send_request(event.EventSwitchRequest(dpid))
>
>
> -def get_all(app):
> - return get(app)
> +def get_all_switch(app):
> + return get_switch(app)
> +
> +
> +def get_link(app, dpid=None):
> + return app.send_request(event.EventLinkRequest(dpid))
> +
> +
> +def get_all_link(app):
> + return get_link(app)
Can those exported function be split into other file? For example, api.py.
Then it would clarify that the exported API and internal implementation.
> --
> 1.7.9.5
>
>
>
> ------------------------------------------------------------------------------
> Everyone hates slow websites. So do we.
> Make your web apps faster with AppDynamics
> Download AppDynamics Lite for free today:
> http://p.sf.net/sfu/appdyn_d2d_mar
> _______________________________________________
> Ryu-devel mailing list
> [email protected]
> https://lists.sourceforge.net/lists/listinfo/ryu-devel
>
--
yamahata
------------------------------------------------------------------------------
Everyone hates slow websites. So do we.
Make your web apps faster with AppDynamics
Download AppDynamics Lite for free today:
http://p.sf.net/sfu/appdyn_d2d_mar
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel