Hi everyone, Attached is the "final" patch addressing the following issues
1) Reloading the config for an fcgi process group did not close the fcgi socket - now, the socket is closed whenever the group is stopped as a unit (including during config update). However, if you stop all the processes in a group individually, the socket will remain open to allow for graceful restarts of FCGI daemons 2) Rereading the config did not pick up changes to the socket parameter in a fcgi-program section - this was a simple fix requiring a little customization of the __eq__() method 3) Made a more friendly exception message when a FCGI socket cannot be created Let me know if you find any issues. Thanks, Roger On Fri, Aug 28, 2009 at 6:38 AM, Marco Vittorini Orgeas <ma...@mavior.eu>wrote: > All right, > > just drop a mail when the final patch is out. > thanks again, > > -- > Marco > > On Thu, August 27, 2009 9:07 pm, Roger Hoover wrote: > > Hi Marco, > > > > My original patch is functional and tested just not as ideal as I want. > I > > got most of way toward a final patch before getting swamped with a hard > > deadline at work. I'll probably be able to get it finished early next > > week. > > > > Roger > > > > On Thu, Aug 27, 2009 at 7:24 AM, Marco Vittorini Orgeas > > <ma...@mavior.eu>wrote: > > > >> Hi Roger, > >> did you end up with a final patch for this or the good one is the one > >> you > >> already have attached? > >> If you have any new, can you please post it here or write the revision > >> number in > >> which is contained, I'd like to test against latest stable. > >> > >> > >> Please keep me CC'ed, > >> > >> thank you > >> > >> Marco > >> > >> _______________________________________________ > >> Supervisor-users mailing list > >> Supervisor-users@lists.supervisord.org > >> http://lists.supervisord.org/mailman/listinfo/supervisor-users > >> > > > > >
Index: src/supervisor/options.py =================================================================== --- src/supervisor/options.py (revision 881) +++ src/supervisor/options.py (working copy) @@ -59,8 +59,6 @@ from supervisor.datatypes import profile_options from supervisor.datatypes import set_here -from supervisor.socket_manager import SocketManager - from supervisor import loggers from supervisor import states from supervisor import xmlrpc @@ -664,7 +662,7 @@ FastCGIProcessConfig) groups.append( FastCGIGroupConfig(self, program_name, priority, processes, - SocketManager(socket_config)) + socket_config) ) @@ -681,8 +679,7 @@ path = normalize_path(path) return UnixStreamSocketConfig(path) - tcp_re = re.compile(r'^tcp://([^\s:]+):(\d+)$') - m = tcp_re.match(sock) + m = re.match(r'tcp://([^\s:]+):(\d+)$', sock) if m: host = m.group(1) port = int(m.group(2)) @@ -1557,6 +1554,7 @@ return dispatchers, p class FastCGIProcessConfig(ProcessConfig): + def make_process(self, group=None): if group is None: raise NotImplementedError('FastCGI programs require a group') @@ -1627,16 +1625,25 @@ class FastCGIGroupConfig(ProcessGroupConfig): def __init__(self, options, name, priority, process_configs, - socket_manager): + socket_config): self.options = options self.name = name self.priority = priority self.process_configs = process_configs - self.socket_manager = socket_manager + self.socket_config = socket_config - def after_setuid(self): - ProcessGroupConfig.after_setuid(self) - self.socket_manager.prepare_socket() + def __eq__(self, other): + if not isinstance(other, FastCGIGroupConfig): + return False + + if self.socket_config != other.socket_config: + return False + + return ProcessGroupConfig.__eq__(self, other) + + def make_group(self): + from supervisor.process import FastCGIProcessGroup + return FastCGIProcessGroup(self) def readFile(filename, offset, length): """ Read length bytes from the file named by filename starting at Index: src/supervisor/datatypes.py =================================================================== --- src/supervisor/datatypes.py (revision 881) +++ src/supervisor/datatypes.py (working copy) @@ -184,7 +184,22 @@ return '<%s at %s for %s>' % (self.__class__, id(self), self.url) + + def __str__(self): + return str(self.url) + + def __eq__(self, other): + if not isinstance(other, SocketConfig): + return False + if self.url != other.url: + return False + + return True + + def __ne__(self, other): + return not self.__eq__(other) + def addr(self): raise NotImplementedError Index: src/supervisor/rpcinterface.py =================================================================== --- src/supervisor/rpcinterface.py (revision 881) +++ src/supervisor/rpcinterface.py (working copy) @@ -427,6 +427,8 @@ if group is None: raise RPCError(Faults.BAD_NAME, name) + group.stop_requested() + processes = group.processes.values() processes.sort() processes = [ (group, process) for process in processes ] Index: src/supervisor/tests/test_options.py =================================================================== --- src/supervisor/tests/test_options.py (revision 881) +++ src/supervisor/tests/test_options.py (working copy) @@ -15,7 +15,7 @@ from supervisor.tests.base import DummyPConfig from supervisor.tests.base import DummyPGroupConfig from supervisor.tests.base import DummyProcess -from supervisor.tests.base import DummySocketManager +from supervisor.tests.base import DummySocketConfig from supervisor.tests.base import lstrip class OptionTests(unittest.TestCase): @@ -806,7 +806,7 @@ self.assertEqual(gconfig0.__class__, FastCGIGroupConfig) self.assertEqual(gconfig0.name, 'foo') self.assertEqual(gconfig0.priority, 1) - self.assertEqual(gconfig0.socket_manager.config().url, + self.assertEqual(gconfig0.socket_config.url, 'unix:///tmp/foo.sock') self.assertEqual(len(gconfig0.process_configs), 2) self.assertEqual(gconfig0.process_configs[0].__class__, @@ -817,7 +817,7 @@ gconfig1 = gconfigs[1] self.assertEqual(gconfig1.name, 'bar') self.assertEqual(gconfig1.priority, 999) - self.assertEqual(gconfig1.socket_manager.config().url, + self.assertEqual(gconfig1.socket_config.url, 'tcp://localhost:6000') self.assertEqual(len(gconfig1.process_configs), 3) @@ -1300,23 +1300,36 @@ def test_ctor(self): options = DummyOptions() - sock_manager = DummySocketManager(6) - instance = self._makeOne(options, 'whatever', 999, [], sock_manager) + sock_config = DummySocketConfig(6) + instance = self._makeOne(options, 'whatever', 999, [], sock_config) self.assertEqual(instance.options, options) self.assertEqual(instance.name, 'whatever') self.assertEqual(instance.priority, 999) self.assertEqual(instance.process_configs, []) - self.assertEqual(instance.socket_manager, sock_manager) + self.assertEqual(instance.socket_config, sock_config) - def test_after_setuid(self): + def test_same_sockets_are_equal(self): options = DummyOptions() - sock_manager = DummySocketManager(6) - pconfigs = [DummyPConfig(options, 'process1', '/bin/process1')] - instance = self._makeOne(options, 'whatever', 999, pconfigs, sock_manager) - instance.after_setuid() - self.assertTrue(pconfigs[0].autochildlogs_created) - self.assertTrue(instance.socket_manager.prepare_socket_called) + sock_config1 = DummySocketConfig(6) + instance1 = self._makeOne(options, 'whatever', 999, [], sock_config1) + sock_config2 = DummySocketConfig(6) + instance2 = self._makeOne(options, 'whatever', 999, [], sock_config2) + + self.assertTrue(instance1 == instance2) + self.assertFalse(instance1 != instance2) + + def test_diff_sockets_are_not_equal(self): + options = DummyOptions() + sock_config1 = DummySocketConfig(6) + instance1 = self._makeOne(options, 'whatever', 999, [], sock_config1) + + sock_config2 = DummySocketConfig(7) + instance2 = self._makeOne(options, 'whatever', 999, [], sock_config2) + + self.assertTrue(instance1 != instance2) + self.assertFalse(instance1 == instance2) + class UtilFunctionsTests(unittest.TestCase): def test_make_namespec(self): from supervisor.options import make_namespec Index: src/supervisor/tests/test_datatypes.py =================================================================== --- src/supervisor/tests/test_datatypes.py (revision 881) +++ src/supervisor/tests/test_datatypes.py (working copy) @@ -183,6 +183,25 @@ self.assertTrue(reuse) sock.close + def test_same_urls_are_equal(self): + conf1 = self._makeOne('localhost', '5001') + conf2 = self._makeOne('localhost', '5001') + self.assertTrue(conf1 == conf2) + self.assertFalse(conf1 != conf2) + + def test_diff_urls_are_not_equal(self): + conf1 = self._makeOne('localhost', '5001') + conf2 = self._makeOne('localhost', '5002') + self.assertTrue(conf1 != conf2) + self.assertFalse(conf1 == conf2) + + def test_diff_objs_are_not_equal(self): + conf1 = self._makeOne('localhost', '5001') + conf2 = 'blah' + self.assertTrue(conf1 != conf2) + self.assertFalse(conf1 == conf2) + + class UnixStreamSocketConfigTests(unittest.TestCase): def _getTargetClass(self): return datatypes.UnixStreamSocketConfig @@ -213,6 +232,24 @@ sock = conf.create() self.assertFalse(os.path.exists(tf_name)) sock.close + + def test_same_paths_are_equal(self): + conf1 = self._makeOne('/tmp/foo.sock') + conf2 = self._makeOne('/tmp/foo.sock') + self.assertTrue(conf1 == conf2) + self.assertFalse(conf1 != conf2) + + def test_diff_paths_are_not_equal(self): + conf1 = self._makeOne('/tmp/foo.sock') + conf2 = self._makeOne('/tmp/bar.sock') + self.assertTrue(conf1 != conf2) + self.assertFalse(conf1 == conf2) + + def test_diff_objs_are_not_equal(self): + conf1 = self._makeOne('/tmp/foo.sock') + conf2 = 'blah' + self.assertTrue(conf1 != conf2) + self.assertFalse(conf1 == conf2) def test_suite(): return unittest.findTestCases(sys.modules[__name__]) Index: src/supervisor/tests/base.py =================================================================== --- src/supervisor/tests/base.py (revision 881) +++ src/supervisor/tests/base.py (working copy) @@ -293,6 +293,63 @@ def get_state(self): return self.options.mood +class DummySocket: + bind_called = False + bind_addr = None + listen_called = False + listen_backlog = None + close_called = False + + def __init__(self, fd): + self.fd = fd + + def fileno(self): + return self.fd + + def bind(self, addr): + self.bind_called = True + self.bind_addr = addr + + def listen(self, backlog): + self.listen_called = True + self.listen_backlog = backlog + + def close(self): + self.close_called = True + + def __str__(self): + return 'dummy socket' + +class DummySocketConfig: + def __init__(self, fd): + self.fd = fd + + def addr(self): + return 'dummy addr' + + def __eq__(self, other): + return self.fd == other.fd + + def __ne__(self, other): + return not self.__eq__(other) + + def create(self): + return DummySocket(self.fd) + +class DummySocketManager: + def __init__(self, config, **kwargs): + self._config = config + self.request_close_called = False + + def config(self): + return self._config + + def get_socket(self): + return DummySocket(self._config.fd) + + def request_close(self): + self.request_close_called = True + class DummyProcess: # Initial state; overridden by instance variables pid = 0 # Subprocess pid; 0 when not running @@ -861,9 +918,9 @@ self.name) class DummyFCGIGroupConfig(DummyPGroupConfig): - def __init__(self, options, name, priority, pconfigs, socket_manager): + def __init__(self, options, name='whatever', priority=999, pconfigs=None, socket_config=DummySocketConfig(1)): DummyPGroupConfig.__init__(self, options, name, priority, pconfigs) - self.socket_manager = socket_manager + self.socket_config = socket_config class DummyProcessGroup: def __init__(self, config): @@ -872,18 +929,28 @@ self.all_stopped = False self.dispatchers = {} self.unstopped_processes = [] + self.stop_was_requested = False def transition(self): self.transitioned = True def stop_all(self): self.all_stopped = True + + def stop_requested(self): + self.stop_was_requested = True def get_unstopped_processes(self): return self.unstopped_processes def get_dispatchers(self): return self.dispatchers + +class DummyFCGIProcessGroup(DummyProcessGroup): + + def __init__(self, config): + DummyProcessGroup.__init__(self, config) + self.socket_manager = DummySocketManager(config.socket_config) class PopulatedDummySupervisor(DummySupervisor): def __init__(self, options, group_name, *pconfigs): @@ -977,55 +1044,7 @@ def __str__(self): return 'dummy event' - -class DummySocket: - bind_called = False - bind_addr = None - listen_called = False - listen_backlog = None - close_called = False - - def __init__(self, fd): - self.fd = fd - def fileno(self): - return self.fd - - def bind(self, addr): - self.bind_called = True - self.bind_addr = addr - - def listen(self, backlog): - self.listen_called = True - self.listen_backlog = backlog - - def close(self): - self.close_called = True - - def __str__(self): - return 'dummy socket' - -class DummySocketConfig: - def __init__(self, fd): - self.fd = fd - - def addr(self): - return 'dummy addr' - - def create(self): - return DummySocket(self.fd) - -class DummySocketManager: - def __init__(self, sock_fd): - self.sock_fd = sock_fd - self.prepare_socket_called = False - - def prepare_socket(self): - self.prepare_socket_called = True - - def get_socket(self): - return DummySocket(self.sock_fd) - def dummy_handler(event, result): pass Index: src/supervisor/tests/test_process.py =================================================================== --- src/supervisor/tests/test_process.py (revision 881) +++ src/supervisor/tests/test_process.py (working copy) @@ -12,8 +12,10 @@ from supervisor.tests.base import DummyDispatcher from supervisor.tests.base import DummyEvent from supervisor.tests.base import DummyFCGIGroupConfig +from supervisor.tests.base import DummySocketConfig +from supervisor.tests.base import DummyProcessGroup +from supervisor.tests.base import DummyFCGIProcessGroup from supervisor.tests.base import DummySocketManager -from supervisor.tests.base import DummyProcessGroup class SubprocessTests(unittest.TestCase): def _getTargetClass(self): @@ -1185,10 +1187,10 @@ options.forkpid = 0 config = DummyPConfig(options, 'good', '/good/filename', uid=1) instance = self._makeOne(config) - sock_manager = DummySocketManager(7) + sock_config = DummySocketConfig(7) gconfig = DummyFCGIGroupConfig(options, 'whatever', 999, None, - sock_manager) - instance.group = DummyProcessGroup(gconfig) + sock_config) + instance.group = DummyFCGIProcessGroup(gconfig) result = instance.spawn() self.assertEqual(result, None) self.assertEqual(len(options.duped), 3) @@ -1203,15 +1205,35 @@ config = DummyPConfig(options, 'good', '/good/filename', uid=1) config.redirect_stderr = True instance = self._makeOne(config) - sock_manager = DummySocketManager(13) + sock_config = DummySocketConfig(13) gconfig = DummyFCGIGroupConfig(options, 'whatever', 999, None, - sock_manager) - instance.group = DummyProcessGroup(gconfig) + sock_config) + instance.group = DummyFCGIProcessGroup(gconfig) result = instance.spawn() self.assertEqual(result, None) self.assertEqual(len(options.duped), 2) self.assertEqual(options.duped[13], 0) self.assertEqual(len(options.fds_closed), options.minfds - 3) + + def test_before_spawn_gets_socket_ref(self): + options = DummyOptions() + config = DummyPConfig(options, 'good', '/good/filename', uid=1) + instance = self._makeOne(config) + sock_config = DummySocketConfig(7) + gconfig = DummyFCGIGroupConfig(options, 'whatever', 999, None, + sock_config) + instance.group = DummyFCGIProcessGroup(gconfig) + self.assertTrue(instance.fcgi_sock is None) + instance.before_spawn() + self.assertFalse(instance.fcgi_sock is None) + + def test_after_finish_removes_socket_ref(self): + options = DummyOptions() + config = DummyPConfig(options, 'good', '/good/filename', uid=1) + instance = self._makeOne(config) + instance.fcgi_sock = 'hello' + instance.after_finish() + self.assertTrue(instance.fcgi_sock is None) class ProcessGroupBaseTests(unittest.TestCase): def _getTargetClass(self): @@ -1340,6 +1362,21 @@ group.transition() self.assertEqual(process1.transitioned, True) +class FastCGIProcessGroupTests(unittest.TestCase): + def _getTargetClass(self): + from supervisor.process import FastCGIProcessGroup + return FastCGIProcessGroup + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_stop_requested_signals_socket_close(self): + options = DummyOptions() + gconfig = DummyFCGIGroupConfig(options) + group = self._makeOne(gconfig, socketManager=DummySocketManager) + group.stop_requested() + self.assertTrue(group.socket_manager.request_close_called) + class EventListenerPoolTests(ProcessGroupBaseTests): def setUp(self): from supervisor.events import clear Index: src/supervisor/tests/test_socket_manager.py =================================================================== --- src/supervisor/tests/test_socket_manager.py (revision 881) +++ src/supervisor/tests/test_socket_manager.py (working copy) @@ -10,6 +10,80 @@ from supervisor.datatypes import UnixStreamSocketConfig from supervisor.datatypes import InetStreamSocketConfig +class TestObject(): + + def __init__(self): + self.value = 5 + + def getValue(self): + return self.value + + def setValue(self, val): + self.value = val + +class ProxyTest(unittest.TestCase): + + def setUp(self): + self.on_deleteCalled = False + + def _getTargetClass(self): + from supervisor.socket_manager import Proxy + return Proxy + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def setOnDeleteCalled(self): + self.on_deleteCalled = True + + def test_proxy_getattr(self): + proxy = self._makeOne(TestObject()) + self.assertEquals(5, proxy.getValue()) + + def test_on_delete(self): + proxy = self._makeOne(TestObject(), on_delete=self.setOnDeleteCalled) + self.assertEquals(5, proxy.getValue()) + proxy = None + self.assertTrue(self.on_deleteCalled) + +class ReferenceCounterTest(unittest.TestCase): + + def setUp(self): + self.running = False + + def start(self): + self.running = True + + def stop(self): + self.running = False + + def _getTargetClass(self): + from supervisor.socket_manager import ReferenceCounter + return ReferenceCounter + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_incr_and_decr(self): + ctr = self._makeOne(on_zero=self.stop,on_non_zero=self.start) + self.assertFalse(self.running) + ctr.increment() + self.assertTrue(self.running) + self.assertEquals(1, ctr.get_count()) + ctr.increment() + self.assertTrue(self.running) + self.assertEquals(2, ctr.get_count()) + ctr.decrement() + self.assertTrue(self.running) + self.assertEquals(1, ctr.get_count()) + ctr.decrement() + self.assertFalse(self.running) + self.assertEquals(0, ctr.get_count()) + + def test_decr_at_zero_raises_error(self): + ctr = self._makeOne(on_zero=self.stop,on_non_zero=self.start) + self.assertRaises(Exception, ctr.decrement) + class SocketManagerTest(unittest.TestCase): def _getTargetClass(self): from supervisor.socket_manager import SocketManager @@ -29,7 +103,7 @@ self.assertEqual(sock_manager.socket_config, conf) sock = sock_manager.get_socket() self.assertEqual(sock.getsockname(), ('127.0.0.1', 12345)) - sock_manager.close() + sock_manager.request_close() def test_tcp_w_ip(self): conf = InetStreamSocketConfig('127.0.0.1', 12345) @@ -37,7 +111,7 @@ self.assertEqual(sock_manager.socket_config, conf) sock = sock_manager.get_socket() self.assertEqual(sock.getsockname(), ('127.0.0.1', 12345)) - sock_manager.close() + sock_manager.request_close() def test_unix(self): (tf_fd, tf_name) = tempfile.mkstemp(); @@ -46,54 +120,81 @@ self.assertEqual(sock_manager.socket_config, conf) sock = sock_manager.get_socket() self.assertEqual(sock.getsockname(), tf_name) - sock_manager.close() + sock = None + sock_manager.request_close() os.close(tf_fd) - def test_get_socket(self): + def test_socket_lifecycle(self): conf = DummySocketConfig(2) sock_manager = self._makeOne(conf) + #Assert that sockets are created on demand + self.assertFalse(sock_manager.is_prepared()) + #Get two socket references sock = sock_manager.get_socket() + self.assertTrue(sock_manager.is_prepared()) #socket created on demand + sock_id = id(sock._get()) sock2 = sock_manager.get_socket() - self.assertEqual(sock, sock2) - sock_manager.close() + sock2_id = id(sock2._get()) + #Assert that they are not the same proxy object + self.assertNotEqual(sock, sock2) + #Assert that they are the same underlying socket + self.assertEqual(sock_id, sock2_id) + #Request socket close + sock_manager.request_close() + #Socket not actually closed yet b/c ref ct is 2 + self.assertTrue(sock_manager.is_prepared()) + self.assertFalse(sock_manager.socket.close_called) + sock = None + #Socket not actually closed yet b/c ref ct is 1 + self.assertTrue(sock_manager.is_prepared()) + self.assertFalse(sock_manager.socket.close_called) + sock2 = None + #Socket closed + self.assertFalse(sock_manager.is_prepared()) + self.assertTrue(sock_manager.socket.close_called) + + #Get a new socket reference sock3 = sock_manager.get_socket() - self.assertNotEqual(sock, sock3) + self.assertTrue(sock_manager.is_prepared()) + sock3_id = id(sock3._get()) + #Assert that it is not the same socket + self.assertNotEqual(sock_id, sock3_id) + #Drop ref ct to zero + del sock3 + #Assert that socket is still open until close is requested + self.assertTrue(sock_manager.is_prepared()) + self.assertFalse(sock_manager.socket.close_called) + sock_manager.request_close() + #Now assert that socket is closed + self.assertFalse(sock_manager.is_prepared()) + self.assertTrue(sock_manager.socket.close_called) def test_prepare_socket(self): conf = DummySocketConfig(1) sock_manager = self._makeOne(conf) sock = sock_manager.get_socket() - self.assertTrue(sock_manager.prepared) + self.assertTrue(sock_manager.is_prepared()) self.assertTrue(sock.bind_called) self.assertEqual(sock.bind_addr, 'dummy addr') self.assertTrue(sock.listen_called) self.assertEqual(sock.listen_backlog, socket.SOMAXCONN) self.assertFalse(sock.close_called) - - def test_close(self): - conf = DummySocketConfig(6) - sock_manager = self._makeOne(conf) - sock = sock_manager.get_socket() - self.assertFalse(sock.close_called) - self.assertTrue(sock_manager.prepared) - sock_manager.close() - self.assertFalse(sock_manager.prepared) - self.assertTrue(sock.close_called) + sock_manager.request_close() def test_tcp_socket_already_taken(self): conf = InetStreamSocketConfig('127.0.0.1', 12345) sock_manager = self._makeOne(conf) - sock_manager.get_socket() + sock = sock_manager.get_socket() sock_manager2 = self._makeOne(conf) - self.assertRaises(socket.error, sock_manager2.prepare_socket) - sock_manager.close() + self.assertRaises(socket.error, sock_manager2.get_socket) + sock = None + sock_manager.request_close() def test_unix_bad_sock(self): conf = UnixStreamSocketConfig('/notthere/foo.sock') sock_manager = self._makeOne(conf) - self.assertRaises(socket.error, sock_manager.get_socket) - sock_manager.close() - + self.assertRaises(socket.error, sock_manager.get_socket) + def test_suite(): return unittest.findTestCases(sys.modules[__name__]) Index: src/supervisor/tests/test_rpcinterfaces.py =================================================================== --- src/supervisor/tests/test_rpcinterfaces.py (revision 881) +++ src/supervisor/tests/test_rpcinterfaces.py (working copy) @@ -732,6 +732,7 @@ self.assertEqual(process1.stop_called, True) process2 = supervisord.process_groups['foo'].processes['process2'] self.assertEqual(process2.stop_called, True) + self.assertTrue(supervisord.process_groups['foo'].stop_was_requested) def test_stopProcessGroup_nowait(self): options = DummyOptions() Index: src/supervisor/process.py =================================================================== --- src/supervisor/process.py (revision 881) +++ src/supervisor/process.py (working copy) @@ -38,6 +38,8 @@ from supervisor.datatypes import RestartUnconditionally +from supervisor.socket_manager import SocketManager + class Subprocess: """A class to manage a subprocess.""" @@ -233,7 +235,7 @@ self._assertInState(ProcessStates.STARTING) self.change_state(ProcessStates.BACKOFF) return - + try: pid = options.fork() except OSError, why: @@ -557,18 +559,49 @@ class FastCGISubprocess(Subprocess): """Extends Subprocess class to handle FastCGI subprocesses""" - def _prepare_child_fds(self): + def __init__(self, config): + Subprocess.__init__(self, config) + self.fcgi_sock = None + + def before_spawn(self): + """ + The FastCGI socket needs to be created by the parent before we fork + """ if self.group is None: raise NotImplementedError('No group set for FastCGISubprocess') - if not hasattr(self.group, 'config'): - raise NotImplementedError('No config found for group on ' - 'FastCGISubprocess') - if not hasattr(self.group.config, 'socket_manager'): + if not hasattr(self.group, 'socket_manager'): raise NotImplementedError('No SocketManager set for ' - 'FastCGISubprocess group') - sock = self.group.config.socket_manager.get_socket() - sock_fd = sock.fileno() + '%s:%s' % (self.group, dir(self.group))) + self.fcgi_sock = self.group.socket_manager.get_socket() + + def spawn(self): + """ + Overrides Subprocess.spawn() so we can hook in before it happens + """ + self.before_spawn() + Subprocess.spawn(self) + def after_finish(self): + """ + Releases reference to FastCGI socket when process is reaped + """ + #Remove object reference to decrement the reference count + self.fcgi_sock = None + + def finish(self, pid, sts): + """ + Overrides Subprocess.finish() so we can hook in after it happens + """ + Subprocess.finish(self, pid, sts) + self.after_finish() + + def _prepare_child_fds(self): + """ + Overrides Subprocess._prepare_child_fds() + The FastCGI socket needs to be set to file descriptor 0 in the child + """ + sock_fd = self.fcgi_sock.fileno() + options = self.config.options options.dup2(sock_fd, 0) options.dup2(self.pipes['child_stdout'], 1) @@ -578,7 +611,7 @@ options.dup2(self.pipes['child_stderr'], 2) for i in range(3, options.minfds): options.close_fd(i) - + class ProcessGroupBase: def __init__(self, config): self.config = config @@ -602,6 +635,12 @@ for process in self.processes.values(): process.reopenlogs() + def stop_requested(self): + """ Hook so that the process group gets notified by + it's geting stopped by an RPC interface call + """ + pass + def stop_all(self): processes = self.processes.values() processes.sort() @@ -634,7 +673,30 @@ def transition(self): for proc in self.processes.values(): proc.transition() + +class FastCGIProcessGroup(ProcessGroup): + def __init__(self, config, **kwargs): + ProcessGroup.__init__(self, config) + sockManagerKlass = kwargs.get('socketManager', SocketManager) + self.socket_manager = sockManagerKlass(config.socket_config, + logger=config.options.logger) + #It's not required to call get_socket() here but we want + #to fail early during start up if there is a config error + try: + sock = self.socket_manager.get_socket() + except Exception, e: + raise ValueError('Could not create FastCGI socket %s: %s' % (self.socket_manager.config(), e)) + + def stop_requested(self): + """ Overriden from ProcessGroup + Request close on FCGI socket (it will actually be close when all + the child processes are reaped) + """ + self.config.options.logger.debug('Stop requested for fcgi group %s' + % self.config.name) + self.socket_manager.request_close() + class EventListenerPool(ProcessGroupBase): def __init__(self, config): ProcessGroupBase.__init__(self, config) Index: src/supervisor/socket_manager.py =================================================================== --- src/supervisor/socket_manager.py (revision 881) +++ src/supervisor/socket_manager.py (working copy) @@ -14,16 +14,63 @@ import socket +class Proxy(): + """ Class for wrapping a shared resource object and getting + notified when it's deleted + """ + + def __init__(self, object, **kwargs): + self.object = object + self.on_delete = kwargs.get('on_delete', None) + + def __del__(self): + if self.on_delete: + self.on_delete() + + def __getattr__(self, name): + return getattr(self.object, name) + + def _get(self): + return self.object + +class ReferenceCounter(): + """ Class for tracking references to a shared resource + """ + + def __init__(self, **kwargs): + self.on_non_zero = kwargs['on_non_zero'] + self.on_zero = kwargs['on_zero'] + self.ref_count = 0 + + def get_count(self): + return self.ref_count + + def increment(self): + if self.ref_count == 0: + self.on_non_zero() + self.ref_count = self.ref_count + 1 + + def decrement(self): + if self.ref_count <= 0: + raise Exception('Illegal operation: cannot decrement below zero') + self.ref_count -= 1 + if self.ref_count == 0: + self.on_zero() + class SocketManager: """ Class for managing sockets in servers that create/bind/listen - before forking multiple child processes to accept() """ - - socket_config = None #SocketConfig object - socket = None #Socket being managed - prepared = False + before forking multiple child processes to accept() + Sockets are managed at the process group level and referenced counted + at the process level b/c that's really the only place to hook in + """ - def __init__(self, socket_config): + def __init__(self, socket_config, **kwargs): + self.logger = kwargs.get('logger', None) + self.socket = None + self.prepared = False self.socket_config = socket_config + self.close_requested = False + self.ref_ctr = ReferenceCounter(on_zero=self._on_ref_ct_zero, on_non_zero=self._prepare_socket) def __repr__(self): return '<%s at %s for %s>' % (self.__class__, @@ -33,17 +80,46 @@ def config(self): return self.socket_config - def prepare_socket(self): - self.socket = self.socket_config.create() - self.socket.bind(self.socket_config.addr()) - self.socket.listen(socket.SOMAXCONN) - self.prepared = True + def is_prepared(self): + return self.prepared + + def get_socket(self): + self.ref_ctr.increment() + self._require_prepared() + return Proxy(self.socket, on_delete=self.ref_ctr.decrement) - def get_socket(self): + def get_socket_ref_count(self): + self._require_prepared() + return self.ref_ctr.get_count() + + def request_close(self): + if self.prepared: + if self.ref_ctr.get_count() == 0: + self._close() + else: + self.close_requested = True + + def _require_prepared(self): if not self.prepared: - self.prepare_socket() - return self.socket - - def close(self): + raise Exception('Socket has not been prepared') + + def _prepare_socket(self): + if not self.prepared: + if self.logger: + self.logger.info('Creating socket %s' % self.socket_config) + self.socket = self.socket_config.create() + self.socket.bind(self.socket_config.addr()) + self.socket.listen(socket.SOMAXCONN) + self.prepared = True + + def _on_ref_ct_zero(self): + if self.close_requested: + self.close_requested = False + self._close() + + def _close(self): + self._require_prepared() + if self.logger: + self.logger.info('Closing socket %s' % self.socket_config) self.socket.close() self.prepared = False
_______________________________________________ Supervisor-users mailing list Supervisor-users@lists.supervisord.org http://lists.supervisord.org/mailman/listinfo/supervisor-users