Francesco Romani has uploaded a new change for review. Change subject: virt: add run_async helper ......................................................................
virt: add run_async helper Introduce the run_async function, to run one-shot long operations in the background. run_async supports the same arguments as concurrent.thread. It comes in two flavours: - if the `semaphore' argument is None: it acts just as a thin wrapper around concurrent.thread(). This helps to have a common API and to factor out the common data. - if the `semaphore' argument is not None: it tries to acquire the semaphore before spawning the callable, and releases it when the callable ends. This is useful to throttle the executions of the callable, and it will be used by a future patch which wants to throttle the incoming migrations, which map to a VM creation flow. If run_async runs in throttling mode and fails to acquire the semaphore, it uses the argument of the `error' parameter: if that is not None, it raises AsyncStartError holding that value. Change-Id: Icfb405389c465d2a8b8fc8b6f958926d58167a26 Wiki: http://www.ovirt.org/Features/Migration_Enhancements Signed-off-by: Francesco Romani <from...@redhat.com> --- M tests/vmUtilsTests.py M vdsm/virt/utils.py M vdsm/virt/vm.py 3 files changed, 96 insertions(+), 4 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/70/49570/1 diff --git a/tests/vmUtilsTests.py b/tests/vmUtilsTests.py index a4e4534..3fda2e6 100644 --- a/tests/vmUtilsTests.py +++ b/tests/vmUtilsTests.py @@ -18,6 +18,8 @@ # Refer to the README and COPYING files for full details of the license # +import threading + from virt import utils from virt import vm from virt import vmexitreason @@ -151,3 +153,50 @@ @permutations([[-1], [1023]]) def test_eventToString_unknown_event(self, code): self.assertTrue(vm.eventToString(code)) + + +class RunAsyncTests(TestCaseBase): + + def test_do(self): + done = threading.Event() + + def helper(): + done.set() + + utils.run_async(helper) + self.assertTrue(done.wait(1.)) + + def test_ignores_error(self): + def helper(): + raise RuntimeError("Async() doesn't care") + + error = utils.run_async(helper) + 
self.assertEqual(error, None) + + +class RunAsyncThrottledTests(TestCaseBase): + + def setUp(self): + self.sem = threading.BoundedSemaphore(1) + + def test_do(self): + + done = threading.Event() + + def helper(): + done.set() + + utils.run_async(helper, semaphore=self.sem) + self.assertTrue(done.wait(1.)) + + def test_raises_if_busy(self): + + def helper(): + pass + + with self.sem: + self.assertRaises(utils.AsyncStartError, + utils.run_async, + helper, + semaphore=self.sem, + error='fake error') diff --git a/vdsm/virt/utils.py b/vdsm/virt/utils.py index 55a60c6..c2298dd 100644 --- a/vdsm/virt/utils.py +++ b/vdsm/virt/utils.py @@ -26,6 +26,7 @@ import threading from vdsm.utils import monotonic_time, rmFile +from vdsm import concurrent def isVdsmImage(drive): @@ -117,3 +118,41 @@ if os.path.islink(sock): rmFile(os.path.realpath(sock)) rmFile(sock) + + +class AsyncStartError(Exception): + def __init__(self, error): + self.error = error + + +def run_async(func, name=None, daemon=False, logger=None, + semaphore=None, error=None): + starting_error = [None] + started = threading.Event() + + def _throttle(): + acquired = semaphore.acquire(False) + if acquired: + started.set() + try: + func() + finally: + semaphore.release() + else: + starting_error[0] = error + started.set() + + def _run(): + if semaphore is None: + started.set() + func() + else: + _throttle() + + thread = concurrent.thread( + _run, name=name, daemon=daemon, logger=logger) + thread.start() + + started.wait() + if starting_error[0] is not None: + raise AsyncStartError(starting_error[0]) diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py index 8ec8b1d..d55eed7 100644 --- a/vdsm/virt/vm.py +++ b/vdsm/virt/vm.py @@ -36,7 +36,6 @@ import libvirt # vdsm imports -from vdsm import concurrent from vdsm import constants from vdsm import libvirtconnection from vdsm import netinfo @@ -78,6 +77,7 @@ from .vmxml import METADATA_VM_TUNE_URI, METADATA_VM_TUNE_ELEMENT from .vmxml import METADATA_VM_TUNE_PREFIX +from .utils 
import run_async, AsyncStartError from .utils import isVdsmImage, cleanup_guest_socket from vmpowerdown import VmShutdown, VmReboot @@ -278,7 +278,6 @@ self._confLock = threading.Lock() self._jobsLock = threading.Lock() self._statusLock = threading.Lock() - self._creationThread = concurrent.thread(self._startUnderlyingVm) if 'migrationDest' in self.conf: self._lastStatus = vmstatus.MIGRATION_DESTINATION elif 'restoreState' in self.conf: @@ -682,8 +681,13 @@ return [drv for order, drv in drives] - def run(self): - self._creationThread.start() + def run(self, spawn=run_async): + try: + spawn(self._startUnderlyingVm) + except AsyncStartError as ex: + return response.error(ex.error) + + return response.success(vmList=self.status()) def memCommit(self): """ -- To view, visit https://gerrit.ovirt.org/49570 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Icfb405389c465d2a8b8fc8b6f958926d58167a26 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Francesco Romani <from...@redhat.com> _______________________________________________ vdsm-patches mailing list vdsm-patches@lists.fedorahosted.org https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches