Adam Litke has uploaded a new change for review.

Change subject: Live Merge: Extend internal volumes during live merge
......................................................................

Live Merge: Extend internal volumes during live merge

During a live merge operation, data is written to the target of the
commit (the base volume).  This means that for block-based images, we
must monitor both the base volume and the leaf volume for extension.

Since libvirt does not yet provide watermark information for internal
volumes, use a qemu monitor command to retrieve the information.

Change-Id: I1a5b11e7da185a699028b6127066cd01de010a0d
Signed-off-by: Adam Litke <[email protected]>
---
M vdsm/virt/vm.py
1 file changed, 90 insertions(+), 4 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/97/28597/1

diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py
index 4f8b51e..865e936 100644
--- a/vdsm/virt/vm.py
+++ b/vdsm/virt/vm.py
@@ -2285,15 +2285,91 @@
         with self._confLock:
             self.conf['timeOffset'] = newTimeOffset
 
+    def _getMergeWriteWatermarks(self):
+        ret = {}
+        cmd = {'execute': 'query-blockstats'}
+        resp = self._internalQMPMonitorCommand(cmd)
+        for device in resp['return']:
+            name = device['device']
+            if not name.startswith('drive-'):
+                continue
+            alias = name[6:]
+            try:
+                drive = self._lookupDeviceByAlias(DISK_DEVICES, alias)
+                job = self.getBlockJob(drive)
+            except LookupError:
+                continue
+
+            volChain = job['chain']
+            stats = []
+            vol = device
+            while vol:
+                stats.insert(0, vol['parent']['stats']['wr_highest_offset'])
+                vol = vol.get('backing')
+            if len(volChain) != len(stats):
+                self.log.debug("The number of wr_highest_offset stats does "
+                               "not match the number of volumes.  Skipping.")
+                continue
+            for vol, stat in zip(volChain, stats):
+                ret[vol] = stat
+        return ret
+
+    def _getLiveMergeExtendCandidates(self):
+        ret = {}
+        watermarks = self._getMergeWriteWatermarks()
+        for job in self.conf['_blockJobs'].values():
+            drive = self._findDriveByUUIDs(job['disk'])
+            if not drive.blockDev or drive.format != 'cow':
+                continue
+
+            info = {}
+            if job['strategy'] == 'commit':
+                volumeID = job['baseVolume']
+            else:
+                self.log.debug("Unrecognized merge strategy '%s'",
+                               job['strategy'])
+                continue
+            volSize = self.cif.irs.getVolumeSize(drive.domainID, drive.poolID,
+                                                 drive.imageID, volumeID)
+            if volSize['status']['code'] != 0:
+                self.log.error("Unable to get the size of volume %s (domain: "
+                               "%s image: %s)", volumeID, drive.domainID,
+                               drive.imageID)
+                continue
+
+            info['physical'] = int(volSize['truesize'])
+            info['capacity'] = drive.apparentsize
+            try:
+                info['alloc'] = watermarks[volumeID]
+            except KeyError:
+                self.log.warning("No watermark info available for %s",
+                                 volumeID)
+                continue
+            info['volumeID'] = volumeID
+            self.log.debug("Adding live merge extension candidate: volume=%s",
+                           volumeID)
+            ret[drive.imageID] = info
+        return ret
+
     def _getExtendCandidates(self):
         ret = []
 
+        mergeCandidates = self._getLiveMergeExtendCandidates()
         for drive in self._devices[DISK_DEVICES]:
             if not drive.blockDev or drive.format != 'cow':
                 continue
 
+            imageID = drive.imageID
             capacity, alloc, physical = self._dom.blockInfo(drive.path, 0)
             ret.append((drive, drive.volumeID, capacity, alloc, physical))
+
+            try:
+                mergeCandidate = mergeCandidates[imageID]
+            except KeyError:
+                continue
+            ret.append((drive, mergeCandidate['volumeID'],
+                        mergeCandidate['capacity'], mergeCandidate['alloc'],
+                        mergeCandidate['physical']))
         return ret
 
     def _shouldExtendVolume(self, drive, volumeID, capacity, alloc, physical):
@@ -3218,8 +3294,11 @@
 
     def _lookupDeviceByAlias(self, devType, alias):
         for dev in self._devices[devType][:]:
-            if dev.alias == alias:
-                return dev
+            try:
+                if dev.alias == alias:
+                    return dev
+            except AttributeError:
+                continue
         raise LookupError('Device instance for device identified by alias %s '
                           'not found' % alias)
 
@@ -5139,9 +5218,10 @@
             try:
                 job = self.getBlockJob(drive)
             except LookupError:
+                chain = self._driveGetActualVolumeChain(drive)
                 newJob = {'jobID': jobID, 'disk': driveSpec,
                           'baseVolume': base, 'topVolume': top,
-                          'strategy': strategy}
+                          'strategy': strategy, 'chain': chain}
                 self.conf['_blockJobs'][jobID] = newJob
             else:
                 self.log.debug("A block job with id %s already exists for "
@@ -5228,6 +5308,7 @@
         if res['info']['voltype'] == 'SHARED':
             self.log.error("merge: Refusing to merge into a shared volume")
             return errCode['mergeErr']
+        baseSize = int(res['info']['apparentsize'])
 
         ret = self.trackBlockJob(jobUUID, drive, baseVolUUID, topVolUUID,
                                  'commit')
@@ -5257,7 +5338,11 @@
             self.untrackBlockJob(jobUUID)
             return errCode['mergeErr']
 
-        # TODO: Handle block volume resizing for base volume
+        # blockCommit will cause data to be written into the base volume.
+        # Perform an initial extension to ensure there is enough space to
+        # start copying.  The normal monitoring code will take care of any
+        # future internal volume extensions that may be necessary
+        self.extendDriveVolume(drive, baseVolUUID, baseSize)
 
         # Trigger the collection of stats before returning so that callers
         # of getVmStats after this returns will see the new job
@@ -5362,6 +5447,7 @@
                              blockJobType)
             return
 
+        self.log.debug("Live merge completed for path '%s'", path)
         drive = self._lookupDeviceByPath(path)
         try:
             jobID = self.getBlockJob(drive)['jobID']


-- 
To view, visit http://gerrit.ovirt.org/28597
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I1a5b11e7da185a699028b6127066cd01de010a0d
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Adam Litke <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to