Giuseppe Lavagetto has uploaded a new change for review. https://gerrit.wikimedia.org/r/204035
Change subject: runcommand: Twisted 13.x compatibility ...................................................................... runcommand: Twisted 13.x compatibility As twisted.internet.process.Process has evolved significantly over the years, the old ProcessGroupProcess implementation is now incompatible with it. By exploiting the better structure of newer versions of the base class, we avoid reimplementing parts of the logic, and just add our additional features (process group support and timeouts) to the base class. Please note that this is /not/ compatible with Twisted 11.x which we have on Ubuntu Precise, so if we decide to merge this we should probably create a precise branch of pybal where we can backport other, compatible changes. Change-Id: I84dd674cd803c523bcdad1072108266778965aa8 --- M pybal/monitors/runcommand.py 1 file changed, 42 insertions(+), 179 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/operations/debs/pybal refs/changes/35/204035/1 diff --git a/pybal/monitors/runcommand.py b/pybal/monitors/runcommand.py index 38830de..140fb7c 100644 --- a/pybal/monitors/runcommand.py +++ b/pybal/monitors/runcommand.py @@ -16,165 +16,29 @@ Derivative of twisted.internet.process that supports Unix process groups, sessions and timeouts """ - def __init__(self, reactor, command, args, environment, path, proto, uid=None, gid=None, childFDs=None, sessionLeader=False, timeout=None): - """Spawn an operating-system process. - This is where the hard work of disconnecting all currently open - files / forking / executing the new process happens. (This is - executed automatically when a Process is instantiated.) + self.sessionLeader = sessionLeader + self.timeout = timeout + self.timeoutCall = None + super(ProcessGroupProcess, self).__init__( + reactor, command, args, environment, path, proto, + uid=uid, gid=gid, childFDs=childFDs + ) - This will also run the subprocess as a given user ID and group ID, if - specified.
(Implementation Note: this doesn't support all the arcane - nuances of setXXuid on UNIX: it will assume that either your effective - or real UID is 0.) - """ - if not proto: - assert 'r' not in childFDs.values() - assert 'w' not in childFDs.values() - if not signal.getsignal(signal.SIGCHLD): - log.msg("spawnProcess called, but the SIGCHLD handler is not " - "installed. This probably means you have not yet " - "called reactor.run, or called " - "reactor.run(installSignalHandler=0). You will probably " - "never see this process finish, and it may become a " - "zombie process.") - # if you see this message during a unit test, look in - # test-standard.xhtml or twisted.test.test_process.SignalMixin - # for a workaround + def _execChild(self, path, uid, gid, executable, args, environment): + if self.sessionLeader: + self._setupSession() + super(ProcessGroupProcess, self)._execChild(path, uid, gid, executable, args, environment) - self.lostProcess = False - - settingUID = (uid is not None) or (gid is not None) - if settingUID: - curegid = os.getegid() - currgid = os.getgid() - cureuid = os.geteuid() - curruid = os.getuid() - if uid is None: - uid = cureuid - if gid is None: - gid = curegid - # prepare to change UID in subprocess - os.setuid(0) - os.setgid(0) - - self.pipes = {} - # keys are childFDs, we can sense them closing - # values are ProcessReader/ProcessWriters - - helpers = {} - # keys are childFDs - # values are parentFDs - - if childFDs is None: - childFDs = {0: "w", # we write to the child's stdin - 1: "r", # we read from their stdout - 2: "r", # and we read from their stderr - } - - debug = self.debug - if debug: print "childFDs", childFDs - - # fdmap.keys() are filenos of pipes that are used by the child. 
- fdmap = {} # maps childFD to parentFD - for childFD, target in childFDs.items(): - if debug: print "[%d]" % childFD, target - if target == "r": - # we need a pipe that the parent can read from - readFD, writeFD = os.pipe() - if debug: print "readFD=%d, writeFD%d" % (readFD, writeFD) - fdmap[childFD] = writeFD # child writes to this - helpers[childFD] = readFD # parent reads from this - elif target == "w": - # we need a pipe that the parent can write to - readFD, writeFD = os.pipe() - if debug: print "readFD=%d, writeFD=%d" % (readFD, writeFD) - fdmap[childFD] = readFD # child reads from this - helpers[childFD] = writeFD # parent writes to this - else: - assert type(target) == int, '%r should be an int' % (target,) - fdmap[childFD] = target # parent ignores this - if debug: print "fdmap", fdmap - if debug: print "helpers", helpers - # the child only cares about fdmap.values() - - self.pid = os.fork() - if self.pid == 0: # pid is 0 in the child process - - # do not put *ANY* code outside the try block. The child process - # must either exec or _exit. If it gets outside this block (due - # to an exception that is not handled here, but which might be - # handled higher up), there will be two copies of the parent - # running in parallel, doing all kinds of damage. - - # After each change to this code, review it to make sure there - # are no exit paths. - - try: - # stop debugging, if I am! I don't care anymore! - sys.settrace(None) - # close all parent-side pipes - self._setupChild(fdmap) - # Make a session/process group leader if requested - if sessionLeader: self._setupSession() - self._execChild(path, settingUID, uid, gid, - command, args, environment) - except Exception: - # If there are errors, bail and try to write something - # descriptive to stderr. 
- # XXX: The parent's stderr isn't necessarily fd 2 anymore, or - # even still available - # XXXX: however even libc assumes write(2,err) is a useful - # thing to attempt - try: - stderr = os.fdopen(2,'w') - stderr.write("Upon execvpe %s %s in environment %s\n:" % - (command, str(args), - "id %s" % id(environment))) - traceback.print_exc(file=stderr) - stderr.flush() - for fd in range(3): - os.close(fd) - except Exception: - pass # make *sure* the child terminates - # Did you read the comment about not adding code here? - os._exit(1) - - # we are the parent - - if settingUID: - os.setregid(currgid, curegid) - os.setreuid(curruid, cureuid) - self.status = -1 # this records the exit status of the child - - if timeout: - self.timeoutCall = reactor.callLater(timeout, self._processTimeout) - - self.proto = proto - - # arrange for the parent-side pipes to be read and written - for childFD, parentFD in helpers.items(): - os.close(fdmap[childFD]) - - if childFDs[childFD] == "r": - reader = process.ProcessReader(reactor, self, childFD, parentFD) - self.pipes[childFD] = reader - - if childFDs[childFD] == "w": - writer = process.ProcessWriter(reactor, self, childFD, parentFD, forceReadHack=True) - self.pipes[childFD] = writer - - try: - # the 'transport' is used for some compatibility methods - if self.proto is not None: - self.proto.makeConnection(self) - except Exception: - log.err() - process.registerReapProcessHandler(self.pid, self) + def _fork(self, path, uid, gid, executable, args, environment, **kwargs): + super(ProcessGroupProcess, self)._fork(path, uid, gid, executable, args, environment, **kwargs) + # In case we set timeouts, just respect them. 
+ if self.timeout: + self.timeoutCall = reactor.callLater(self.timeout, self._processTimeout) def processEnded(self, status): if self.timeoutCall: @@ -184,7 +48,7 @@ pgid = -self.pid try: process.Process.processEnded(self, status) - finally: + finally: # The process group leader may have terminated, but child process in # the group may still be alive. Mass slaughter. try: @@ -196,20 +60,19 @@ print "pgid:", pgid, "e:", e raise else: - self.proto.leftoverProcesses(True) + self.proto.leftoverProcesses(True) def _setupSession(self): os.setsid() - + def _processTimeout(self): """ Called when the timeout expires. """ - # Kill the process group if not self.lostProcess: self.signalProcessGroup(signal.SIGKILL) - + def signalProcessGroup(self, signal, pgid=None): os.kill(pgid or -self.pid, signal) @@ -217,16 +80,16 @@ """ Monitor that checks server uptime by repeatedly fetching a certain URL """ - + __name__ = 'RunCommand' - + INTV_CHECK = 60 - + TIMEOUT_RUN = 20 - + def __init__(self, coordinator, server, configuration={}): - """Constructor""" - + """Constructor""" + # Call ancestor constructor super(RunCommandMonitoringProtocol, self).__init__(coordinator, server, configuration) @@ -241,18 +104,18 @@ self.checkCall = None self.runningProcess = None - + def run(self): """Start the monitoring""" - + super(RunCommandMonitoringProtocol, self).run() - + if not self.checkCall or not self.checkCall.active(): self.checkCall = reactor.callLater(self.intvCheck, self.runCommand) def stop(self): """Stop all running and/or upcoming checks""" - + super(RunCommandMonitoringProtocol, self).stop() if self.checkCall and self.checkCall.active(): @@ -262,41 +125,41 @@ if self.runningProcess is not None: try: self.runningProcess.signalProcess(signal.SIGKILL) except error.ProcessExitedAlready: pass - + def runCommand(self): """Periodically called method that does a single uptime check.""" self.runningProcess = self._spawnProcess(self, self.command, [self.command] + self.arguments, 
sessionLeader=True, timeout=(self.timeout or None)) - + def makeConnection(self, process): pass - def childDataReceived(self, childFD, data): + def childDataReceived(self, childFD, data): if not self.logOutput: return - + # Escape control chars map = {'\n': r'\n', '\r': r'\r', '\t': r'\t'} for char, subst in map.iteritems(): data = data.replace(char, subst) - + self.report("Cmd stdout: " + data) def childConnectionLost(self, childFD): pass - + def processEnded(self, reason): """ Called when the process has ended """ - + if reason.check(error.ProcessDone): self._resultUp() elif reason.check(error.ProcessTerminated): self._resultDown(reason.getErrorMessage()) - + # Schedule the next check if self.active: self.checkCall = reactor.callLater(self.intvCheck, self.runCommand) @@ -308,7 +171,7 @@ Called when the child terminated cleanly, but left some of its child processes behind """ - + if allKilled: msg = "WARNING: Command %s %s left child processes behind, which have been killed!" else: @@ -321,10 +184,10 @@ sessionLeader=False, timeout=None): """ Replacement for posixbase.PosixReactorBase.spawnProcess with added - process group / session and timeout support, and support for + process group / session and timeout support, and support for non-POSIX platforms and PTYs removed. - """ - + """ + args, env = reactor._checkProcessArgs(args, env) return ProcessGroupProcess(reactor, executable, args, env, path, processProtocol, uid, gid, childFDs, -- To view, visit https://gerrit.wikimedia.org/r/204035 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I84dd674cd803c523bcdad1072108266778965aa8 Gerrit-PatchSet: 1 Gerrit-Project: operations/debs/pybal Gerrit-Branch: master Gerrit-Owner: Giuseppe Lavagetto <glavage...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits