Saggi Mizrahi has uploaded a new change for review. Change subject: utils: Create AsyncProcessOperation ......................................................................
utils: Create AsyncProcessOperation This is an object to wrap a process so it can be treated as an AsyncOperation. Features: * Takes care of pid collection * Has pluggable output parsing * Process appropriate implementation of wait() and stop() Related-Bug-Url: http://bugzilla.redhat.com/964595 Change-Id: I79d0eefc9d917a4a93916d52867fb4f1e793c60e Signed-off-by: Saggi Mizrahi <smizr...@redhat.com> --- M lib/vdsm/utils.py M tests/utilsTests.py 2 files changed, 113 insertions(+), 0 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/35/21135/1 diff --git a/lib/vdsm/utils.py b/lib/vdsm/utils.py index d3e0c2b..14f9dfa 100644 --- a/lib/vdsm/utils.py +++ b/lib/vdsm/utils.py @@ -48,6 +48,7 @@ import subprocess import threading import time +import zombiereaper from cpopen import CPopen as BetterPopen from config import config @@ -914,6 +915,55 @@ time.sleep(sleep) +class AsyncProcessOperation(object): + def __init__(self, proc, resultParser=None): + """Wraps a running process operation. + + resultParser should be of type callback(rc, out, err) and can return + anything or throw exceptions.""" + self._lock = threading.Lock() + + self._result = None + self._resultParser = resultParser + + self._proc = proc + + def wait(self, timeout=None, cond=None): + """Waits until the process has exited, the timeout has been reached or + the condition has been met""" + return self._proc.wait(timeout, cond) + + def stop(self): + """Stops the running operation, effectively sending a kill signal to + the process""" + self._proc.kill() + + def result(self): + """Returns the result in the as a tuple of (result, error). + If the operation is still running it will block until it returns. + + If no resultParser has been set the default result + is (rc, out, err) """ + with self._lock: + if self._result is None: + out, err = self._proc.communicate() + rc = self._proc.returncode + if self._resultParser is not None: + try: + self._result = (self._resultParser(rc, out, err), + None) + except Exception as e: + self._result = (None, e) + else: + self._result = ((rc, out, err), None) + + return self._result + + def __del__(self): + if self._proc.returncode is None: + zombiereaper.autoReapPID(self._proc.pid) + + def panic(msg): logging.error("Panic: %s", msg, exc_info=True) os.killpg(0, 9) diff --git a/tests/utilsTests.py b/tests/utilsTests.py index a6a222d..656c121 100644 --- a/tests/utilsTests.py +++ b/tests/utilsTests.py @@ -22,6 +22,7 @@ from testrunner import VdsmTestCase as TestCaseBase from vdsm import utils from storage import misc +import time class RetryTests(TestCaseBase): @@ -80,3 +81,65 @@ class GeneralUtilsTests(TestCaseBase): def testPanic(self): self.assertRaises(AssertionError, utils.panic, "panic test") + + +class AsyncProcessOperationTests(TestCaseBase): + def _echo(self, text): + proc = misc.execCmd(["echo", "-n", "test"], sync=False) + + def parse(rc, out, err): + return out + + return utils.AsyncProcessOperation(proc, parse) + + def _sleep(self, t): + proc = misc.execCmd(["sleep", str(t)], sync=False) + return utils.AsyncProcessOperation(proc) + + def _fail(self, t): + proc = misc.execCmd(["sleep", str(t)], sync=False) + + def parse(rc, out, err): + raise Exception("TEST!!!") + + return utils.AsyncProcessOperation(proc, parse) + + def test(self): + aop = self._sleep(1) + self.assertEquals(aop.result(), ((0, "", ""), None)) + + def testAlreadyExitedSuccess(self): + aop = self._sleep(0) + time.sleep(1) + self.assertEquals(aop.result(), ((0, "", ""), None)) + + def testAlreadyExitedFail(self): + aop = self._sleep("hello") + time.sleep(1) + ((rc, out, err), err) = aop.result() + self.assertEquals(err, None) + self.assertEquals(rc, 1) + + def testWait(self): + aop = self._sleep(1) + aop.wait(timeout=2) + + def testParser(self): + aop = self._echo("test") + self.assertEquals(aop.result(), ("test", None)) + + def testStop(self): + aop = self._sleep(10) + aop.stop() + + start = time.time() + aop.result() + end = time.time() + duration = end - start + self.assertTrue(duration < 2) + + def testException(self): + aop = self._fail(1) + res, err = aop.result() + self.assertEquals(res, None) + self.assertNotEquals(err, None) -- To view, visit http://gerrit.ovirt.org/21135 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I79d0eefc9d917a4a93916d52867fb4f1e793c60e Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: ovirt-3.3 Gerrit-Owner: Saggi Mizrahi <smizr...@redhat.com> _______________________________________________ vdsm-patches mailing list vdsm-patches@lists.fedorahosted.org https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches