Nir Soffer has uploaded a new change for review.

Change subject: vm: Prevent multiple threads blocking on same libvirt domain
......................................................................

vm: Prevent multiple threads blocking on same libvirt domain

If a libvirt call gets stuck because a vm is not responding, we could
have multiple threads blocked on the same vm without any limit, using
precious libvirt resources that could be used to run other vms.

This patch adds a new TimedLock lock, that raises a LockTimeout if the
lock cannot be acquired within the configured timeout. Using this lock, a
vm now allows only one concurrent libvirt call. If a libvirt call gets
stuck, and other threads are trying to invoke libvirt calls on the same
vm, they will wait until the current call finishes, or fail with a
timeout.

This should slow down calls for a single vm, since each call is invoked
only after the previous call returns. However, when running many vms, this
creates natural round-robin scheduling, giving each vm an equal chance to
make progress, and limiting the load on libvirt.

Change-Id: Ib459697b8688ebcba987cd6b9e11815826e92990
Signed-off-by: Nir Soffer <[email protected]>
---
M debian/vdsm-python.install
M lib/vdsm/Makefile.am
A lib/vdsm/locking.py
M tests/Makefile.am
A tests/lockingTests.py
M vdsm.spec.in
M vdsm/virt/vm.py
7 files changed, 172 insertions(+), 8 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/72/30772/1

diff --git a/debian/vdsm-python.install b/debian/vdsm-python.install
index 9d6a99c..b80fd4f 100644
--- a/debian/vdsm-python.install
+++ b/debian/vdsm-python.install
@@ -8,6 +8,7 @@
 ./usr/lib/python2.7/dist-packages/vdsm/exception.py
 ./usr/lib/python2.7/dist-packages/vdsm/ipwrapper.py
 ./usr/lib/python2.7/dist-packages/vdsm/libvirtconnection.py
+./usr/lib/python2.7/dist-packages/vdsm/locking.py
 ./usr/lib/python2.7/dist-packages/vdsm/netconfpersistence.py
 ./usr/lib/python2.7/dist-packages/vdsm/netinfo.py
 ./usr/lib/python2.7/dist-packages/vdsm/netlink/__init__.py
diff --git a/lib/vdsm/Makefile.am b/lib/vdsm/Makefile.am
index 8e90e6e..ffb642c 100644
--- a/lib/vdsm/Makefile.am
+++ b/lib/vdsm/Makefile.am
@@ -28,6 +28,7 @@
        exception.py \
        ipwrapper.py \
        libvirtconnection.py \
+       locking.py \
        netconfpersistence.py \
        netinfo.py \
        profile.py \
diff --git a/lib/vdsm/locking.py b/lib/vdsm/locking.py
new file mode 100644
index 0000000..9e92ca3
--- /dev/null
+++ b/lib/vdsm/locking.py
@@ -0,0 +1,54 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import threading
+import time
+
+
+class LockTimeout(Exception):
+    """Raised by TimedLock when it cannot be acquired within its timeout."""
+
+
+class TimedLock(object):
+    """
+    Non-reentrant lock that gives up with LockTimeout after `timeout` seconds.
+    """
+
+    def __init__(self, name, timeout):
+        self._name = name
+        self._timeout = timeout
+        self._busy = False
+        self._cond = threading.Condition(threading.Lock())
+
+    def __enter__(self):
+        give_up_at = time.time() + self._timeout
+        with self._cond:
+            while self._busy:
+                remaining = give_up_at - time.time()
+                if remaining <= 0:
+                    raise LockTimeout("Timeout acquiring lock %r" % self._name)
+                self._cond.wait(remaining)
+            self._busy = True
+        return self
+
+    def __exit__(self, *args):
+        with self._cond:
+            self._busy = False
+            self._cond.notify()
diff --git a/tests/Makefile.am b/tests/Makefile.am
index aa4a45e..10a2727 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -44,6 +44,7 @@
        jsonRpcTests.py \
        ksmTests.py \
        libvirtconnectionTests.py \
+       lockingTests.py \
        lvmTests.py \
        main.py \
        miscTests.py \
diff --git a/tests/lockingTests.py b/tests/lockingTests.py
new file mode 100644
index 0000000..9ac8a05
--- /dev/null
+++ b/tests/lockingTests.py
@@ -0,0 +1,89 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import time
+import threading
+from vdsm import locking
+from testlib import VdsmTestCase
+
+
+class TimedLockTests(VdsmTestCase):
+    """Tests for locking.TimedLock acquisition, serialization and timeout."""
+    def test_free(self):
+        lock = locking.TimedLock("xxx-yyy-zzz", 0)
+        with self.assertNotRaises():
+            with lock:
+                pass
+
+    def test_busy(self):
+        lock = locking.TimedLock("xxx-yyy-zzz", 0)
+        with self.assertRaises(locking.LockTimeout):
+            with lock:
+                with lock:
+                    pass
+
+    def test_serialize(self):
+        lock = locking.TimedLock("xxx-yyy-zzz", 0.4)
+        single_thread = threading.BoundedSemaphore(1)
+        passed, timedout = [0], [0]
+
+        def run():
+            try:
+                with lock:
+                    # Non-blocking acquire: fails if two threads got the lock
+                    if single_thread.acquire(False):
+                        time.sleep(0.1)
+                        passed[0] += 1
+                        single_thread.release()
+            except locking.LockTimeout:
+                timedout[0] += 1
+
+        self.run_threads(3, run)
+        self.assertEquals((passed[0], timedout[0]), (3, 0))
+
+    def test_timeout(self):
+        lock = locking.TimedLock("xxx-yyy-zzz", 0.1)
+        single_thread = threading.BoundedSemaphore(1)
+        passed, timedout = [0], [0]
+
+        def run():
+            try:
+                with lock:
+                    # Holder sleeps past the other threads' 0.1s timeout
+                    if single_thread.acquire(False):
+                        time.sleep(0.2)
+                        passed[0] += 1
+                        single_thread.release()
+            except locking.LockTimeout:
+                timedout[0] += 1
+
+        self.run_threads(3, run)
+        self.assertEquals((passed[0], timedout[0]), (1, 2))
+
+    def run_threads(self, count, target):
+        threads = []
+        for _ in range(count):
+            t = threading.Thread(target=target)
+            t.daemon = True
+            threads.append(t)
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
diff --git a/vdsm.spec.in b/vdsm.spec.in
index 2d371b1..c7a2b0b 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -1159,6 +1159,7 @@
 %{python_sitearch}/%{vdsm_name}/exception.py*
 %{python_sitearch}/%{vdsm_name}/ipwrapper.py*
 %{python_sitearch}/%{vdsm_name}/libvirtconnection.py*
+%{python_sitearch}/%{vdsm_name}/locking.py*
 %{python_sitearch}/%{vdsm_name}/netinfo.py*
 %{python_sitearch}/%{vdsm_name}/netlink/__init__.py*
 %{python_sitearch}/%{vdsm_name}/netlink/addr.py*
diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py
index 25eec71..9e9a844 100644
--- a/vdsm/virt/vm.py
+++ b/vdsm/virt/vm.py
@@ -37,6 +37,7 @@
 # vdsm imports
 from vdsm import constants
 from vdsm import libvirtconnection
+from vdsm import locking
 from vdsm import netinfo
 from vdsm import qemuimg
 from vdsm import utils
@@ -619,12 +620,24 @@
 
 
 class NotifyingVirDomain:
-    # virDomain wrapper that notifies vm when a method raises an exception with
-    # get_error_code() = VIR_ERR_OPERATION_TIMEOUT
+    """
+    Wrap virDomain object for limiting concurrent calls and reporting timeouts.
 
-    def __init__(self, dom, tocb):
+    The wrapper allows only one concurrent call per vm, to prevent multiple
+    threads from blocking when the underlying libvirt call gets stuck.
+    Invoking a domain method blocks while the domain is busy with another
+    call. If the domain is not available after timeout seconds, a timeout is
+    reported and TimeoutError is raised.
+
+    If a domain method was invoked, and the libvirt call failed with the
+    VIR_ERR_OPERATION_TIMEOUT error code, the timeout is reported, and
+    TimeoutError is raised.
+    """
+
+    def __init__(self, dom, tocb, vmid, timeout=30):
         self._dom = dom
         self._cb = tocb
+        self._timedlock = locking.TimedLock(vmid, timeout)
 
     def __getattr__(self, name):
         attr = getattr(self._dom, name)
@@ -633,7 +646,8 @@
 
         def f(*args, **kwargs):
             try:
-                ret = attr(*args, **kwargs)
+                with self._timedlock:
+                    ret = attr(*args, **kwargs)
                 self._cb(False)
                 return ret
             except libvirt.libvirtError as e:
@@ -643,6 +657,9 @@
                     toe.err = e.err
                     raise toe
                 raise
+            except locking.LockTimeout as e:
+                self._cb(True)
+                raise TimeoutError(e)
         return f
 
 
@@ -2892,7 +2909,7 @@
         if self.recovering:
             self._dom = NotifyingVirDomain(
                 self._connection.lookupByUUIDString(self.id),
-                self._timeoutExperienced)
+                self._timeoutExperienced, self.id)
         elif 'restoreState' in self.conf:
             fromSnapshot = self.conf.get('restoreFromSnapshot', False)
             srcDomXML = self.conf.pop('_srcDomXML')
@@ -2912,7 +2929,7 @@
 
             self._dom = NotifyingVirDomain(
                 self._connection.lookupByUUIDString(self.id),
-                self._timeoutExperienced)
+                self._timeoutExperienced, self.id)
         else:
             flags = libvirt.VIR_DOMAIN_NONE
             if 'launchPaused' in self.conf:
@@ -2921,7 +2938,7 @@
                 del self.conf['launchPaused']
             self._dom = NotifyingVirDomain(
                 self._connection.createXML(domxml, flags),
-                self._timeoutExperienced)
+                self._timeoutExperienced, self.id)
             hooks.after_vm_start(self._dom.XMLDesc(0), self.conf)
             for dev in self._customDevices():
                 hooks.after_device_create(dev._deviceXML, self.conf,
@@ -3690,7 +3707,7 @@
                 # or restart vdsm if connection to libvirt was lost
                 self._dom = NotifyingVirDomain(
                     self._connection.lookupByUUIDString(self.id),
-                    self._timeoutExperienced)
+                    self._timeoutExperienced, self.id)
 
                 if not self._incomingMigrationFinished.isSet():
                     state = self._dom.state(0)


-- 
To view, visit http://gerrit.ovirt.org/30772
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib459697b8688ebcba987cd6b9e11815826e92990
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to