event.py: add link events.
switches.py: add link discovery (OpenFlow 1.0 only).
dumper.py: add handler for link events.

TODO: support other OpenFlow versions.

Signed-off-by: YAMADA Hideki <[email protected]>
---
 bin/ryu-manager          |    1 +
 ryu/topology/dumper.py   |   79 +++++-
 ryu/topology/event.py    |   42 ++++
 ryu/topology/switches.py |  630 ++++++++++++++++++++++++++++++++++++++++++++--
 4 files changed, 726 insertions(+), 26 deletions(-)

diff --git a/bin/ryu-manager b/bin/ryu-manager
index 03b1b79..2924bd4 100755
--- a/bin/ryu-manager
+++ b/bin/ryu-manager
@@ -41,6 +41,7 @@ from ryu import version
 from ryu.app import wsgi
 from ryu.base.app_manager import AppManager
 from ryu.controller import controller
+from ryu.topology import switches
 
 
 CONF = cfg.CONF
diff --git a/ryu/topology/dumper.py b/ryu/topology/dumper.py
index 426bcb5..b48d06f 100644
--- a/ryu/topology/dumper.py
+++ b/ryu/topology/dumper.py
@@ -21,7 +21,7 @@ import time
 
 from ryu.base import app_manager
 from ryu.controller.handler import set_ev_handler
-from ryu.topology import switches, event
+from ryu.topology import event
 
 LOG = logging.getLogger(__name__)
 
@@ -34,8 +34,15 @@ class DiscoveryEventDumper(app_manager.RyuApp):
         super(DiscoveryEventDumper, self).__init__()
 
         # For testing when sync and async request.
-#        self.threads.append(gevent.spawn_later(0, self._request_sync, 5))
-        self.threads.append(gevent.spawn_later(0, self._request_async, 10))
+#        self.threads.append(
+#            gevent.spawn_later(0, self._switch_request_sync, 5))
+#        self.threads.append(
+#            gevent.spawn_later(0, self._switch_request_async, 10))
+#
+#        self.threads.append(
+#            gevent.spawn_later(0, self._link_request_sync, 5))
+#        self.threads.append(
+#            gevent.spawn_later(0, self._link_request_async, 10))
 
         self.is_active = True
 
@@ -59,22 +66,30 @@ class DiscoveryEventDumper(app_manager.RyuApp):
     def port_modify_handler(self, ev):
         LOG.debug(ev)
 
-    def _request_sync(self, interval):
+    @set_ev_handler(event.EventLinkAdd)
+    def link_add_handler(self, ev):
+        LOG.debug(ev)
+
+    @set_ev_handler(event.EventLinkDelete)
+    def link_del_handler(self, ev):
+        LOG.debug(ev)
+
+    def _switch_request_sync(self, interval):
         while self.is_active:
             request = event.EventSwitchRequest()
-            LOG.debug('request sync %s thread(%s)',
+            LOG.debug('switch_request sync %s thread(%s)',
                       request, id(gevent.getcurrent()))
             reply = self.send_request(request)
-            LOG.debug('reply sync %s', reply)
+            LOG.debug('switch_reply sync %s', reply)
             if len(reply.switches) > 0:
                 for sw in reply.switches:
                     LOG.debug('  %s', sw)
             gevent.sleep(interval)
 
-    def _request_async(self, interval):
+    def _switch_request_async(self, interval):
         while self.is_active:
             request = event.EventSwitchRequest()
-            LOG.debug('request async %s thread(%s)',
+            LOG.debug('switch_request async %s thread(%s)',
                       request, id(gevent.getcurrent()))
             self.send_event(request.dst, request)
 
@@ -86,7 +101,7 @@ class DiscoveryEventDumper(app_manager.RyuApp):
                     i += 1
                     LOG.debug('  thread is busy... %s/%s thread(%s)',
                               i, busy, id(gevent.getcurrent()))
-            LOG.debug('  thread yield to reply handler. thread(%s)',
+            LOG.debug('  thread yield to switch_reply handler. thread(%s)',
                       id(gevent.getcurrent()))
 
             # yield
@@ -98,7 +113,51 @@ class DiscoveryEventDumper(app_manager.RyuApp):
 
     @set_ev_handler(event.EventSwitchReply)
     def switch_reply_handler(self, reply):
-        LOG.debug('reply async %s', reply)
+        LOG.debug('switch_reply async %s', reply)
         if len(reply.switches) > 0:
             for sw in reply.switches:
                 LOG.debug('  %s', sw)
+
+    def _link_request_sync(self, interval):
+        while self.is_active:
+            request = event.EventLinkRequest()
+            LOG.debug('link_request sync %s thread(%s)',
+                      request, id(gevent.getcurrent()))
+            reply = self.send_request(request)
+            LOG.debug('link_reply sync %s', reply)
+            if len(reply.links) > 0:
+                for link in reply.links:
+                    LOG.debug('  %s', link)
+            gevent.sleep(interval)
+
+    def _link_request_async(self, interval):
+        while self.is_active:
+            request = event.EventLinkRequest()
+            LOG.debug('link_request async %s thread(%s)',
+                      request, id(gevent.getcurrent()))
+            self.send_event(request.dst, request)
+
+            start = time.time()
+            busy = interval / 2
+            i = 0
+            while i < busy:
+                if time.time() > start + i:
+                    i += 1
+                    LOG.debug('  thread is busy... %s/%s thread(%s)',
+                              i, busy, id(gevent.getcurrent()))
+            LOG.debug('  thread yield to link_reply handler. thread(%s)',
+                      id(gevent.getcurrent()))
+
+            # yield
+            gevent.sleep(0)
+
+            LOG.debug('  thread get back. thread(%s)',
+                      id(gevent.getcurrent()))
+            gevent.sleep(interval - busy)
+
+    @set_ev_handler(event.EventLinkReply)
+    def link_reply_handler(self, reply):
+        LOG.debug('link_reply async %s', reply)
+        if len(reply.links) > 0:
+            for link in reply.links:
+                LOG.debug('  %s', link)
diff --git a/ryu/topology/event.py b/ryu/topology/event.py
index 0c9ce7c..bd87ab0 100644
--- a/ryu/topology/event.py
+++ b/ryu/topology/event.py
@@ -84,3 +84,45 @@ class EventSwitchReply(event.EventReplyBase):
     def __str__(self):
         return 'EventSwitchReply<dst=%s, %s>' % \
             (self.dst, self.switches)
+
+
+class EventLinkBase(event.EventBase):
+    def __init__(self, link):
+        super(EventLinkBase, self).__init__()
+        self.link = link
+
+    def __str__(self):
+        return '%s<%s>' % (self.__class__.__name__, self.link)
+
+
+class EventLinkAdd(EventLinkBase):
+    def __init__(self, link):
+        super(EventLinkAdd, self).__init__(link)
+
+
+class EventLinkDelete(EventLinkBase):
+    def __init__(self, link):
+        super(EventLinkDelete, self).__init__(link)
+
+
+class EventLinkRequest(event.EventRequestBase):
+    # If dpid is None, reply all list
+    def __init__(self, dpid=None):
+        super(EventLinkRequest, self).__init__()
+        self.dst = 'switches'
+        self.dpid = dpid
+
+    def __str__(self):
+        return 'EventLinkRequest<src=%s, dpid=%s>' % \
+            (self.src, self.dpid)
+
+
+class EventLinkReply(event.EventReplyBase):
+    def __init__(self, dst, dpid, links):
+        super(EventLinkReply, self).__init__(dst)
+        self.dpid = dpid
+        self.links = links
+
+    def __str__(self):
+        return 'EventLinkReply<dst=%s, dpid=%s, links=%s>' % \
+            (self.dst, self.dpid, len(self.links))
diff --git a/ryu/topology/switches.py b/ryu/topology/switches.py
index 6cf9af7..6b69a00 100644
--- a/ryu/topology/switches.py
+++ b/ryu/topology/switches.py
@@ -14,16 +14,43 @@
 # limitations under the License.
 
 import logging
+import gevent
+import struct
+import time
+from oslo.config import cfg
 
 from ryu.topology import event
 from ryu.base import app_manager
 from ryu.controller import ofp_event
 from ryu.controller.handler import set_ev_cls
 from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
+from ryu.exception import RyuException
+from ryu.lib.mac import DONTCARE
+from ryu.lib.dpid import dpid_to_str, str_to_dpid
+from ryu.lib.packet import packet, ethernet, lldp
+from ryu.ofproto.ether import ETH_TYPE_LLDP
+from ryu.ofproto import ofproto_v1_0
+from ryu.ofproto import nx_match
+from ryu.ofproto import ofproto_v1_2
+from ryu.ofproto import ofproto_v1_3
+
 
 LOG = logging.getLogger(__name__)
 
 
+CONF = cfg.CONF
+
+CONF.register_cli_opts([
+    cfg.BoolOpt('observe-links', default=False,
+                help='observe link discovery events.'),
+    cfg.BoolOpt('install-lldp-flow', default=True,
+                help='link discovery: explicitly install flow entry '
+                     'to send lldp packet to controller'),
+    cfg.BoolOpt('explicit-drop', default=True,
+                help='link discovery: explicitly drop lldp packet in')
+])
+
+
 class Port(object):
     # This is data class passed by EventPortXXX
     def __init__(self, dpid, ofproto, ofpport):
@@ -91,6 +118,27 @@ class Switch(object):
         return msg
 
 
+class Link(object):
+    # This is data class passed by EventLinkXXX
+    def __init__(self, src, dst):
+        super(Link, self).__init__()
+        self.src = src
+        self.dst = dst
+
+    # this type is used for key value of LinkState
+    def __eq__(self, other):
+        return self.src == other.src and self.dst == other.dst
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __hash__(self):
+        return hash((self.src, self.dst))
+
+    def __str__(self):
+        return 'Link: %s to %s' % (self.src, self.dst)
+
+
 class PortState(dict):
     # dict: int port_no -> OFPPort port
     # OFPPort is defined in ryu.ofproto.ofproto_v1_X_parser
@@ -107,17 +155,297 @@ class PortState(dict):
         self[port_no] = port
 
 
+class PortData(object):
+    def __init__(self, is_down, lldp_data):
+        super(PortData, self).__init__()
+        self.is_down = is_down
+        self.lldp_data = lldp_data
+        self.timestamp = None
+        self.sent = 0
+
+    def lldp_sent(self):
+        self.timestamp = time.time()
+        self.sent += 1
+
+    def lldp_received(self):
+        self.sent = 0
+
+    def lldp_dropped(self):
+        return self.sent
+
+    def clear_timestamp(self):
+        self.timestamp = None
+
+    def set_down(self, is_down):
+        self.is_down = is_down
+
+    def __str__(self):
+        return 'PortData<live=%s, timestamp=%s, sent=%d>' \
+            % (not self.is_down, self.timestamp, self.sent)
+
+
+class PortDataState(dict):
+    # dict: Port class -> PortData class
+    # slimmed-down version of OrderedDict, as python 2.6 doesn't support it.
+    _PREV = 0
+    _NEXT = 1
+    _KEY = 2
+
+    def __init__(self):
+        super(PortDataState, self).__init__()
+        self._root = root = []          # sentinel node
+        root[:] = [root, root, None]    # [_PREV, _NEXT, _KEY]
+                                        # doubly linked list
+        self._map = {}
+
+    def _remove_key(self, key):
+        link_prev, link_next, key = self._map.pop(key)
+        link_prev[self._NEXT] = link_next
+        link_next[self._PREV] = link_prev
+
+    def _append_key(self, key):
+        root = self._root
+        last = root[self._PREV]
+        last[self._NEXT] = root[self._PREV] = self._map[key] = [last, root,
+                                                                key]
+
+    def _prepend_key(self, key):
+        root = self._root
+        first = root[self._NEXT]
+        first[self._PREV] = root[self._NEXT] = self._map[key] = [root, first,
+                                                                 key]
+
+    def _move_last_key(self, key):
+        self._remove_key(key)
+        self._append_key(key)
+
+    def _move_front_key(self, key):
+        self._remove_key(key)
+        self._prepend_key(key)
+
+    def add_port(self, port, lldp_data):
+        if port not in self:
+            self._prepend_key(port)
+            self[port] = PortData(port.is_down(), lldp_data)
+        else:
+            self[port].is_down = port.is_down()
+
+    def lldp_sent(self, port):
+        port_data = self[port]
+        port_data.lldp_sent()
+        self._move_last_key(port)
+        return port_data
+
+    def lldp_received(self, port):
+        self[port].lldp_received()
+
+    def move_front(self, port):
+        port_data = self.get(port, None)
+        if port_data is not None:
+            port_data.clear_timestamp()
+            self._move_front_key(port)
+
+    def set_down(self, port):
+        is_down = port.is_down()
+        port_data = self[port]
+        port_data.set_down(is_down)
+        port_data.clear_timestamp()
+        if not is_down:
+            self._move_front_key(port)
+        return is_down
+
+    def get_port(self, port):
+        return self[port]
+
+    def del_port(self, port):
+        del self[port]
+        self._remove_key(port)
+
+    def __iter__(self):
+        root = self._root
+        curr = root[self._NEXT]
+        while curr is not root:
+            yield curr[self._KEY]
+            curr = curr[self._NEXT]
+
+    def clear(self):
+        for node in self._map.itervalues():
+            del node[:]
+        root = self._root
+        root[:] = [root, root, None]
+        self._map.clear()
+        dict.clear(self)
+
+    def items(self):
+        'od.items() -> list of (key, value) pairs in od'
+        return [(key, self[key]) for key in self]
+
+    def iteritems(self):
+        'od.iteritems -> an iterator over the (key, value) pairs in od'
+        for k in self:
+            yield (k, self[k])
+
+
+class LinkState(dict):
+    # dict: Link class -> timestamp
+    def __init__(self):
+        super(LinkState, self).__init__()
+        self._map = {}
+
+    def get_peer(self, src):
+        return self._map.get(src, None)
+
+    def update_link(self, src, dst):
+        link = Link(src, dst)
+
+        self[link] = time.time()
+        self._map[src] = dst
+
+        # return if the reverse link is also up or not
+        rev_link = Link(dst, src)
+        return rev_link in self
+
+    def link_down(self, link):
+        del self[link]
+        del self._map[link.src]
+
+    def rev_link_set_timestamp(self, rev_link, timestamp):
+        # rev_link may or may not be in LinkState
+        if rev_link in self:
+            self[rev_link] = timestamp
+
+    def port_deleted(self, src):
+        dst = self.get_peer(src)
+        if dst is None:
+            raise KeyError()
+
+        link = Link(src, dst)
+        rev_link = Link(dst, src)
+        del self[link]
+        del self._map[src]
+        # reverse link might not exist
+        self.pop(rev_link, None)
+        rev_link_dst = self._map.pop(dst, None)
+
+        return dst, rev_link_dst
+
+
+class LLDPPacket(object):
+    # make a LLDP packet for link discovery.
+
+    CHASSIS_ID_PREFIX = 'dpid:'
+    CHASSIS_ID_PREFIX_LEN = len(CHASSIS_ID_PREFIX)
+    CHASSIS_ID_FMT = CHASSIS_ID_PREFIX + '%s'
+
+    PORT_ID_STR = '!I'      # uint32_t
+    PORT_ID_SIZE = 4
+
+    class LLDPUnknownFormat(RyuException):
+        message = '%(msg)s'
+
+    @staticmethod
+    def lldp_packet(dpid, port_no, dl_addr, ttl):
+        pkt = packet.Packet()
+
+        dst = lldp.LLDP_MAC_NEAREST_BRIDGE
+        src = dl_addr
+        ethertype = ETH_TYPE_LLDP
+        eth_pkt = ethernet.ethernet(dst, src, ethertype)
+        pkt.add_protocol(eth_pkt)
+
+        tlv_chassis_id = lldp.ChassisID(
+            subtype=lldp.ChassisID.SUB_LOCALLY_ASSIGNED,
+            chassis_id=LLDPPacket.CHASSIS_ID_FMT %
+            dpid_to_str(dpid))
+
+        tlv_port_id = lldp.PortID(subtype=lldp.PortID.SUB_PORT_COMPONENT,
+                                  port_id=struct.pack(
+                                      LLDPPacket.PORT_ID_STR,
+                                      port_no))
+
+        tlv_ttl = lldp.TTL(ttl=ttl)
+        tlv_end = lldp.End()
+
+        tlvs = (tlv_chassis_id, tlv_port_id, tlv_ttl, tlv_end)
+        lldp_pkt = lldp.lldp(tlvs)
+        pkt.add_protocol(lldp_pkt)
+
+        pkt.serialize()
+        return pkt.data
+
+    @staticmethod
+    def lldp_parse(data):
+        # Return (src_dpid, src_port_no) from a frame built by lldp_packet().
+        pkt = packet.Packet(data)
+        eth_pkt = pkt.next()
+        assert type(eth_pkt) == ethernet.ethernet
+        lldp_pkt = pkt.next()
+        if type(lldp_pkt) != lldp.lldp:
+            raise LLDPPacket.LLDPUnknownFormat()
+        # first TLV: chassis id, expected as CHASSIS_ID_PREFIX + dpid string
+        tlv_chassis_id = lldp_pkt.tlvs[0]
+        if tlv_chassis_id.subtype != lldp.ChassisID.SUB_LOCALLY_ASSIGNED:
+            raise LLDPPacket.LLDPUnknownFormat(
+                msg='unknown chassis id subtype %d' % tlv_chassis_id.subtype)
+        chassis_id = tlv_chassis_id.chassis_id
+        if not chassis_id.startswith(LLDPPacket.CHASSIS_ID_PREFIX):
+            raise LLDPPacket.LLDPUnknownFormat(
+                msg='unknown chassis id format %s' % chassis_id)
+        src_dpid = str_to_dpid(chassis_id[LLDPPacket.CHASSIS_ID_PREFIX_LEN:])
+        # second TLV: port id, PORT_ID_SIZE raw bytes (so format it with %r)
+        tlv_port_id = lldp_pkt.tlvs[1]
+        if tlv_port_id.subtype != lldp.PortID.SUB_PORT_COMPONENT:
+            raise LLDPPacket.LLDPUnknownFormat(
+                msg='unknown port id subtype %d' % tlv_port_id.subtype)
+        port_id = tlv_port_id.port_id
+        if len(port_id) != LLDPPacket.PORT_ID_SIZE:
+            raise LLDPPacket.LLDPUnknownFormat(
+                msg='unknown port id %r' % port_id)
+        (src_port_no, ) = struct.unpack(LLDPPacket.PORT_ID_STR, port_id)
+
+        return src_dpid, src_port_no
+
+
 class Switches(app_manager.RyuApp):
     _EVENTS = [event.EventSwitchEnter, event.EventSwitchLeave,
                event.EventPortAdd, event.EventPortDelete,
-               event.EventPortModify]
+               event.EventPortModify,
+               event.EventLinkAdd, event.EventLinkDelete]
+
+    DEFAULT_TTL = 120  # unused. ignored.
+    LLDP_PACKET_LEN = len(LLDPPacket.lldp_packet(0, 0, DONTCARE, 0))
+
+    LLDP_SEND_GUARD = .05
+    LLDP_SEND_PERIOD_PER_PORT = .9
+    TIMEOUT_CHECK_PERIOD = 5.
+    LINK_TIMEOUT = TIMEOUT_CHECK_PERIOD * 2
+    LINK_LLDP_DROP = 5
 
     def __init__(self):
         super(Switches, self).__init__()
 
         self.name = 'switches'
-        self.dps = {}   # datapath_id => class Datapath
-        self.port_state = {}  # datapath_id => ports
+        self.dps = {}                 # datapath_id => Datapath class
+        self.port_state = {}          # datapath_id => ports
+        self.ports = PortDataState()  # Port class -> PortData class
+        self.links = LinkState()      # Link class -> timestamp
+        self.is_active = True
+
+        self.link_discovery = CONF.observe_links
+        if self.link_discovery:
+            self.install_flow = CONF.install_lldp_flow
+            self.explicit_drop = CONF.explicit_drop
+            self.lldp_event = gevent.event.Event()
+            self.link_event = gevent.event.Event()
+            self.threads.append(gevent.spawn_later(0, self.lldp_loop))
+            self.threads.append(gevent.spawn_later(0, self.link_loop))
+
+    def close(self):
+        self.is_active = False
+        if self.link_discovery:
+            self.lldp_event.set()
+            self.link_event.set()
+            gevent.joinall(self.threads)
 
     def _register(self, dp):
         assert dp.id is not None
@@ -133,11 +461,40 @@ class Switches(app_manager.RyuApp):
             del self.dps[dp.id]
             del self.port_state[dp.id]
 
-    def _get_switch(self, dp):
-        switch = Switch(dp)
-        for ofpport in self.port_state[dp.id].itervalues():
-            switch.add_port(ofpport)
-        return switch
+    def _get_switch(self, dpid):
+        if dpid in self.dps:
+            switch = Switch(self.dps[dpid])
+            for ofpport in self.port_state[dpid].itervalues():
+                switch.add_port(ofpport)
+            return switch
+
+    def _get_port(self, dpid, port_no):
+        switch = self._get_switch(dpid)
+        if switch:
+            for p in switch.ports:
+                if p.port_no == port_no:
+                    return p
+
+    def _port_added(self, port):
+        lldp_data = LLDPPacket.lldp_packet(
+            port.dpid, port.port_no, port.hw_addr, self.DEFAULT_TTL)
+        self.ports.add_port(port, lldp_data)
+        # LOG.debug('_port_added dpid=%s, port_no=%s, live=%s',
+        #           port.dpid, port.port_no, port.is_live())
+
+    def _link_down(self, port):
+        try:
+            dst, rev_link_dst = self.links.port_deleted(port)
+        except KeyError:
+            # LOG.debug('key error. src=%s, dst=%s',
+            #           port, self.links.get_peer(port))
+            return
+        link = Link(port, dst)
+        self.send_event_to_observers(event.EventLinkDelete(link))
+        if rev_link_dst:
+            rev_link = Link(dst, rev_link_dst)
+            self.send_event_to_observers(event.EventLinkDelete(rev_link))
+        self.ports.move_front(dst)
 
     @set_ev_cls(ofp_event.EventOFPStateChange,
                 [MAIN_DISPATCHER, DEAD_DISPATCHER])
@@ -148,19 +505,54 @@ class Switches(app_manager.RyuApp):
 
         if ev.state == MAIN_DISPATCHER:
             self._register(dp)
-            switch = self._get_switch(dp)
+            switch = self._get_switch(dp.id)
             LOG.debug('register %s', switch)
             self.send_event_to_observers(event.EventSwitchEnter(switch))
 
+            if not self.link_discovery:
+                return
+
+            if self.install_flow:
+                ofproto = dp.ofproto
+                ofproto_parser = dp.ofproto_parser
+
+                # TODO:XXX need other versions
+                if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
+                    rule = nx_match.ClsRule()
+                    rule.set_dl_dst(lldp.LLDP_MAC_NEAREST_BRIDGE)
+                    rule.set_dl_type(ETH_TYPE_LLDP)
+                    actions = [ofproto_parser.OFPActionOutput(
+                        ofproto.OFPP_CONTROLLER, self.LLDP_PACKET_LEN)]
+                    dp.send_flow_mod(
+                        rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
+                        idle_timeout=0, hard_timeout=0, actions=actions)
+                else:
+                    LOG.error('cannot install flow. unsupported version. %x',
+                              dp.ofproto.OFP_VERSION)
+
+            for port in switch.ports:
+                if not port.is_reserved():
+                    self._port_added(port)
+            self.lldp_event.set()
+
         elif ev.state == DEAD_DISPATCHER:
             # dp.id is None when datapath dies before handshake
             if dp.id is None:
                 return
-            switch = self._get_switch(dp)
+            switch = self._get_switch(dp.id)
             self._unregister(dp)
             LOG.debug('unregister %s', switch)
             self.send_event_to_observers(event.EventSwitchLeave(switch))
 
+            if not self.link_discovery:
+                return
+
+            for port in switch.ports:
+                if not port.is_reserved():
+                    self.ports.del_port(port)
+                    self._link_down(port)
+            self.lldp_event.set()
+
     @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)
     def port_status_handler(self, ev):
         msg = ev.msg
@@ -176,6 +568,14 @@ class Switches(app_manager.RyuApp):
             self.send_event_to_observers(
                 event.EventPortAdd(Port(dp.id, dp.ofproto, ofpport)))
 
+            if not self.link_discovery:
+                return
+
+            port = self._get_port(dp.id, ofpport.port_no)
+            if port and not port.is_reserved():
+                self._port_added(port)
+                self.lldp_event.set()
+
         elif reason == dp.ofproto.OFPPR_DELETE:
             #LOG.debug('A port was deleted.' +
             #          '(datapath id = %s, port number = %s)',
@@ -184,6 +584,15 @@ class Switches(app_manager.RyuApp):
             self.send_event_to_observers(
                 event.EventPortDelete(Port(dp.id, dp.ofproto, ofpport)))
 
+            if not self.link_discovery:
+                return
+
+            port = self._get_port(dp.id, ofpport.port_no)
+            if port and not port.is_reserved():
+                self.ports.del_port(port)
+                self._link_down(port)
+                self.lldp_event.set()
+
         else:
             assert reason == dp.ofproto.OFPPR_MODIFY
             #LOG.debug('A port was modified.' +
@@ -193,18 +602,184 @@ class Switches(app_manager.RyuApp):
             self.send_event_to_observers(
                 event.EventPortModify(Port(dp.id, dp.ofproto, ofpport)))
 
+            if not self.link_discovery:
+                return
+
+            port = self._get_port(dp.id, ofpport.port_no)
+            if port and not port.is_reserved():
+                if self.ports.set_down(port):
+                    self._link_down(port)
+                self.lldp_event.set()
+
+    @staticmethod
+    def _drop_packet(msg):
+        # Discard a buffered packet-in: packet-out with an empty action list.
+        if msg.buffer_id == 0xffffffff:
+            return  # nothing buffered. TODO:use constant instead of -1
+        dp = msg.datapath
+        # TODO:XXX support OpenFlow versions other than 1.0
+        if dp.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
+            dp.send_packet_out(msg.buffer_id, msg.in_port, [])
+        else:
+            LOG.error('cannot drop_packet. unsupported version. %x',
+                      dp.ofproto.OFP_VERSION)
+
+    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
+    def packet_in_handler(self, ev):
+        if not self.link_discovery:
+            return
+
+        msg = ev.msg
+        try:
+            src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data)
+        except LLDPPacket.LLDPUnknownFormat as e:
+            # This handler receives every packet-in, including non-LLDP
+            # packets. Ignore those silently.
+            return
+        else:
+            dst_dpid = msg.datapath.id
+            dst_port_no = msg.in_port
+
+            src = self._get_port(src_dpid, src_port_no)
+            if not src or src.dpid == dst_dpid:
+                return
+
+            dst = self._get_port(dst_dpid, dst_port_no)
+            if not dst:
+                return
+
+            old_peer = self.links.get_peer(src)
+            # LOG.debug("Packet-In")
+            # LOG.debug("  src=%s", src)
+            # LOG.debug("  dst=%s", dst)
+            # LOG.debug("  old_peer=%s", old_peer)
+            if old_peer and old_peer != dst:
+                old_link = Link(src, old_peer)
+                self.send_event_to_observers(event.EventLinkDelete(old_link))
+
+            link = Link(src, dst)
+            if not link in self.links:
+                self.send_event_to_observers(event.EventLinkAdd(link))
+
+            if not self.links.update_link(src, dst):
+                # reverse link is not detected yet.
+                # So schedule the check early because it's very likely it's up
+                try:
+                    self.ports.lldp_received(dst)
+                except KeyError as e:
+                    # There are races between EventOFPPacketIn and
+                    # EventDPPortAdd. So a packet-in event can happen before
+                    # the port add event. In that case KeyError can be raised.
+                    # LOG.debug('lldp_received: KeyError %s', e)
+                    pass
+                else:
+                    self.ports.move_front(dst)
+                    self.lldp_event.set()
+            if self.explicit_drop:
+                self._drop_packet(msg)
+
+    def send_lldp_packet(self, port):
+        try:
+            port_data = self.ports.lldp_sent(port)
+        except KeyError as e:
+            # ports can be modified during our sleep in self.lldp_loop()
+            # LOG.debug('send_lldp: KeyError %s', e)
+            return
+        if port_data.is_down:
+            return
+
+        dp = self.dps.get(port.dpid, None)
+        if dp is None:
+            # datapath was already deleted
+            return
+
+        # LOG.debug('lldp sent dpid=%s, port_no=%d', dp.id, port.port_no)
+        # TODO:XXX
+        if dp.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
+            actions = [dp.ofproto_parser.OFPActionOutput(port.port_no)]
+            dp.send_packet_out(actions=actions, data=port_data.lldp_data)
+        else:
+            LOG.error('cannot send lldp packet. unsupported version. %x',
+                      dp.ofproto.OFP_VERSION)
+
+    def lldp_loop(self):
+        while self.is_active:
+            self.lldp_event.clear()
+
+            now = time.time()
+            timeout = None
+            ports_now = []
+            ports = []
+            for (key, data) in self.ports.items():
+                if data.timestamp is None:
+                    ports_now.append(key)
+                    continue
+
+                expire = data.timestamp + self.LLDP_SEND_PERIOD_PER_PORT
+                if expire <= now:
+                    ports.append(key)
+                    continue
+
+                timeout = expire - now
+                break
+
+            for port in ports_now:
+                self.send_lldp_packet(port)
+            for port in ports:
+                self.send_lldp_packet(port)
+                gevent.sleep(self.LLDP_SEND_GUARD)      # don't burst
+
+            if timeout is not None and ports:
+                timeout = 0     # We have already slept
+            # LOG.debug('lldp sleep %s', timeout)
+            self.lldp_event.wait(timeout=timeout)
+
+    def link_loop(self):
+        while self.is_active:
+            self.link_event.clear()
+
+            now = time.time()
+            deleted = []
+            for (link, timestamp) in self.links.items():
+                # LOG.debug('%s timestamp %d (now %d)', link, timestamp, now)
+                if timestamp + self.LINK_TIMEOUT < now:
+                    src = link.src
+                    if src in self.ports:
+                        port_data = self.ports.get_port(src)
+                        # LOG.debug('port_data %s', port_data)
+                        if port_data.lldp_dropped() > self.LINK_LLDP_DROP:
+                            deleted.append(link)
+
+            for link in deleted:
+                self.links.link_down(link)
+                # LOG.debug('delete %s', link)
+                self.send_event_to_observers(event.EventLinkDelete(link))
+
+                dst = link.dst
+                rev_link = Link(dst, link.src)
+                if rev_link not in deleted:
+                    # It is very likely that the reverse link is also
+                    # disconnected. Check it early.
+                    expire = now - self.LINK_TIMEOUT
+                    self.links.rev_link_set_timestamp(rev_link, expire)
+                    if dst in self.ports:
+                        self.ports.move_front(dst)
+                        self.lldp_event.set()
+
+            self.link_event.wait(timeout=self.TIMEOUT_CHECK_PERIOD)
+
     @set_ev_cls(event.EventSwitchRequest)
     def switch_request_handler(self, req):
-        LOG.debug(req)
+        # LOG.debug(req)
         dpid = req.dpid
 
         switches = []
         if dpid is None:
             # reply all list
             for dp in self.dps.itervalues():
-                switches.append(self._get_switch(dp))
+                switches.append(self._get_switch(dp.id))
         elif dpid in self.dps:
-            switches.append(self._get_switch(self.dps[dpid]))
+            switches.append(self._get_switch(dpid))
 
         rep = event.EventSwitchReply(req.src, switches)
         if req.sync:
@@ -212,10 +787,33 @@ class Switches(app_manager.RyuApp):
         else:
             self.send_event(req.src, rep)
 
+    @set_ev_cls(event.EventLinkRequest)
+    def link_request_handler(self, req):
+        # LOG.debug(req)
+        dpid = req.dpid
+
+        if dpid is None:
+            links = self.links
+        else:
+            links = [link for link in self.links if link.src.dpid == dpid]
+        rep = event.EventLinkReply(req.src, dpid, links)
+        if req.sync:
+            self.send_reply(rep)
+        else:
+            self.send_event(req.src, rep)
+
 
-def get(app, dpid=None):
+def get_switch(app, dpid=None):
     return app.send_request(event.EventSwitchRequest(dpid))
 
 
-def get_all(app):
-    return get(app)
+def get_all_switch(app):
+    return get_switch(app)
+
+
+def get_link(app, dpid=None):
+    return app.send_request(event.EventLinkRequest(dpid))
+
+
+def get_all_link(app):
+    return get_link(app)
-- 
1.7.9.5



------------------------------------------------------------------------------
Everyone hates slow websites. So do we.
Make your web apps faster with AppDynamics
Download AppDynamics Lite for free today:
http://p.sf.net/sfu/appdyn_d2d_mar
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to