Signed-off-by: IWASE Yusuke <iwase.yusu...@gmail.com>
---
 ryu/services/protocols/bgp/net_ctrl.py | 36 ++++++++++++++++++++++------------
 1 file changed, 24 insertions(+), 12 deletions(-)

diff --git a/ryu/services/protocols/bgp/net_ctrl.py b/ryu/services/protocols/bgp/net_ctrl.py
index 400032c..972d0b9 100644
--- a/ryu/services/protocols/bgp/net_ctrl.py
+++ b/ryu/services/protocols/bgp/net_ctrl.py
@@ -94,19 +94,21 @@ class RpcSession(Activity):
     and utilities that use these. It also cares about socket communication w/
     RPC peer.
     """
+    NAME_FMT = 'RpcSession%s'
 
     def __init__(self, sock, outgoing_msg_sink_iter):
-        super(RpcSession, self).__init__("RpcSession(%s)" % sock)
+        self.peer_name = str(sock.getpeername())
+        super(RpcSession, self).__init__(self.NAME_FMT % self.peer_name)
         self._packer = msgpack.Packer(encoding='utf-8')
         self._unpacker = msgpack.Unpacker(encoding='utf-8')
         self._next_msgid = 0
         self._socket = sock
         self._outgoing_msg_sink_iter = outgoing_msg_sink_iter
-        self.peer_name = str(self._socket.getpeername())
         self.is_connected = True
 
     def stop(self):
         super(RpcSession, self).stop()
+        self.is_connected = False
         LOG.info('RPC Session to %s stopped', self.peer_name)
 
     def _run(self):
@@ -330,7 +332,8 @@ class _NetworkController(FlexinetPeer, Activity):
         # Outstanding requests, i.e. requests for which we are yet to receive
         # response from peer. We currently do not have any requests going out.
         self._outstanding_reqs = {}
-        self._rpc_session = None
+        # Dictionary mapping each peer name to its RPC session.
+        self._rpc_sessions = {}
 
     def _run(self, *args, **kwargs):
         """Runs RPC server.
@@ -352,19 +355,28 @@ class _NetworkController(FlexinetPeer, Activity):
     def _start_rpc_session(self, sock):
         """Starts a new RPC session with given connection.
         """
-        if self._rpc_session and self._rpc_session.started:
-            self._rpc_session.stop()
+        session_name = RpcSession.NAME_FMT % str(sock.getpeername())
+        self._stop_child_activities(session_name)
 
-        self._rpc_session = RpcSession(sock, self)
-        self._rpc_session.start()
+        rpc_session = RpcSession(sock, self)
+        self._spawn_activity(rpc_session)
+
+    def _send_rpc_notification_to_session(self, session, method, params):
+        if not session.is_connected:
+            # Stops disconnected RPC session.
+            self._stop_child_activities(session.name)
+            return
+
+        return session.send_notification(method, params)
 
     def send_rpc_notification(self, method, params):
-        if not self.started or self._rpc_session is None:
+        if not self.started:
             return
-        elif not self._rpc_session.is_connected:
-            self._rpc_session = None
-        elif self._rpc_session.started:
-            return self._rpc_session.send_notification(method, params)
+
+        for session in list(self._child_activity_map.values()):
+            if not isinstance(session, RpcSession):
+                continue
+            self._send_rpc_notification_to_session(session, method, params)
 
 
 def _handle_response(response):
-- 
2.7.4


------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most 
engaging tech sites, SlashDot.org! http://sdm.link/slashdot
_______________________________________________
Ryu-devel mailing list
Ryu-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to