Adam Litke has uploaded a new change for review.

Change subject: LiveMerge: Sync volume chain information after live merge
......................................................................

LiveMerge: Sync volume chain information after live merge

When a block job event arrives from libvirt, or when vdsm notices that a
block job is no longer running on a disk, we need to query the volume
chain as qemu sees it and synchronize the VM state with that information.

The items that require synchronization (see the sketch after this list) are:
 - VM.conf['devices'] and VM._devices:
   * Update volumeID if the active layer was merged
   * Update volumeChain to remove merged volumes
 - Image chain
   * For file-based storage domains we own the volume metadata and must
     update the chain to remove merged volumes.  For block-based domains,
     modifying the chain requires the SPM, so it will be done by the
     deleteIllegalVolume verb.
 - Volume legality
   * Mark all merged volumes as illegal so they can be safely deleted by
     the engine with the deleteIllegalVolumes verb.
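
In pseudo-code, the overall flow added by this change is roughly as follows
(an illustrative sketch only, not the literal code; the names refer to the
new helpers introduced below):

    # on VIR_DOMAIN_BLOCK_JOB_COMPLETED for a drive
    # (see _postLiveMergeSyncVolumeChain in vm.py):
    volumes = self._driveGetActualVolumeChain(drive)  # chain as qemu sees it
    # 1. update conf['devices'] and self._devices: switch the active
    #    volumeID/path if the active layer was merged, and drop merged
    #    volumes from volumeChain
    # 2. ask storage to mark the merged volumes illegal (and, on file-based
    #    domains, fix the chain metadata):
    self.cif.irs.imageSyncVolumeChain(drive.domainID, drive.imageID,
                                      drive.volumeID, volumes)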

Change-Id: Ib86c19077695354b45818e0186e642ad0e8bc07c
Signed-off-by: Adam Litke <ali...@redhat.com>
---
M vdsm/clientIF.py
M vdsm/storage/hsm.py
M vdsm/storage/image.py
M vdsm/vm.py
4 files changed, 153 insertions(+), 2 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/36/26636/1

diff --git a/vdsm/clientIF.py b/vdsm/clientIF.py
index 88ed30c..74f2f47 100644
--- a/vdsm/clientIF.py
+++ b/vdsm/clientIF.py
@@ -564,6 +564,9 @@
             elif eventid == libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG:
                 action, = args[:-1]
                 v._onWatchdogEvent(action)
+            elif eventid == libvirt.VIR_DOMAIN_EVENT_ID_BLOCK_JOB:
+                path, jobType, status = args[:-1]
+                v._onBlockJobEvent(path, jobType, status)
             else:
                 v.log.warning('unknown eventid %s args %s', eventid, args)
         except:
diff --git a/vdsm/storage/hsm.py b/vdsm/storage/hsm.py
index bfdccf5..cb24022 100644
--- a/vdsm/storage/hsm.py
+++ b/vdsm/storage/hsm.py
@@ -1807,6 +1807,40 @@
         return dict(volumes=retVolumes)
 
     @public
+    def imageSyncVolumeChain(self, sdUUID, imgUUID, volUUID, newChain):
+        """
+        Update storage metadata for an image chain after a live merge
+        completes.  Any volumes which are no longer part of the qemu volume
+        chain need to be marked illegal to indicate they are safe to remove.
+        For fileSD domains we must also update the chain metadata so volume
+        ancestry is correct.  For blockSD domains this operation must be done
+        by SPM due to the need to change LVM metadata.  It will be handled by
+        the deleteIllegalVolume SPM verb.
+        """
+        sdDom = sdCache.produce(sdUUID=sdUUID)
+        repoPath = os.path.join(self.storage_repository, sdDom.getPools()[0])
+        img = image.Image(repoPath)
+
+        imageResourcesNamespace = sd.getNamespace(sdUUID, IMAGE_NAMESPACE)
+        with rmanager.acquireResource(imageResourcesNamespace, imgUUID,
+                                      rm.LockType.shared):
+            curChain = img.getChain(sdUUID, imgUUID, volUUID)
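+            # e.g. if curChain is [base, int1, int2, top] and newChain is
+            # [base, top], the loop below collects the merged volumes int1
+            # and int2 into subChain and stops at the next surviving volume.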
+            subChain = []
+            for vol in curChain:
+                if vol.volUUID not in newChain:
+                    subChain.insert(0, vol.volUUID)
+                elif len(subChain) > 0:
+                    break
+            self.log.error("AGLITKE: imageSyncVolumeChain removing subchain: "
+                           "%s" % subChain)
+            if len(subChain) == 0:
+                return
+            # Mark the subchain illegal and unlink the subchain (if possible)
+            img.markIllegalSubChain(sdDom, imgUUID, subChain)
+            if sdDom.getStorageType() not in sd.BLOCK_DOMAIN_TYPES:
+                img.unlinkSubChain(sdDom, imgUUID, subChain)
+
+    @public
     def mergeSnapshots(self, sdUUID, spUUID, vmUUID, imgUUID, ancestor,
                        successor, postZero=False):
         """
diff --git a/vdsm/storage/image.py b/vdsm/storage/image.py
index b08ecca..c8f8ae5 100644
--- a/vdsm/storage/image.py
+++ b/vdsm/storage/image.py
@@ -858,6 +858,26 @@
             chain.remove(srcVol.volUUID)
             srcVol = vol
 
+    def unlinkSubChain(self, sdDom, imgUUID, chain):
+        """
+        Unlink the sub-chain from the image.  Do not actually remove volumes.
+        """
+        if not chain:
+            raise se.InvalidParameterException("chain", str(chain))
+
+        volclass = sdDom.getVolumeClass()
+        ancestor = chain[0]
+        successor = chain[-1]
+        dstParent = volclass(self.repoPath, sdDom.sdUUID, imgUUID,
+                             ancestor).getParent()
+        try:
+            child = volclass(self.repoPath, sdDom.sdUUID, imgUUID,
+                             successor).getChildren()[0]
+            dstChild = volclass(self.repoPath, sdDom.sdUUID, imgUUID, child)
+        except IndexError:
+            return
+        dstChild.setParent(dstParent)
+
     def _internalVolumeMerge(self, sdDom, srcVolParams, volParams, newSize,
                              chain):
         """
diff --git a/vdsm/vm.py b/vdsm/vm.py
index 3602081..dabc9ad 100644
--- a/vdsm/vm.py
+++ b/vdsm/vm.py
@@ -32,9 +32,11 @@
 import time
 import xml.dom.minidom
 import uuid
+import json
 
 # 3rd party libs imports
 import libvirt
+import libvirt_qemu
 
 # vdsm imports
 from vdsm import constants
@@ -3122,6 +3124,12 @@
                         supervdsm.getProxy().setPortMirroring(network,
                                                               nic.name)
 
+        # Scan for block jobs that may have completed while we were not
+        # monitoring.  Make sure to do this before initializing the stats
+        # thread so we can handle any post-job synchronization work.
+        if self.recovering:
+            self.queryBlockJobs()
+
         # VmStatsThread may use block devices info from libvirt.
         # So, run it after you have this info
         self._initVmStats()
@@ -3830,6 +3838,14 @@
                     return device
 
         raise LookupError("No such drive: '%s'" % drive)
+
+    def _findDriveByPath(self, path):
+        for device in self._devices[DISK_DEVICES][:]:
+            if not hasattr(device, 'path'):
+                continue
+            if device.path == path:
+                return device
+        raise LookupError("No drive matches path: '%s'" % path)
 
     def updateDriveVolume(self, vmDrive):
         if not vmDrive.device == 'disk' or not isVdsmImage(vmDrive):
@@ -5240,7 +5256,14 @@
                 if not ret:
                     self.log.debug("Block Job for vm:%s, img:%s has ended",
                                    self.conf['vmId'], dev['imageID'])
-                    self._vm.setDiskBlockJobID(dev, None)
+                    if self.recovering:
+                        # During VM recovery, we want to process jobs that have
+                        # finished while vdsm was stopped.
+                        self.log.debug("Recovering from missed block event "
+                                       "for job '%s'" % jobID)
+                        drive = self._findDriveByPath(dev['path'])
+                        self._postLiveMergeSyncVolumeChain(drive)
+                    self.setDiskBlockJobID(dev, None)
                     continue
 
                 jobs[jobID] = {'id': jobID, 'jobType': 'block',
@@ -5289,8 +5312,79 @@
 
         return {'status': doneCode}
 
+    def _internalQMPMonitorCommand(self, cmdDict):
+        jsonCmd = json.dumps(cmdDict)
+        ret = libvirt_qemu.qemuMonitorCommand(self._dom, jsonCmd, 0)
+        return json.loads(ret)
+
+    def _driveGetActualVolumeChain(self, drive):
+        def pathToVolId(drive, path):
+            for vol in drive.volumeChain:
+                if vol['path'] == path:
+                    return vol['volumeID']
+            raise LookupError("Unable to find VolumeID for path '%s'" % path)
+
+        ret = []
+        cmd = {'execute': 'query-block'}
+        info = self._internalQMPMonitorCommand(cmd)
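+        # The reply is expected to look roughly like (illustrative, not a
+        # complete schema):
+        #   {'return': [{'inserted': {'image': {'filename': '/path/to/top',
+        #                                       'backing-image': {...}}},
+        #                ...},
+        #               ...]}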
+        for dev in info['return']:
+            try:
+                image = dev['inserted']['image']
+            except KeyError:
+                continue
+            if image['filename'] != drive.path:
+                continue
+            while image:
+                # Produce an absolute path so that this information is
+                # comparable with the VM Disk paths.  It's not safe to make
+                # this information public since the paths cannot be guaranteed
+                # across hosts.  Use _pathListToVolumeList to convert it for
+                # export.
+                path = os.path.abspath(image['filename'])
+                ret.insert(0, pathToVolId(drive, path))
+                image = image.get('backing-image')
+        return ret
+
+    def _postLiveMergeSyncVolumeChain(self, drive):
+        def getVolumeInfo(device, volumeID):
+            for info in device['volumeChain']:
+                if info['volumeID'] == volumeID:
+                    return deepcopy(info)
+
+        volumes = self._driveGetActualVolumeChain(drive)
+        volumeID = volumes[-1]
+        self.log.error("AGLITKE: postLive volumes: %s", volumes)
+
+        # Sync this VM's data structures.  The same information is stored in
+        # two places (the device conf and the Drive object), so both must be
+        # updated.
+        device = self._findDiskConfByPath(drive.path)
+        if drive.volumeID != volumeID:
+            # If the active layer changed:
+            #  Update the disk path, volumeID, and volumeInfo members
+            volInfo = getVolumeInfo(device, volumeID)
+            device['path'] = drive.path = volInfo['path']
+            device['volumeID'] = drive.volumeID = volumeID
+            device['volumeInfo'] = drive.volumeInfo = volInfo
+        # Remove any components of the volumeChain which are no longer present
+        newChain = [x for x in device['volumeChain']
+                    if x['volumeID'] in volumes]
+        device['volumeChain'] = drive.volumeChain = newChain
+
+        # Ask the storage to sync metadata according to the new chain
+        self.cif.irs.imageSyncVolumeChain(drive.domainID, drive.imageID,
+                                          drive.volumeID, volumes)
+
     def _onBlockJobEvent(self, path, blockJobType, status):
-        # TODO: Synchronize our state in case the volume chain changed
+        if blockJobType != libvirt.VIR_DOMAIN_BLOCK_JOB_TYPE_COMMIT:
+            self.log.warning("Ignoring unrecognized block job type: '%s'",
+                             blockJobType)
+            return
+
+        if status == libvirt.VIR_DOMAIN_BLOCK_JOB_COMPLETED:
+            drive = self._findDriveByPath(path)
+            self._postLiveMergeSyncVolumeChain(drive)
+
+        # Clear the annotation so we don't try looking for more block jobs
         self._findAndsetDiskBlockJobID(path, None)
 
 


-- 
To view, visit http://gerrit.ovirt.org/26636
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib86c19077695354b45818e0186e642ad0e8bc07c
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Adam Litke <ali...@redhat.com>