Tomas Golembiovsky has uploaded a new change for review.

Change subject: v2v: Add PipelineProc, pipeline wrapper object
......................................................................

v2v: Add PipelineProc, pipeline wrapper object

We plan to pipe the output of virt-v2v to tee to store the log file. In
order to manage the pipelined processes new class is introduced.

Change-Id: I0c3741ae7ef9731a2cd9d587e86766b9e6e64f62
Signed-off-by: Tomáš Golembiovský <[email protected]>
---
M lib/vdsm/v2v.py
M tests/v2vTests.py
2 files changed, 132 insertions(+), 1 deletion(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/94/62094/1

diff --git a/lib/vdsm/v2v.py b/lib/vdsm/v2v.py
index 860768e..e149ddc 100644
--- a/lib/vdsm/v2v.py
+++ b/lib/vdsm/v2v.py
@@ -34,6 +34,7 @@
 import re
 import signal
 import tarfile
+import time
 import threading
 import xml.etree.ElementTree as ET
 import zipfile
@@ -46,7 +47,8 @@
 from vdsm.constants import P_VDSM_RUN, EXT_KVM_2_OVIRT
 from vdsm.define import errCode, doneCode
 from vdsm import cmdutils, concurrent, libvirtconnection, response
-from vdsm.utils import traceback, CommandPath, NICENESS, IOCLASS
+from vdsm.utils import monotonic_time, traceback, CommandPath, \
+    NICENESS, IOCLASS
 
 try:
     import ovirt_imageio_common
@@ -635,6 +637,78 @@
         return ret
 
 
+class PipelineProc(object):
+
+    def __init__(self, proc1, proc2):
+        self._proc = (proc1, proc2)
+        self._stdout = proc2.stdout
+
+    def kill(self):
+        """
+        Kill all processes in a pipeline. Unlike regular kill() we do not raise
+        OSError if the processess have already terminated.
+        """
+        for p in self._proc:
+            try:
+                logging.debug("Killing pid=%d", p.pid)
+                p.kill()
+            except OSError as e:
+                # Probably the process has already terminated
+                if e.errno != errno.ESRCH:
+                    raise e
+
+    @property
+    def pid(self):
+        return [p.pid for p in self._proc]
+
+    @property
+    def returncode(self):
+        """
+        Returns None if any of the processes is still running. Returns 0 if all
+        processes have finished with a zero exit code, otherwise return first
+        nonzero exit code.
+        """
+        ret = 0
+        for p in self._proc:
+            p.poll()
+            if p.returncode is None:
+                return None
+            if p.returncode != 0 and ret == 0:
+                # One of the processes has failed
+                ret = p.returncode
+
+        # All processes have finished
+        return ret
+
+    @property
+    def stdout(self):
+        return self._stdout
+
+    def wait(self, timeout=None):
+        if timeout is not None:
+            deadline = monotonic_time() + timeout
+        else:
+            deadline = None
+
+        for p in self._proc:
+            if deadline is not None:
+                # NOTE: CPopen doesn't support timeout argument.
+                while monotonic_time() < deadline:
+                    time.sleep(1)
+                    p.poll()
+                    if p.returncode is not None:
+                        break
+            else:
+                p.wait()
+
+        if deadline is not None:
+            if deadline < monotonic_time() or self.returncode is None:
+                # Timed out
+                return False
+
+        return True
+
+
 class ImportVm(object):
     TERM_DELAY = 30
     PROC_WAIT_TIMEOUT = 30
diff --git a/tests/v2vTests.py b/tests/v2vTests.py
index 6d3a8f4..a0ae3b7 100644
--- a/tests/v2vTests.py
+++ b/tests/v2vTests.py
@@ -579,6 +579,63 @@
         self.assertEqual(out, msg)
 
 
+@expandPermutations
+class PipelineProcTests(TestCaseBase):
+
+    PROC_WAIT_TIMEOUT = 30
+
+    def testRun(self):
+        msg = 'foo\nbar'
+        p1 = v2v._simple_exec_cmd(['echo', '-n', msg],
+                                  stdout=subprocess.PIPE)
+        p2 = v2v._simple_exec_cmd(['cat'],
+                                  stdin=p1.stdout,
+                                  stdout=subprocess.PIPE)
+
+        p = v2v.PipelineProc(p1, p2)
+        self.assertEqual(p.pid, [p1.pid, p2.pid])
+
+        ret = p.wait(self.PROC_WAIT_TIMEOUT)
+        self.assertEqual(ret, True)
+
+        out = p.stdout.read()
+        self.assertEqual(out, msg)
+
+    @permutations([
+        # (cmd1, cmd2, returncode)
+        ['false', 'true', 1],
+        ['true', 'false', 1],
+        ['true', 'true', 0],
+    ])
+    def testReturncode(self, cmd1, cmd2, returncode):
+        p1 = v2v._simple_exec_cmd([cmd1],
+                                  stdout=subprocess.PIPE)
+        p2 = v2v._simple_exec_cmd([cmd2],
+                                  stdin=p1.stdout,
+                                  stdout=subprocess.PIPE)
+        p = v2v.PipelineProc(p1, p2)
+        p.wait(self.PROC_WAIT_TIMEOUT)
+        self.assertEqual(p.returncode, returncode)
+
+    @permutations([
+        # (cmd1, cmd2, waitRet)
+        [['sleep', '1'], ['sleep', '1'], True],
+        [['sleep', '1'], ['sleep', '3'], False],
+        [['sleep', '3'], ['sleep', '1'], False],
+        [['sleep', '3'], ['sleep', '3'], False],
+    ])
+    def testWait(self, cmd1, cmd2, waitRet):
+        p1 = v2v._simple_exec_cmd(cmd1,
+                                  stdout=subprocess.PIPE)
+        p2 = v2v._simple_exec_cmd(cmd2,
+                                  stdin=p1.stdout,
+                                  stdout=subprocess.PIPE)
+        p = v2v.PipelineProc(p1, p2)
+        ret = p.wait(2)
+        p.kill()
+        self.assertEqual(ret, waitRet)
+
+
 class MockVirConnectTests(TestCaseBase):
     def setUp(self):
         self._vms = [MockVirDomain(*spec) for spec in VM_SPECS]


-- 
To view, visit https://gerrit.ovirt.org/62094
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I0c3741ae7ef9731a2cd9d587e86766b9e6e64f62
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Tomas Golembiovsky <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/admin/lists/[email protected]

Reply via email to