Tomas Golembiovsky has uploaded a new change for review. Change subject: v2v: Add PipelineProc, pipeline wrapper object ......................................................................
v2v: Add PipelineProc, pipeline wrapper object We plan to pipe the output of virt-v2v to tee to store the log file. In order to manage the pipelined processes new class is introduced. Change-Id: I0c3741ae7ef9731a2cd9d587e86766b9e6e64f62 Signed-off-by: Tomáš Golembiovský <[email protected]> --- M lib/vdsm/v2v.py M tests/v2vTests.py 2 files changed, 132 insertions(+), 1 deletion(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/94/62094/1 diff --git a/lib/vdsm/v2v.py b/lib/vdsm/v2v.py index 860768e..e149ddc 100644 --- a/lib/vdsm/v2v.py +++ b/lib/vdsm/v2v.py @@ -34,6 +34,7 @@ import re import signal import tarfile +import time import threading import xml.etree.ElementTree as ET import zipfile @@ -46,7 +47,8 @@ from vdsm.constants import P_VDSM_RUN, EXT_KVM_2_OVIRT from vdsm.define import errCode, doneCode from vdsm import cmdutils, concurrent, libvirtconnection, response -from vdsm.utils import traceback, CommandPath, NICENESS, IOCLASS +from vdsm.utils import monotonic_time, traceback, CommandPath, \ + NICENESS, IOCLASS try: import ovirt_imageio_common @@ -635,6 +637,78 @@ return ret +class PipelineProc(object): + + def __init__(self, proc1, proc2): + self._proc = (proc1, proc2) + self._stdout = proc2.stdout + + def kill(self): + """ + Kill all processes in a pipeline. Unlike regular kill() we do not raise + OSError if the processess have already terminated. + """ + for p in self._proc: + try: + logging.debug("Killing pid=%d", p.pid) + p.kill() + except OSError as e: + # Probably the process has already terminated + if e.errno != errno.ESRCH: + raise e + + @property + def pid(self): + return [p.pid for p in self._proc] + + @property + def returncode(self): + """ + Returns None if any of the processes is still running. Returns 0 if all + processes have finished with a zero exit code, otherwise return first + nonzero exit code. + """ + ret = 0 + for p in self._proc: + p.poll() + if p.returncode is None: + return None + if p.returncode != 0 and ret == 0: + # One of the processes has failed + ret = p.returncode + + # All processes have finished + return ret + + @property + def stdout(self): + return self._stdout + + def wait(self, timeout=None): + if timeout is not None: + deadline = monotonic_time() + timeout + else: + deadline = None + + for p in self._proc: + if deadline is not None: + # NOTE: CPopen doesn't support timeout argument. + while monotonic_time() < deadline: + time.sleep(1) + p.poll() + if p.returncode is not None: + break + else: + p.wait() + + if deadline is not None: + if deadline < monotonic_time() or self.returncode is None: + # Timed out + return False + + return True + + class ImportVm(object): TERM_DELAY = 30 PROC_WAIT_TIMEOUT = 30 diff --git a/tests/v2vTests.py b/tests/v2vTests.py index 6d3a8f4..a0ae3b7 100644 --- a/tests/v2vTests.py +++ b/tests/v2vTests.py @@ -579,6 +579,63 @@ self.assertEqual(out, msg) +@expandPermutations +class PipelineProcTests(TestCaseBase): + + PROC_WAIT_TIMEOUT = 30 + + def testRun(self): + msg = 'foo\nbar' + p1 = v2v._simple_exec_cmd(['echo', '-n', msg], + stdout=subprocess.PIPE) + p2 = v2v._simple_exec_cmd(['cat'], + stdin=p1.stdout, + stdout=subprocess.PIPE) + + p = v2v.PipelineProc(p1, p2) + self.assertEqual(p.pid, [p1.pid, p2.pid]) + + ret = p.wait(self.PROC_WAIT_TIMEOUT) + self.assertEqual(ret, True) + + out = p.stdout.read() + self.assertEqual(out, msg) + + @permutations([ + # (cmd1, cmd2, returncode) + ['false', 'true', 1], + ['true', 'false', 1], + ['true', 'true', 0], + ]) + def testReturncode(self, cmd1, cmd2, returncode): + p1 = v2v._simple_exec_cmd([cmd1], + stdout=subprocess.PIPE) + p2 = v2v._simple_exec_cmd([cmd2], + stdin=p1.stdout, + stdout=subprocess.PIPE) + p = v2v.PipelineProc(p1, p2) + p.wait(self.PROC_WAIT_TIMEOUT) + self.assertEqual(p.returncode, returncode) + + @permutations([ + # (cmd1, cmd2, waitRet) + [['sleep', '1'], ['sleep', '1'], True], + [['sleep', '1'], ['sleep', '3'], False], + [['sleep', '3'], ['sleep', '1'], False], + [['sleep', '3'], ['sleep', '3'], False], + ]) + def testWait(self, cmd1, cmd2, waitRet): + p1 = v2v._simple_exec_cmd(cmd1, + stdout=subprocess.PIPE) + p2 = v2v._simple_exec_cmd(cmd2, + stdin=p1.stdout, + stdout=subprocess.PIPE) + p = v2v.PipelineProc(p1, p2) + ret = p.wait(2) + p.kill() + self.assertEqual(ret, waitRet) + + class MockVirConnectTests(TestCaseBase): def setUp(self): self._vms = [MockVirDomain(*spec) for spec in VM_SPECS] -- To view, visit https://gerrit.ovirt.org/62094 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I0c3741ae7ef9731a2cd9d587e86766b9e6e64f62 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Tomas Golembiovsky <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/admin/lists/[email protected]
