From 1f650bbe5d0f57c8d1f4f85ae15ae5c8b7f98fea Mon Sep 17 00:00:00 2001
From: "Victor J. Orlikowski" <vjo@duke.edu>
Date: Fri, 26 Feb 2016 11:42:46 -0500
Subject: [PATCH] Clean up socket close() handling still more. Also,
 temporarily work around a bug in eventlet's Queue.put() by wrapping the
 send_q with a semaphore.

---
 ryu/controller/controller.py | 62 ++++++++++++++++++++++++++++++--------------
 1 file changed, 42 insertions(+), 20 deletions(-)

diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
index 25b8776..da57a8c 100644
--- a/ryu/controller/controller.py
+++ b/ryu/controller/controller.py
@@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer
 import traceback
 import random
 import ssl
-from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error as SocketError
+from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as SocketTimeout
 import warnings
 
 import ryu.base.app_manager
@@ -41,8 +41,8 @@ from ryu.ofproto import ofproto_protocol
 from ryu.ofproto import ofproto_v1_0
 from ryu.ofproto import nx_match
 
-from ryu.controller import handler
 from ryu.controller import ofp_event
+from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
 
 from ryu.lib.dpid import dpid_to_str
 
@@ -103,8 +103,15 @@ def _deactivate(method):
         try:
             method(self)
         finally:
-            self.send_active = False
-            self.set_state(handler.DEAD_DISPATCHER)
+            try:
+                self.socket.shutdown(SHUT_RDWR)
+            except (EOFError, IOError):
+                pass
+
+            if not self.is_active:
+                self.socket.close()
+            if self.state is not DEAD_DISPATCHER:
+                self.set_state(DEAD_DISPATCHER)
     return deactivate
 
 
@@ -117,19 +124,20 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         self.socket.settimeout(CONF.socket_timeout)
         self.address = address
 
-        self.send_active = True
+        self.is_active = True
         self.close_requested = False
 
         # The limit is arbitrary. We need to limit queue size to
         # prevent it from eating memory up
         self.send_q = hub.Queue(16)
+        self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)
 
         self.xid = random.randint(0, self.ofproto.MAX_XID)
         self.id = None  # datapath_id is unknown yet
         self._ports = None
         self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
         self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
-        self.set_state(handler.HANDSHAKE_DISPATCHER)
+        self.set_state(HANDSHAKE_DISPATCHER)
 
     def _get_ports(self):
         if (self.ofproto_parser is not None and
@@ -167,19 +175,17 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         required_len = ofproto_common.OFP_HEADER_SIZE
 
         count = 0
-        while True:
+        while not self.close_requested:
             ret = ""
 
             try:
                 ret = self.socket.recv(required_len)
             except SocketTimeout:
-                if not self.close_requested:
-                    continue
-            except SocketError:
-                self.close_requested = True
+                continue
+            except (EOFError, IOError):
+                break
 
-            if (len(ret) == 0) or (self.close_requested):
-                self.socket.close()
+            if len(ret) == 0:
                 break
 
             buf += ret
@@ -215,30 +221,45 @@ class Datapath(ofproto_protocol.ProtocolDesc):
                     count = 0
                     hub.sleep(0)
 
-    @_deactivate
     def _send_loop(self):
         try:
-            while self.send_active:
+            while True:
                 buf = self.send_q.get()
+                self._send_q_sem.release()
                 self.socket.sendall(buf)
+        except SocketTimeout:
+            LOG.debug("Socket timed out while sending data to switch at address %s",
+                      self.address)
         except IOError as ioe:
-            LOG.debug("Socket error while sending data to switch at address %s: [%d] %s",
-                      self.address, ioe.errno, ioe.strerror)
+            # Convert ioe.errno to a string, just in case it was somehow set to None.
+            errno = "%s" % ioe.errno
+            LOG.debug("Socket error while sending data to switch at address %s: [%s] %s",
+                      self.address, errno, ioe.strerror)
         finally:
             q = self.send_q
             # first, clear self.send_q to prevent new references.
             self.send_q = None
-            # there might be threads currently blocking in send_q.put().
-            # unblock them by draining the queue.
+            # Now, drain the send_q, releasing the associated semaphore for each entry.
+            # This should release all threads waiting to acquire the semaphore.
             try:
                 while q.get(block=False):
-                    pass
+                    self._send_q_sem.release()
             except hub.QueueEmpty:
                 pass
+            # Finally, ensure the _recv_loop terminates.
+            self.close()
 
     def send(self, buf):
+        msg_enqueued = False
+        self._send_q_sem.acquire()
         if self.send_q:
             self.send_q.put(buf)
+            msg_enqueued = True
+        else:
+            self._send_q_sem.release()
+        if not msg_enqueued:
+            LOG.debug('Datapath in process of terminating; send() to %s discarded.',
+                      self.address)
 
     def set_xid(self, msg):
         self.xid += 1
@@ -266,6 +287,7 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         finally:
             hub.kill(send_thr)
             hub.joinall([send_thr])
+            self.is_active = False
 
     #
     # Utility methods for convenience
-- 
2.5.4 (Apple Git-61)

