Francesco Romani has uploaded a new change for review.

Change subject: virt: add run_async helper
......................................................................

virt: add run_async helper

Introduce the run_async function, to run one-shot long operations in
background.
run_async support the same arguments as concurrent.thread.

It comes on two flavours:
- if the `semaphore' argument is None: acts just as thin wrapper around
  concurrent.thread(). This helps used to have a common API and to
  factor out the common data

- if the `semaphore' argument is not None, tries to acquire it before
  to spawn the callable, and releases when the callable ends.
  This is useful to throttle the executions of the callable, and it will
  be used by a future patch which wants to throttle the incoming migrations,
  which maps to a VM creation flow.

  if run_async runs in throttling mode, and fails to acquire the
  semaphore, uses the argument of the `error' parameter.
  If it is not None, raises AsyncStartError holding that value.

Change-Id: Icfb405389c465d2a8b8fc8b6f958926d58167a26
Wiki: http://www.ovirt.org/Features/Migration_Enhancements
Signed-off-by: Francesco Romani <from...@redhat.com>
---
M tests/vmUtilsTests.py
M vdsm/virt/utils.py
M vdsm/virt/vm.py
3 files changed, 96 insertions(+), 4 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/70/49570/1

diff --git a/tests/vmUtilsTests.py b/tests/vmUtilsTests.py
index a4e4534..3fda2e6 100644
--- a/tests/vmUtilsTests.py
+++ b/tests/vmUtilsTests.py
@@ -18,6 +18,8 @@
 # Refer to the README and COPYING files for full details of the license
 #
 
+import threading
+
 from virt import utils
 from virt import vm
 from virt import vmexitreason
@@ -151,3 +153,50 @@
     @permutations([[-1], [1023]])
     def test_eventToString_unknown_event(self, code):
         self.assertTrue(vm.eventToString(code))
+
+
+class RunAsyncTests(TestCaseBase):
+
+    def test_do(self):
+        done = threading.Event()
+
+        def helper():
+            done.set()
+
+        utils.run_async(helper)
+        self.assertTrue(done.wait(1.))
+
+    def test_ignores_error(self):
+        def helper():
+            raise RuntimeError("Async() doesn't care")
+
+        error = utils.run_async(helper)
+        self.assertEqual(error, None)
+
+
+class RunAsyncThrottledTests(TestCaseBase):
+
+    def setUp(self):
+        self.sem = threading.BoundedSemaphore(1)
+
+    def test_do(self):
+
+        done = threading.Event()
+
+        def helper():
+            done.set()
+
+        utils.run_async(helper, semaphore=self.sem)
+        self.assertTrue(done.wait(1.))
+
+    def test_raises_if_busy(self):
+
+        def helper():
+            pass
+
+        with self.sem:
+            self.assertRaises(utils.AsyncStartError,
+                              utils.run_async,
+                              helper,
+                              semaphore=self.sem,
+                              error='fake error')
diff --git a/vdsm/virt/utils.py b/vdsm/virt/utils.py
index 55a60c6..c2298dd 100644
--- a/vdsm/virt/utils.py
+++ b/vdsm/virt/utils.py
@@ -26,6 +26,7 @@
 import threading
 
 from vdsm.utils import monotonic_time, rmFile
+from vdsm import concurrent
 
 
 def isVdsmImage(drive):
@@ -117,3 +118,41 @@
     if os.path.islink(sock):
         rmFile(os.path.realpath(sock))
     rmFile(sock)
+
+
+class AsyncStartError(Exception):
+    def __init__(self, error):
+        self.error = error
+
+
+def run_async(func, name=None, daemon=False, logger=None,
+              semaphore=None, error=None):
+    starting_error = [None]
+    started = threading.Event()
+
+    def _throttle():
+        acquired = semaphore.acquire(False)
+        if acquired:
+            started.set()
+            try:
+                func()
+            finally:
+                semaphore.release()
+        else:
+            starting_error[0] = error
+            started.set()
+
+    def _run():
+        if semaphore is None:
+            started.set()
+            func()
+        else:
+            _throttle()
+
+    thread = concurrent.thread(
+        _run, name=name, daemon=daemon, logger=logger)
+    thread.start()
+
+    started.wait()
+    if starting_error[0] is not None:
+        raise AsyncStartError(starting_error[0])
diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py
index 8ec8b1d..d55eed7 100644
--- a/vdsm/virt/vm.py
+++ b/vdsm/virt/vm.py
@@ -36,7 +36,6 @@
 import libvirt
 
 # vdsm imports
-from vdsm import concurrent
 from vdsm import constants
 from vdsm import libvirtconnection
 from vdsm import netinfo
@@ -78,6 +77,7 @@
 from .vmxml import METADATA_VM_TUNE_URI, METADATA_VM_TUNE_ELEMENT
 from .vmxml import METADATA_VM_TUNE_PREFIX
 
+from .utils import run_async, AsyncStartError
 from .utils import isVdsmImage, cleanup_guest_socket
 from vmpowerdown import VmShutdown, VmReboot
 
@@ -278,7 +278,6 @@
         self._confLock = threading.Lock()
         self._jobsLock = threading.Lock()
         self._statusLock = threading.Lock()
-        self._creationThread = concurrent.thread(self._startUnderlyingVm)
         if 'migrationDest' in self.conf:
             self._lastStatus = vmstatus.MIGRATION_DESTINATION
         elif 'restoreState' in self.conf:
@@ -682,8 +681,13 @@
 
         return [drv for order, drv in drives]
 
-    def run(self):
-        self._creationThread.start()
+    def run(self, spawn=run_async):
+        try:
+            spawn(self._startUnderlyingVm)
+        except AsyncStartError as ex:
+            return response.error(ex.error)
+
+        return response.success(vmList=self.status())
 
     def memCommit(self):
         """


-- 
To view, visit https://gerrit.ovirt.org/49570
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Icfb405389c465d2a8b8fc8b6f958926d58167a26
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <from...@redhat.com>
_______________________________________________
vdsm-patches mailing list
vdsm-patches@lists.fedorahosted.org
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to