Hi everyone,
Attached is the "final" patch addressing the following issues:

1) Reloading the config for an fcgi process group did not close the fcgi
socket.  Now the socket is closed whenever the group is stopped as a unit
(including during a config update).  However, if you stop all the processes in
a group individually, the socket will remain open to allow for graceful
restarts of FCGI daemons.
2) Rereading the config did not pick up changes to the socket parameter in an
fcgi-program section.  This was a simple fix requiring a little
customization of the __eq__() method.
3) Made the exception message friendlier when an FCGI socket cannot be
created.

Let me know if you find any issues.  Thanks,

Roger

On Fri, Aug 28, 2009 at 6:38 AM, Marco Vittorini Orgeas <ma...@mavior.eu> wrote:

> All right,
>
> just drop a mail when the final patch is out.
> thanks again,
>
> --
> Marco
>
> On Thu, August 27, 2009 9:07 pm, Roger Hoover wrote:
> > Hi Marco,
> >
> > My original patch is functional and tested, just not as ideal as I want.
> > I got most of the way toward a final patch before getting swamped with a
> > hard deadline at work.  I'll probably be able to get it finished early next
> > week.
> >
> > Roger
> >
> > On Thu, Aug 27, 2009 at 7:24 AM, Marco Vittorini Orgeas
> > <ma...@mavior.eu>wrote:
> >
> >> Hi Roger,
> >> did you end up with a final patch for this or the good one is the one
> >> you
> >> already have attached?
> >> If you have any new, can you please post it here or write the revision
> >> number in
> >> which is contained, I'd like to test against latest stable.
> >>
> >>
> >> Please keep me CC'ed,
> >>
> >> thank you
> >>
> >> Marco
> >>
> >> _______________________________________________
> >> Supervisor-users mailing list
> >> Supervisor-users@lists.supervisord.org
> >> http://lists.supervisord.org/mailman/listinfo/supervisor-users
> >>
> >
>
>
>
Index: src/supervisor/options.py
===================================================================
--- src/supervisor/options.py   (revision 881)
+++ src/supervisor/options.py   (working copy)
@@ -59,8 +59,6 @@
 from supervisor.datatypes import profile_options
 from supervisor.datatypes import set_here
 
-from supervisor.socket_manager import SocketManager
-
 from supervisor import loggers
 from supervisor import states
 from supervisor import xmlrpc
@@ -664,7 +662,7 @@
                                                   FastCGIProcessConfig)
             groups.append(
                 FastCGIGroupConfig(self, program_name, priority, processes,
-                                   SocketManager(socket_config))
+                                   socket_config)
                 )
         
 
@@ -681,8 +679,7 @@
             path = normalize_path(path)
             return UnixStreamSocketConfig(path)
         
-        tcp_re = re.compile(r'^tcp://([^\s:]+):(\d+)$')
-        m = tcp_re.match(sock)
+        m = re.match(r'tcp://([^\s:]+):(\d+)$', sock)
         if m:
             host = m.group(1)
             port = int(m.group(2))
@@ -1557,6 +1554,7 @@
         return dispatchers, p
 
 class FastCGIProcessConfig(ProcessConfig):
+    
     def make_process(self, group=None):
         if group is None:
             raise NotImplementedError('FastCGI programs require a group')
@@ -1627,16 +1625,25 @@
 
 class FastCGIGroupConfig(ProcessGroupConfig):        
     def __init__(self, options, name, priority, process_configs,
-                 socket_manager):
+                 socket_config):
         self.options = options
         self.name = name
         self.priority = priority
         self.process_configs = process_configs
-        self.socket_manager = socket_manager
+        self.socket_config = socket_config
 
-    def after_setuid(self):
-        ProcessGroupConfig.after_setuid(self)
-        self.socket_manager.prepare_socket()
+    def __eq__(self, other):
+        if not isinstance(other, FastCGIGroupConfig):
+            return False
+        
+        if self.socket_config != other.socket_config:
+            return False
+            
+        return ProcessGroupConfig.__eq__(self, other)
+        
+    def make_group(self):
+        from supervisor.process import FastCGIProcessGroup
+        return FastCGIProcessGroup(self)        
     
 def readFile(filename, offset, length):
     """ Read length bytes from the file named by filename starting at
Index: src/supervisor/datatypes.py
===================================================================
--- src/supervisor/datatypes.py (revision 881)
+++ src/supervisor/datatypes.py (working copy)
@@ -184,7 +184,22 @@
         return '<%s at %s for %s>' % (self.__class__,
                                       id(self),
                                       self.url)
+                                      
+    def __str__(self):
+        return str(self.url)
+        
+    def __eq__(self, other):
+        if not isinstance(other, SocketConfig):
+            return False
 
+        if self.url != other.url:
+            return False
+
+        return True
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
     def addr(self):
         raise NotImplementedError
         
Index: src/supervisor/rpcinterface.py
===================================================================
--- src/supervisor/rpcinterface.py      (revision 881)
+++ src/supervisor/rpcinterface.py      (working copy)
@@ -427,6 +427,8 @@
         if group is None:
             raise RPCError(Faults.BAD_NAME, name)
 
+        group.stop_requested()
+
         processes = group.processes.values()
         processes.sort()
         processes = [ (group, process) for process in processes ]
Index: src/supervisor/tests/test_options.py
===================================================================
--- src/supervisor/tests/test_options.py        (revision 881)
+++ src/supervisor/tests/test_options.py        (working copy)
@@ -15,7 +15,7 @@
 from supervisor.tests.base import DummyPConfig
 from supervisor.tests.base import DummyPGroupConfig
 from supervisor.tests.base import DummyProcess
-from supervisor.tests.base import DummySocketManager
+from supervisor.tests.base import DummySocketConfig
 from supervisor.tests.base import lstrip
 
 class OptionTests(unittest.TestCase):
@@ -806,7 +806,7 @@
         self.assertEqual(gconfig0.__class__, FastCGIGroupConfig)
         self.assertEqual(gconfig0.name, 'foo')
         self.assertEqual(gconfig0.priority, 1)
-        self.assertEqual(gconfig0.socket_manager.config().url,
+        self.assertEqual(gconfig0.socket_config.url,
                          'unix:///tmp/foo.sock')
         self.assertEqual(len(gconfig0.process_configs), 2)
         self.assertEqual(gconfig0.process_configs[0].__class__,
@@ -817,7 +817,7 @@
         gconfig1 = gconfigs[1]
         self.assertEqual(gconfig1.name, 'bar')
         self.assertEqual(gconfig1.priority, 999)
-        self.assertEqual(gconfig1.socket_manager.config().url,
+        self.assertEqual(gconfig1.socket_config.url,
                          'tcp://localhost:6000')
         self.assertEqual(len(gconfig1.process_configs), 3)
 
@@ -1300,23 +1300,36 @@
 
     def test_ctor(self):
         options = DummyOptions()
-        sock_manager = DummySocketManager(6)
-        instance = self._makeOne(options, 'whatever', 999, [], sock_manager)
+        sock_config = DummySocketConfig(6)
+        instance = self._makeOne(options, 'whatever', 999, [], sock_config)
         self.assertEqual(instance.options, options)
         self.assertEqual(instance.name, 'whatever')
         self.assertEqual(instance.priority, 999)
         self.assertEqual(instance.process_configs, [])
-        self.assertEqual(instance.socket_manager, sock_manager)
+        self.assertEqual(instance.socket_config, sock_config)
     
-    def test_after_setuid(self):
+    def test_same_sockets_are_equal(self):
         options = DummyOptions()
-        sock_manager = DummySocketManager(6)
-        pconfigs = [DummyPConfig(options, 'process1', '/bin/process1')]
-        instance = self._makeOne(options, 'whatever', 999, pconfigs, sock_manager)
-        instance.after_setuid()
-        self.assertTrue(pconfigs[0].autochildlogs_created)
-        self.assertTrue(instance.socket_manager.prepare_socket_called)
+        sock_config1 = DummySocketConfig(6)
+        instance1 = self._makeOne(options, 'whatever', 999, [], sock_config1)
 
+        sock_config2 = DummySocketConfig(6)
+        instance2 = self._makeOne(options, 'whatever', 999, [], sock_config2)
+
+        self.assertTrue(instance1 == instance2)
+        self.assertFalse(instance1 != instance2)
+
+    def test_diff_sockets_are_not_equal(self):
+        options = DummyOptions()
+        sock_config1 = DummySocketConfig(6)
+        instance1 = self._makeOne(options, 'whatever', 999, [], sock_config1)
+
+        sock_config2 = DummySocketConfig(7)
+        instance2 = self._makeOne(options, 'whatever', 999, [], sock_config2)
+
+        self.assertTrue(instance1 != instance2)
+        self.assertFalse(instance1 == instance2)
+
 class UtilFunctionsTests(unittest.TestCase):
     def test_make_namespec(self):
         from supervisor.options import make_namespec
Index: src/supervisor/tests/test_datatypes.py
===================================================================
--- src/supervisor/tests/test_datatypes.py      (revision 881)
+++ src/supervisor/tests/test_datatypes.py      (working copy)
@@ -183,6 +183,25 @@
         self.assertTrue(reuse)
         sock.close
         
+    def test_same_urls_are_equal(self):
+        conf1 = self._makeOne('localhost', '5001')
+        conf2 = self._makeOne('localhost', '5001')
+        self.assertTrue(conf1 == conf2)
+        self.assertFalse(conf1 != conf2)
+
+    def test_diff_urls_are_not_equal(self):
+        conf1 = self._makeOne('localhost', '5001')
+        conf2 = self._makeOne('localhost', '5002')
+        self.assertTrue(conf1 != conf2)
+        self.assertFalse(conf1 == conf2)
+
+    def test_diff_objs_are_not_equal(self):
+        conf1 = self._makeOne('localhost', '5001')
+        conf2 = 'blah'
+        self.assertTrue(conf1 != conf2)
+        self.assertFalse(conf1 == conf2)    
+        
+        
 class UnixStreamSocketConfigTests(unittest.TestCase):
     def _getTargetClass(self):
         return datatypes.UnixStreamSocketConfig
@@ -213,6 +232,24 @@
         sock = conf.create()
         self.assertFalse(os.path.exists(tf_name))
         sock.close
+        
+    def test_same_paths_are_equal(self):
+        conf1 = self._makeOne('/tmp/foo.sock')
+        conf2 = self._makeOne('/tmp/foo.sock')
+        self.assertTrue(conf1 == conf2)
+        self.assertFalse(conf1 != conf2)
+        
+    def test_diff_paths_are_not_equal(self):
+        conf1 = self._makeOne('/tmp/foo.sock')
+        conf2 = self._makeOne('/tmp/bar.sock')
+        self.assertTrue(conf1 != conf2)
+        self.assertFalse(conf1 == conf2)
+        
+    def test_diff_objs_are_not_equal(self):
+        conf1 = self._makeOne('/tmp/foo.sock')
+        conf2 = 'blah'
+        self.assertTrue(conf1 != conf2)
+        self.assertFalse(conf1 == conf2)    
 
 def test_suite():
     return unittest.findTestCases(sys.modules[__name__])
Index: src/supervisor/tests/base.py
===================================================================
--- src/supervisor/tests/base.py        (revision 881)
+++ src/supervisor/tests/base.py        (working copy)
@@ -293,6 +293,63 @@
     def get_state(self):
         return self.options.mood
 
+class DummySocket:
+    bind_called = False
+    bind_addr = None
+    listen_called = False
+    listen_backlog = None
+    close_called = False
+
+    def __init__(self, fd):
+        self.fd = fd
+
+    def fileno(self):
+        return self.fd
+
+    def bind(self, addr):
+        self.bind_called = True
+        self.bind_addr = addr
+
+    def listen(self, backlog):
+        self.listen_called = True
+        self.listen_backlog = backlog
+
+    def close(self):
+        self.close_called = True
+
+    def __str__(self):
+        return 'dummy socket'
+
+class DummySocketConfig:
+    def __init__(self, fd):
+        self.fd = fd
+
+    def addr(self):
+        return 'dummy addr'
+
+    def __eq__(self, other):
+        return self.fd == other.fd
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def create(self):
+        return DummySocket(self.fd)
+
+class DummySocketManager:
+    def __init__(self, config, **kwargs):
+        self._config = config
+        self.request_close_called = False
+
+    def config(self):
+        return self._config
+
+    def get_socket(self):
+        return DummySocket(self._config.fd)
+        
+    def request_close(self):
+        self.request_close_called = True
+
 class DummyProcess:
     # Initial state; overridden by instance variables
     pid = 0 # Subprocess pid; 0 when not running
@@ -861,9 +918,9 @@
                                                  self.name)
 
 class DummyFCGIGroupConfig(DummyPGroupConfig):
-    def __init__(self, options, name, priority, pconfigs, socket_manager):
+    def __init__(self, options, name='whatever', priority=999, pconfigs=None, socket_config=DummySocketConfig(1)):
         DummyPGroupConfig.__init__(self, options, name, priority, pconfigs)
-        self.socket_manager = socket_manager
+        self.socket_config = socket_config
 
 class DummyProcessGroup:
     def __init__(self, config):
@@ -872,18 +929,28 @@
         self.all_stopped = False
         self.dispatchers = {}
         self.unstopped_processes = []
+        self.stop_was_requested = False
 
     def transition(self):
         self.transitioned = True
 
     def stop_all(self):
         self.all_stopped = True
+        
+    def stop_requested(self):
+        self.stop_was_requested = True
 
     def get_unstopped_processes(self):
         return self.unstopped_processes
 
     def get_dispatchers(self):
         return self.dispatchers
+        
+class DummyFCGIProcessGroup(DummyProcessGroup):
+    
+    def __init__(self, config):
+        DummyProcessGroup.__init__(self, config)
+        self.socket_manager = DummySocketManager(config.socket_config)
 
 class PopulatedDummySupervisor(DummySupervisor):
     def __init__(self, options, group_name, *pconfigs):
@@ -977,55 +1044,7 @@
 
     def __str__(self):
         return 'dummy event'
-
-class DummySocket:
-    bind_called = False
-    bind_addr = None
-    listen_called = False
-    listen_backlog = None
-    close_called = False
-    
-    def __init__(self, fd):
-        self.fd = fd
         
-    def fileno(self):
-        return self.fd
-
-    def bind(self, addr):
-        self.bind_called = True
-        self.bind_addr = addr
-        
-    def listen(self, backlog):
-        self.listen_called = True
-        self.listen_backlog = backlog
-        
-    def close(self):
-        self.close_called = True
-
-    def __str__(self):
-        return 'dummy socket'
-
-class DummySocketConfig:
-    def __init__(self, fd):
-        self.fd = fd
-    
-    def addr(self):
-        return 'dummy addr'
-        
-    def create(self):
-        return DummySocket(self.fd)
-
-class DummySocketManager:
-    def __init__(self, sock_fd):
-        self.sock_fd = sock_fd
-        self.prepare_socket_called = False
-    
-    def prepare_socket(self):
-        self.prepare_socket_called = True
-        
-    def get_socket(self):
-        return DummySocket(self.sock_fd)
-        
 def dummy_handler(event, result):
     pass
 
Index: src/supervisor/tests/test_process.py
===================================================================
--- src/supervisor/tests/test_process.py        (revision 881)
+++ src/supervisor/tests/test_process.py        (working copy)
@@ -12,8 +12,10 @@
 from supervisor.tests.base import DummyDispatcher
 from supervisor.tests.base import DummyEvent
 from supervisor.tests.base import DummyFCGIGroupConfig
+from supervisor.tests.base import DummySocketConfig
+from supervisor.tests.base import DummyProcessGroup
+from supervisor.tests.base import DummyFCGIProcessGroup
 from supervisor.tests.base import DummySocketManager
-from supervisor.tests.base import DummyProcessGroup
 
 class SubprocessTests(unittest.TestCase):
     def _getTargetClass(self):
@@ -1185,10 +1187,10 @@
         options.forkpid = 0
         config = DummyPConfig(options, 'good', '/good/filename', uid=1)
         instance = self._makeOne(config)
-        sock_manager = DummySocketManager(7)
+        sock_config = DummySocketConfig(7)
         gconfig = DummyFCGIGroupConfig(options, 'whatever', 999, None, 
-                                       sock_manager)
-        instance.group = DummyProcessGroup(gconfig)
+                                       sock_config)
+        instance.group = DummyFCGIProcessGroup(gconfig)
         result = instance.spawn()
         self.assertEqual(result, None)
         self.assertEqual(len(options.duped), 3)
@@ -1203,15 +1205,35 @@
         config = DummyPConfig(options, 'good', '/good/filename', uid=1)
         config.redirect_stderr = True
         instance = self._makeOne(config)
-        sock_manager = DummySocketManager(13)
+        sock_config = DummySocketConfig(13)
         gconfig = DummyFCGIGroupConfig(options, 'whatever', 999, None, 
-                                       sock_manager)
-        instance.group = DummyProcessGroup(gconfig)
+                                       sock_config)
+        instance.group = DummyFCGIProcessGroup(gconfig)
         result = instance.spawn()
         self.assertEqual(result, None)
         self.assertEqual(len(options.duped), 2)
         self.assertEqual(options.duped[13], 0)
         self.assertEqual(len(options.fds_closed), options.minfds - 3)
+        
+    def test_before_spawn_gets_socket_ref(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'good', '/good/filename', uid=1)
+        instance = self._makeOne(config)
+        sock_config = DummySocketConfig(7)
+        gconfig = DummyFCGIGroupConfig(options, 'whatever', 999, None, 
+                                       sock_config)
+        instance.group = DummyFCGIProcessGroup(gconfig)
+        self.assertTrue(instance.fcgi_sock is None)
+        instance.before_spawn()
+        self.assertFalse(instance.fcgi_sock is None)
+        
+    def test_after_finish_removes_socket_ref(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'good', '/good/filename', uid=1)
+        instance = self._makeOne(config)
+        instance.fcgi_sock = 'hello'
+        instance.after_finish()
+        self.assertTrue(instance.fcgi_sock is None)
 
 class ProcessGroupBaseTests(unittest.TestCase):
     def _getTargetClass(self):
@@ -1340,6 +1362,21 @@
         group.transition()
         self.assertEqual(process1.transitioned, True)
         
+class FastCGIProcessGroupTests(unittest.TestCase):
+    def _getTargetClass(self):
+        from supervisor.process import FastCGIProcessGroup
+        return FastCGIProcessGroup
+        
+    def _makeOne(self, *args, **kw):
+        return self._getTargetClass()(*args, **kw)
+        
+    def test_stop_requested_signals_socket_close(self):
+        options = DummyOptions()
+        gconfig = DummyFCGIGroupConfig(options)
+        group = self._makeOne(gconfig, socketManager=DummySocketManager)
+        group.stop_requested()
+        self.assertTrue(group.socket_manager.request_close_called)
+        
 class EventListenerPoolTests(ProcessGroupBaseTests):
     def setUp(self):
         from supervisor.events import clear
Index: src/supervisor/tests/test_socket_manager.py
===================================================================
--- src/supervisor/tests/test_socket_manager.py (revision 881)
+++ src/supervisor/tests/test_socket_manager.py (working copy)
@@ -10,6 +10,80 @@
 from supervisor.datatypes import UnixStreamSocketConfig
 from supervisor.datatypes import InetStreamSocketConfig
 
+class TestObject():
+    
+    def __init__(self):
+        self.value = 5
+    
+    def getValue(self):
+        return self.value
+        
+    def setValue(self, val):
+        self.value = val
+
+class ProxyTest(unittest.TestCase):
+    
+    def setUp(self):
+        self.on_deleteCalled = False
+    
+    def _getTargetClass(self):
+        from supervisor.socket_manager import Proxy
+        return Proxy
+
+    def _makeOne(self, *args, **kw):
+        return self._getTargetClass()(*args, **kw)
+    
+    def setOnDeleteCalled(self):
+        self.on_deleteCalled = True
+    
+    def test_proxy_getattr(self):
+        proxy = self._makeOne(TestObject())
+        self.assertEquals(5, proxy.getValue())
+        
+    def test_on_delete(self):
+        proxy = self._makeOne(TestObject(), on_delete=self.setOnDeleteCalled)
+        self.assertEquals(5, proxy.getValue())
+        proxy = None
+        self.assertTrue(self.on_deleteCalled)
+        
+class ReferenceCounterTest(unittest.TestCase):
+
+    def setUp(self):
+        self.running = False
+
+    def start(self):
+        self.running = True
+        
+    def stop(self):
+        self.running = False
+
+    def _getTargetClass(self):
+        from supervisor.socket_manager import ReferenceCounter
+        return ReferenceCounter
+
+    def _makeOne(self, *args, **kw):
+        return self._getTargetClass()(*args, **kw)
+
+    def test_incr_and_decr(self):
+        ctr = self._makeOne(on_zero=self.stop,on_non_zero=self.start)
+        self.assertFalse(self.running)
+        ctr.increment()
+        self.assertTrue(self.running)
+        self.assertEquals(1, ctr.get_count())
+        ctr.increment()
+        self.assertTrue(self.running)
+        self.assertEquals(2, ctr.get_count())
+        ctr.decrement()
+        self.assertTrue(self.running)
+        self.assertEquals(1, ctr.get_count())
+        ctr.decrement()
+        self.assertFalse(self.running)
+        self.assertEquals(0, ctr.get_count())
+    
+    def test_decr_at_zero_raises_error(self):
+        ctr = self._makeOne(on_zero=self.stop,on_non_zero=self.start)
+        self.assertRaises(Exception, ctr.decrement)
+        
 class SocketManagerTest(unittest.TestCase):
     def _getTargetClass(self):
         from supervisor.socket_manager import SocketManager
@@ -29,7 +103,7 @@
         self.assertEqual(sock_manager.socket_config, conf)
         sock = sock_manager.get_socket()
         self.assertEqual(sock.getsockname(), ('127.0.0.1', 12345))
-        sock_manager.close()
+        sock_manager.request_close()
 
     def test_tcp_w_ip(self):
         conf = InetStreamSocketConfig('127.0.0.1', 12345)
@@ -37,7 +111,7 @@
         self.assertEqual(sock_manager.socket_config, conf)
         sock = sock_manager.get_socket()
         self.assertEqual(sock.getsockname(), ('127.0.0.1', 12345))
-        sock_manager.close()
+        sock_manager.request_close()
 
     def test_unix(self):
         (tf_fd, tf_name) = tempfile.mkstemp();
@@ -46,54 +120,81 @@
         self.assertEqual(sock_manager.socket_config, conf)
         sock = sock_manager.get_socket()
         self.assertEqual(sock.getsockname(), tf_name)
-        sock_manager.close()
+        sock = None
+        sock_manager.request_close()
         os.close(tf_fd)
         
-    def test_get_socket(self):
+    def test_socket_lifecycle(self):
         conf = DummySocketConfig(2)
         sock_manager = self._makeOne(conf)
+        #Assert that sockets are created on demand
+        self.assertFalse(sock_manager.is_prepared())
+        #Get two socket references
         sock = sock_manager.get_socket()
+        self.assertTrue(sock_manager.is_prepared()) #socket created on demand
+        sock_id = id(sock._get())
         sock2 = sock_manager.get_socket()
-        self.assertEqual(sock, sock2)
-        sock_manager.close()
+        sock2_id = id(sock2._get())
+        #Assert that they are not the same proxy object
+        self.assertNotEqual(sock, sock2)
+        #Assert that they are the same underlying socket
+        self.assertEqual(sock_id, sock2_id)
+        #Request socket close
+        sock_manager.request_close()
+        #Socket not actually closed yet b/c ref ct is 2
+        self.assertTrue(sock_manager.is_prepared())
+        self.assertFalse(sock_manager.socket.close_called)
+        sock = None
+        #Socket not actually closed yet b/c ref ct is 1
+        self.assertTrue(sock_manager.is_prepared())
+        self.assertFalse(sock_manager.socket.close_called)
+        sock2 = None
+        #Socket closed
+        self.assertFalse(sock_manager.is_prepared())
+        self.assertTrue(sock_manager.socket.close_called)
+        
+        #Get a new socket reference
         sock3 = sock_manager.get_socket()
-        self.assertNotEqual(sock, sock3)
+        self.assertTrue(sock_manager.is_prepared())
+        sock3_id = id(sock3._get())
+        #Assert that it is not the same socket
+        self.assertNotEqual(sock_id, sock3_id)
+        #Drop ref ct to zero
+        del sock3
+        #Assert that socket is still open until close is requested
+        self.assertTrue(sock_manager.is_prepared())
+        self.assertFalse(sock_manager.socket.close_called)
+        sock_manager.request_close()
+        #Now assert that socket is closed
+        self.assertFalse(sock_manager.is_prepared())
+        self.assertTrue(sock_manager.socket.close_called)
 
     def test_prepare_socket(self):
         conf = DummySocketConfig(1)
         sock_manager = self._makeOne(conf)
         sock = sock_manager.get_socket()
-        self.assertTrue(sock_manager.prepared)
+        self.assertTrue(sock_manager.is_prepared())
         self.assertTrue(sock.bind_called)
         self.assertEqual(sock.bind_addr, 'dummy addr')
         self.assertTrue(sock.listen_called)
         self.assertEqual(sock.listen_backlog, socket.SOMAXCONN)
         self.assertFalse(sock.close_called)
-
-    def test_close(self):
-        conf = DummySocketConfig(6)
-        sock_manager = self._makeOne(conf)
-        sock = sock_manager.get_socket()
-        self.assertFalse(sock.close_called)
-        self.assertTrue(sock_manager.prepared)
-        sock_manager.close()
-        self.assertFalse(sock_manager.prepared)
-        self.assertTrue(sock.close_called)
+        sock_manager.request_close()
     
     def test_tcp_socket_already_taken(self):
         conf = InetStreamSocketConfig('127.0.0.1', 12345)
         sock_manager = self._makeOne(conf)
-        sock_manager.get_socket()
+        sock = sock_manager.get_socket()
         sock_manager2 = self._makeOne(conf)
-        self.assertRaises(socket.error, sock_manager2.prepare_socket)
-        sock_manager.close()
+        self.assertRaises(socket.error, sock_manager2.get_socket)
+        sock = None
+        sock_manager.request_close()
         
     def test_unix_bad_sock(self):
         conf = UnixStreamSocketConfig('/notthere/foo.sock')
         sock_manager = self._makeOne(conf)
-        self.assertRaises(socket.error, sock_manager.get_socket)
-        sock_manager.close()
-            
+        self.assertRaises(socket.error, sock_manager.get_socket)        
+
 def test_suite():
     return unittest.findTestCases(sys.modules[__name__])
 
Index: src/supervisor/tests/test_rpcinterfaces.py
===================================================================
--- src/supervisor/tests/test_rpcinterfaces.py  (revision 881)
+++ src/supervisor/tests/test_rpcinterfaces.py  (working copy)
@@ -732,6 +732,7 @@
         self.assertEqual(process1.stop_called, True)
         process2 = supervisord.process_groups['foo'].processes['process2']
         self.assertEqual(process2.stop_called, True)
+        self.assertTrue(supervisord.process_groups['foo'].stop_was_requested)
 
     def test_stopProcessGroup_nowait(self):
         options = DummyOptions()
Index: src/supervisor/process.py
===================================================================
--- src/supervisor/process.py   (revision 881)
+++ src/supervisor/process.py   (working copy)
@@ -38,6 +38,8 @@
 
 from supervisor.datatypes import RestartUnconditionally
 
+from supervisor.socket_manager import SocketManager
+
 class Subprocess:
 
     """A class to manage a subprocess."""
@@ -233,7 +235,7 @@
             self._assertInState(ProcessStates.STARTING)
             self.change_state(ProcessStates.BACKOFF)
             return
-
+        
         try:
             pid = options.fork()
         except OSError, why:
@@ -557,18 +559,49 @@
 class FastCGISubprocess(Subprocess):
     """Extends Subprocess class to handle FastCGI subprocesses"""
 
-    def _prepare_child_fds(self):
+    def __init__(self, config):
+        Subprocess.__init__(self, config)
+        self.fcgi_sock = None
+
+    def before_spawn(self):
+        """
+        The FastCGI socket needs to be created by the parent before we fork
+        """
         if self.group is None:
             raise NotImplementedError('No group set for FastCGISubprocess')
-        if not hasattr(self.group, 'config'):
-            raise NotImplementedError('No config found for group on '
-                                      'FastCGISubprocess')
-        if not hasattr(self.group.config, 'socket_manager'):
+        if not hasattr(self.group, 'socket_manager'):
             raise NotImplementedError('No SocketManager set for '
-                                      'FastCGISubprocess group')
-        sock = self.group.config.socket_manager.get_socket()
-        sock_fd = sock.fileno()
+                                      '%s:%s' % (self.group, dir(self.group)))
+        self.fcgi_sock = self.group.socket_manager.get_socket()
+
+    def spawn(self):
+        """
+        Overrides Subprocess.spawn() so we can hook in before it happens
+        """
+        self.before_spawn()
+        Subprocess.spawn(self)
         
+    def after_finish(self):
+        """
+        Releases reference to FastCGI socket when process is reaped
+        """
+        #Remove object reference to decrement the reference count
+        self.fcgi_sock = None
+        
+    def finish(self, pid, sts):
+        """
+        Overrides Subprocess.finish() so we can hook in after it happens
+        """
+        Subprocess.finish(self, pid, sts)
+        self.after_finish()
+
+    def _prepare_child_fds(self):
+        """
+        Overrides Subprocess._prepare_child_fds()
+        The FastCGI socket needs to be set to file descriptor 0 in the child
+        """
+        sock_fd = self.fcgi_sock.fileno()
+        
         options = self.config.options
         options.dup2(sock_fd, 0)
         options.dup2(self.pipes['child_stdout'], 1)
@@ -578,7 +611,7 @@
             options.dup2(self.pipes['child_stderr'], 2)
         for i in range(3, options.minfds):
             options.close_fd(i)
-    
+                
 class ProcessGroupBase:
     def __init__(self, config):
         self.config = config
@@ -602,6 +635,12 @@
         for process in self.processes.values():
             process.reopenlogs()
 
+    def stop_requested(self):
+        """ Hook so that the process group gets notified by
+            it's geting stopped by an RPC interface call
+        """
+        pass
+
     def stop_all(self):
         processes = self.processes.values()
         processes.sort()
@@ -634,7 +673,30 @@
     def transition(self):
         for proc in self.processes.values():
             proc.transition()
+            
+class FastCGIProcessGroup(ProcessGroup):
 
+    def __init__(self, config, **kwargs):
+        ProcessGroup.__init__(self, config)
+        sockManagerKlass = kwargs.get('socketManager', SocketManager)
+        self.socket_manager = sockManagerKlass(config.socket_config, 
+                                               logger=config.options.logger)
+        #It's not required to call get_socket() here but we want
+        #to fail early during start up if there is a config error
+        try:
+            sock = self.socket_manager.get_socket()
+        except Exception, e:
+            raise ValueError('Could not create FastCGI socket %s: %s' % (self.socket_manager.config(), e))
+
+    def stop_requested(self):
+        """ Overridden from ProcessGroup
+            Request close on FCGI socket (it will actually be closed when all
+            the child processes are reaped)
+        """
+        self.config.options.logger.debug('Stop requested for fcgi group %s' 
+                                         % self.config.name)
+        self.socket_manager.request_close()
+
 class EventListenerPool(ProcessGroupBase):
     def __init__(self, config):
         ProcessGroupBase.__init__(self, config)
Index: src/supervisor/socket_manager.py
===================================================================
--- src/supervisor/socket_manager.py    (revision 881)
+++ src/supervisor/socket_manager.py    (working copy)
@@ -14,16 +14,63 @@
 
 import socket
 
+class Proxy():
+    """ Class for wrapping a shared resource object and getting
+        notified when it's deleted
+    """
+    
+    def __init__(self, object, **kwargs):
+        self.object = object
+        self.on_delete = kwargs.get('on_delete', None)
+
+    def __del__(self):
+        if self.on_delete:
+            self.on_delete()
+    
+    def __getattr__(self, name):
+        return getattr(self.object, name)
+        
+    def _get(self):
+        return self.object
+        
+class ReferenceCounter():
+    """ Class for tracking references to a shared resource
+    """
+    
+    def __init__(self, **kwargs):
+        self.on_non_zero = kwargs['on_non_zero']
+        self.on_zero = kwargs['on_zero']
+        self.ref_count = 0
+    
+    def get_count(self):
+        return self.ref_count
+    
+    def increment(self):
+        if self.ref_count == 0:
+            self.on_non_zero()
+        self.ref_count = self.ref_count + 1
+        
+    def decrement(self):
+        if self.ref_count <= 0:
+            raise Exception('Illegal operation: cannot decrement below zero')
+        self.ref_count -= 1
+        if self.ref_count == 0:
+            self.on_zero()
+
 class SocketManager:
     """ Class for managing sockets in servers that create/bind/listen
-        before forking multiple child processes to accept() """
-        
-    socket_config = None #SocketConfig object
-    socket = None #Socket being managed
-    prepared = False
+        before forking multiple child processes to accept() 
+        Sockets are managed at the process group level and reference counted
+        at the process level b/c that's really the only place to hook in
+    """
     
-    def __init__(self, socket_config):
+    def __init__(self, socket_config, **kwargs):
+        self.logger = kwargs.get('logger', None)
+        self.socket = None
+        self.prepared = False
         self.socket_config = socket_config
+        self.close_requested = False
+        self.ref_ctr = ReferenceCounter(on_zero=self._on_ref_ct_zero, on_non_zero=self._prepare_socket)
         
     def __repr__(self):
         return '<%s at %s for %s>' % (self.__class__,
@@ -33,17 +80,46 @@
     def config(self):
         return self.socket_config
         
-    def prepare_socket(self):
-        self.socket = self.socket_config.create()
-        self.socket.bind(self.socket_config.addr())
-        self.socket.listen(socket.SOMAXCONN)
-        self.prepared = True
+    def is_prepared(self):
+        return self.prepared
+
+    def get_socket(self):
+        self.ref_ctr.increment()
+        self._require_prepared()
+        return Proxy(self.socket, on_delete=self.ref_ctr.decrement)
         
-    def get_socket(self):
+    def get_socket_ref_count(self):
+        self._require_prepared()
+        return self.ref_ctr.get_count()
+    
+    def request_close(self):
+        if self.prepared:
+            if self.ref_ctr.get_count() == 0:
+                self._close()
+            else:
+                self.close_requested = True
+    
+    def _require_prepared(self):
         if not self.prepared:
-            self.prepare_socket()
-        return self.socket
-        
-    def close(self):
+            raise Exception('Socket has not been prepared')
+    
+    def _prepare_socket(self):
+        if not self.prepared:
+            if self.logger:
+                self.logger.info('Creating socket %s' % self.socket_config)
+            self.socket = self.socket_config.create()
+            self.socket.bind(self.socket_config.addr())
+            self.socket.listen(socket.SOMAXCONN)
+            self.prepared = True
+    
+    def _on_ref_ct_zero(self):
+        if self.close_requested:
+            self.close_requested = False
+            self._close()
+    
+    def _close(self):
+        self._require_prepared()
+        if self.logger:
+            self.logger.info('Closing socket %s' % self.socket_config)
         self.socket.close()
         self.prepared = False
_______________________________________________
Supervisor-users mailing list
Supervisor-users@lists.supervisord.org
http://lists.supervisord.org/mailman/listinfo/supervisor-users

Reply via email to