Adam Litke has uploaded a new change for review.

Change subject: wip: copy_data locking
......................................................................

wip: copy_data locking

Take a shared lock on each storage domain involved in the operation and
per-image locks (shared on the source, exclusive on the destination)
for the duration of the copy.  Engine remains responsible for avoiding
conflicting commands between hosts.

Change-Id: Ie44e1f1709ee81c14b3d165bbf3a6597ac50b68e
Signed-off-by: Adam Litke <ali...@redhat.com>
---
M tests/storage_sdm_copy_data_test.py
M vdsm/storage/sdm/api/copy_data.py
2 files changed, 94 insertions(+), 16 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/25/60825/1

diff --git a/tests/storage_sdm_copy_data_test.py b/tests/storage_sdm_copy_data_test.py
index 3e90778..b6e2f98 100644
--- a/tests/storage_sdm_copy_data_test.py
+++ b/tests/storage_sdm_copy_data_test.py
@@ -37,6 +37,11 @@
 import storage.sdm.api.copy_data
 
 
+class FakeTaskContext(object):
+    # Minimal stand-in for vars.task used by Job._acquire_vdsm_domains().
+    def getSharedLock(self, *args):
+        pass
+
+
 @expandPermutations
 class CopyDataTests(VdsmTestCase):
     SIZE = 1048576
@@ -54,8 +59,11 @@
 
         with make_env() as env:
             rm = FakeResourceManager()
+            fake_task_ctx = FakeTaskContext()
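+            # Patch rmanager and vars.task so the Job can take its domain
+            # and image locks against fakes instead of the real resource
+            # manager and task.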
             with MonkeyPatchScope([
+                (storage.sdm.api.copy_data, 'rmanager', rm),
                 (storage.sdm.api.copy_data, 'sdCache', env.sdcache),
+                (storage.sdm.api.copy_data.vars, 'task', fake_task_ctx),
                 (blockVolume, 'rmanager', rm),
             ]):
                 src_img_id = str(uuid.uuid4())
diff --git a/vdsm/storage/sdm/api/copy_data.py b/vdsm/storage/sdm/api/copy_data.py
index 82e0e61..5956477 100644
--- a/vdsm/storage/sdm/api/copy_data.py
+++ b/vdsm/storage/sdm/api/copy_data.py
@@ -19,6 +19,7 @@
 #
 
 from __future__ import absolute_import
+from collections import namedtuple
 from contextlib import contextmanager
 import logging
 
@@ -26,11 +27,19 @@
 from vdsm import properties
 from vdsm import qemuimg
 from vdsm.storage import constants as sc
+from vdsm.storage.threadlocal import vars
 
-from storage import volume
+from storage import resourceManager as rm
+from storage import sd, volume
+from storage.resourceFactories import IMAGE_NAMESPACE
 from storage.sdc import sdCache
 
 from . import base
+
+rmanager = rm.ResourceManager.getInstance()
+
+
+DivEndpointImageLock = namedtuple('DivEndpointImageLock', 'ns, img, type')
 
 
 class Job(base.Job):
@@ -43,8 +52,13 @@
     def __init__(self, job_id, host_id, source, destination):
         super(Job, self).__init__(job_id, 'copy_data', host_id)
         self._operation = None
+        self._sorted_domain_locks = []
+        self._sorted_image_locks = []
+
         self.source = _create_endpoint(source)
         self.destination = _create_endpoint(destination)
+        self._determine_vdsm_locks()
+        self._acquire_vdsm_domains()
 
     @property
     def progress(self):
@@ -54,22 +68,78 @@
         if self._operation:
             self._operation.abort()
 
-    def _run(self):
-        # TODO: LOCKING!
-        with self.source.prepare(), \
-                self.destination.prepare(writable=True):
-            # Do not start copying if we have already been aborted
-            if self._status == jobs.STATUS.ABORTED:
-                return
+    def _determine_vdsm_locks(self):
+        # Locks taken:
+        # - Domain (one shared lock per domain involved in the operation)
+        # - Image (shared on source, exclusive on destination):
+        #   - Activates logical volumes when using block storage
+        #   - Protects source image against modification on this host
+        #   - Reserves destination image for exclusive access on this host
+        # Locking convention:
+        # - All domain locks must be taken before any image locks.
+        # - Locks must be taken in sorted order to prevent deadlock.
+        # Caveats:
+        # - These locks don't block other hosts.  We would need to use
+        #   volume leases for all operations to get this protection.  For now
+        #   engine is responsible for avoiding conflicting commands.
+        # - Taking an exclusive lock on the destination image prevents
+        #   concurrent copy_data operations from taking place on the same host.
+        #   To allow this, we'll need to lock the destination image in shared
+        #   mode and the destination volume in exclusive mode.  This requires
+        #   changes to the resourceManager.
+        domain_locks = set()
+        image_locks = list()
+        if isinstance(self.source, CopyDataDivEndpoint):
+            domain_locks.add(self.source.sd_id)
+            img_res_ns = sd.getNamespace(self.source.sd_id, IMAGE_NAMESPACE)
+            lock = DivEndpointImageLock(img_res_ns, self.source.img_id,
+                                        rm.LockType.shared)
+            image_locks.append(lock)
+        if isinstance(self.destination, CopyDataDivEndpoint):
+            domain_locks.add(self.destination.sd_id)
+            img_res_ns = sd.getNamespace(self.destination.sd_id,
+                                         IMAGE_NAMESPACE)
+            lock = DivEndpointImageLock(img_res_ns, self.destination.img_id,
+                                        rm.LockType.exclusive)
+            image_locks.append(lock)
 
-            self._operation = qemuimg.convert(
-                self.source.path,
-                self.destination.path,
-                srcFormat=self.source.qemu_format,
-                dstFormat=self.destination.qemu_format,
-                backing=self.destination.backing_path,
-                backingFormat=self.destination.backing_qemu_format)
-            self._operation.wait_for_completion()
+        # Sort the locks at each level so that every job acquires them in
+        # the same global order; this is what prevents deadlock.
+        self._sorted_domain_locks = sorted(domain_locks)
+        self._sorted_image_locks = sorted(image_locks,
+                                          key=lambda info: (info.ns, info.img))
+
+    def _acquire_vdsm_domains(self):
+        # These locks must be acquired before the job is scheduled.
+        for domain in self._sorted_domain_locks:
+            vars.task.getSharedLock(sc.STORAGE, domain)
+
+    @contextmanager
+    def _acquired_vdsm_images(self):
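+        # Acquire image locks in sorted order; the finally clause releases
+        # whatever was acquired, even if a later acquireResource call fails.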
+        locks = []
+        try:
+            for lock_info in self._sorted_image_locks:
+                locks.append(rmanager.acquireResource(*lock_info))
+            yield
+        finally:
+            for lock in locks:
+                lock.release()
+
+    def _run(self):
+        with self._acquired_vdsm_images():
+            with self.source.prepare(), \
+                    self.destination.prepare(writable=True):
+                # Do not start copying if we have already been aborted
+                if self._status == jobs.STATUS.ABORTED:
+                    return
+
+                self._operation = qemuimg.convert(
+                    self.source.path,
+                    self.destination.path,
+                    srcFormat=self.source.qemu_format,
+                    dstFormat=self.destination.qemu_format,
+                    backing=self.destination.backing_path,
+                    backingFormat=self.destination.backing_qemu_format)
+                self._operation.wait_for_completion()
 
 
 def _create_endpoint(params):

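The locking convention above comes down to every job acquiring its
locks in one global order.  A minimal sketch of that pattern
(illustration only: hypothetical names, with threading.Lock standing in
for resourceManager resources):

    from contextlib import contextmanager
    import threading

    LOCKS = {name: threading.Lock() for name in ('sd-a', 'sd-b')}

    @contextmanager
    def acquired(names):
        # Take locks in sorted order; release whatever was taken on exit.
        taken = []
        try:
            for name in sorted(names):
                LOCKS[name].acquire()
                taken.append(name)
            yield
        finally:
            for name in taken:
                LOCKS[name].release()

Two jobs calling acquired(['sd-a', 'sd-b']) and acquired(['sd-b',
'sd-a']) both wait on 'sd-a' first, so neither can ever hold a lock the
other needs.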

-- 
To view, visit https://gerrit.ovirt.org/60825
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie44e1f1709ee81c14b3d165bbf3a6597ac50b68e
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Adam Litke <ali...@redhat.com>