Hello Federico Simoncelli, Dan Kenigsberg,
I'd like you to do a code review. Please visit
http://gerrit.ovirt.org/31049
to review the following change.
Change subject: Live Merge: Ignore libvirt block job events
......................................................................
Live Merge: Ignore libvirt block job events
Since VDSM needs to handle missed block job completion events that may
have been emitted while the daemon was stopped, it is best not to rely
on the events at all and instead to initiate all cleanup operations from
the existing polling function (queryBlockJobs). Because a volume chain
sync can be a relatively expensive operation involving HSM calls, run it
in a separate thread. Depending on the current state of the storage, the
sync may fail; in that case it is retried with a new thread the next time
block jobs are polled. Any pending cleanup threads are joined before the
VM is cleaned up.
Change-Id: Ibbcdca4c0c0e45e9323ecfef9ce2fce10d8451a5
Signed-off-by: Adam Litke <[email protected]>
Reviewed-on: http://gerrit.ovirt.org/30046
Reviewed-by: Dan Kenigsberg <[email protected]>
Reviewed-by: Federico Simoncelli <[email protected]>
---
M vdsm/clientIF.py
M vdsm/virt/vm.py
2 files changed, 125 insertions(+), 77 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/49/31049/1
diff --git a/vdsm/clientIF.py b/vdsm/clientIF.py
index 803b19b..fc3e573 100644
--- a/vdsm/clientIF.py
+++ b/vdsm/clientIF.py
@@ -607,9 +607,6 @@
elif eventid == libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG:
action, = args[:-1]
v._onWatchdogEvent(action)
- elif eventid == libvirt.VIR_DOMAIN_EVENT_ID_BLOCK_JOB:
- path, jobType, status = args[:-1]
- v._onBlockJobEvent(path, jobType, status)
else:
v.log.warning('unknown eventid %s args %s', eventid, args)
except:
diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py
index 4d43c1e..f9a78bb 100644
--- a/vdsm/virt/vm.py
+++ b/vdsm/virt/vm.py
@@ -1817,11 +1817,6 @@
(SMARTCARD_DEVICES, SmartCardDevice),
(TPM_DEVICES, TpmDevice))
- BlockJobTypeMap = {libvirt.VIR_DOMAIN_BLOCK_JOB_TYPE_UNKNOWN: 'unknown',
- libvirt.VIR_DOMAIN_BLOCK_JOB_TYPE_PULL: 'pull',
- libvirt.VIR_DOMAIN_BLOCK_JOB_TYPE_COPY: 'copy',
- libvirt.VIR_DOMAIN_BLOCK_JOB_TYPE_COMMIT: 'commit'}
-
def _makeDeviceDict(self):
return dict((dev, []) for dev, _ in self.DeviceMapping)
@@ -1910,6 +1905,7 @@
raise RuntimeError('Unsupported architecture: %s' % self.arch)
self._powerDownEvent = threading.Event()
+ self._liveMergeCleanupThreads = {}
def _get_lastStatus(self):
PAUSED_STATES = (vmstatus.POWERING_DOWN, vmstatus.REBOOT_IN_PROGRESS,
@@ -4751,6 +4747,13 @@
if response['status']['code']:
return response
+ # Wait for any Live Merge cleanup threads. This will only block in
+ # the extremely rare case where a VM is being powered off at the
+ # same time as a live merge is being finalized. These threads
+ # finish quickly unless there are storage connection issues.
+ for t in self._liveMergeCleanupThreads.values():
+ t.join()
+
if not self.cif.mom:
self.cif.ksmMonitor.adjust()
self._cleanup()
@@ -5463,7 +5466,7 @@
except LookupError:
newJob = {'jobID': jobID, 'disk': driveSpec,
'baseVolume': base, 'topVolume': top,
- 'strategy': strategy,
+ 'strategy': strategy, 'blockJobType': 'commit',
'chain': self._driveGetActualVolumeChain(drive)}
self.conf['_blockJobs'][jobID] = newJob
else:
@@ -5485,30 +5488,63 @@
return True
def queryBlockJobs(self):
- jobs = {}
- for jobID, job in self.conf['_blockJobs'].iteritems():
- drive = self._findDriveByUUIDs(job['disk'])
- ret = self._dom.blockJobInfo(drive.name, 0)
- if not ret:
- self.log.info("Block Job %s has ended", jobID)
- jobs[jobID] = None
- continue
+ def startCleanup(job, drive):
+ t = LiveMergeCleanupThread(self, job['jobID'], drive)
+ t.start()
+ self._liveMergeCleanupThreads[job['jobID']] = t
- jobs[jobID] = {'id': jobID, 'jobType': 'block',
- 'blockJobType': Vm.BlockJobTypeMap[ret['type']],
- 'bandwidth': ret['bandwidth'],
- 'cur': str(ret['cur']), 'end': str(ret['end']),
- 'imgUUID': job['disk']['imageID']}
-
- # This function is meant to be called from multiple threads (ie.
- # VMStatsThread and API calls. The _jobsLock ensures that a cohesive
- # data set is returned by serializing each call.
+ jobsRet = {}
+ # We need to take the jobs lock here to ensure that we don't race with
+ # another call to merge() where the job has been recorded but not yet
+ # started.
with self._jobsLock:
- for jobID in jobs.keys():
- if jobs[jobID] is None:
+ for storedJob in self.conf['_blockJobs'].values():
+ jobID = storedJob['jobID']
+ cleanThread = self._liveMergeCleanupThreads.get(jobID)
+ if cleanThread and cleanThread.isSuccessful():
+ # Handle successfully cleaned jobs early because the job
+ # just needs to be untracked and the stored disk info might
+ # be stale anyway (ie. after active layer commit).
self.untrackBlockJob(jobID)
- del jobs[jobID]
- return jobs
+ continue
+
+ drive = self._findDriveByUUIDs(storedJob['disk'])
+ entry = {'id': jobID, 'jobType': 'block',
+ 'blockJobType': storedJob['blockJobType'],
+ 'bandwidth': 0, 'cur': '0', 'end': '0',
+ 'imgUUID': storedJob['disk']['imageID']}
+
+ liveInfo = None
+ if 'gone' not in storedJob:
+ try:
+ liveInfo = self._dom.blockJobInfo(drive.name, 0)
+ except libvirt.libvirtError:
+ self.log.exception("Error getting block job info")
+ jobsRet[jobID] = entry
+ continue
+
+ if liveInfo:
+ entry['bandwidth'] = liveInfo['bandwidth']
+ entry['cur'] = str(liveInfo['cur'])
+ entry['end'] = str(liveInfo['end'])
+ else:
+ # Libvirt has stopped reporting this job so we know it will
+ # never report it again.
+ storedJob['gone'] = True
+ if not cleanThread:
+ # There is no cleanup thread so the job must have just
+ # ended. Spawn an async cleanup.
+ startCleanup(storedJob, drive)
+ elif cleanThread.isAlive():
+ # Let previously started cleanup thread continue
+ self.log.debug("Still waiting for block job %s to be "
+ "cleaned up", jobID)
+ elif not cleanThread.isSuccessful():
+ # At this point we know the thread is not alive and the
+ # cleanup failed. Retry it with a new thread.
+ startCleanup(storedJob, drive)
+ jobsRet[jobID] = entry
+ return jobsRet
def merge(self, driveSpec, baseVolUUID, topVolUUID, bandwidth, jobUUID):
if not caps.getLiveMergeSupport():
@@ -5560,33 +5596,26 @@
# visible from any host even if the mountpoint is different.
flags = libvirt.VIR_DOMAIN_BLOCK_COMMIT_RELATIVE
- try:
- self.trackBlockJob(jobUUID, drive, baseVolUUID, topVolUUID,
- 'commit')
- except BlockJobExistsError:
- self.log.error("Another block job is already active on this disk")
- return errCode['mergeErr']
- self.log.info("Starting merge with jobUUID='%s'", jobUUID)
+ # Take the jobs lock here to protect the new job we are tracking from
+ # being cleaned up by queryBlockJobs() since it won't exist right away
+ with self._jobsLock:
+ try:
+ self.trackBlockJob(jobUUID, drive, baseVolUUID, topVolUUID,
+ 'commit')
+ except BlockJobExistsError:
+ self.log.error("A block job is already active on this disk")
+ return errCode['mergeErr']
+ self.log.info("Starting merge with jobUUID='%s'", jobUUID)
- flags = 0
- # Indicate that we expect libvirt to maintain the relative paths of
- # backing files. This is necessary to ensure that a volume chain is
- # visible from any host even if the mountpoint is different.
- try:
- flags |= libvirt.VIR_DOMAIN_BLOCK_COMMIT_RELATIVE
- except AttributeError:
- self.log.error("Libvirt missing VIR_DOMAIN_BLOCK_COMMIT_RELATIVE. "
- "Unable to perform live merge.")
- return errCode['mergeErr']
- try:
- ret = self._dom.blockCommit(drive.path, base, top, bandwidth,
- flags)
- if ret != 0:
- raise RuntimeError("blockCommit operation failed rc:%i", ret)
- except (RuntimeError, libvirt.libvirtError):
- self.log.exception("Live merge failed (job: %s)", jobUUID)
- self.untrackBlockJob(jobUUID)
- return errCode['mergeErr']
+ try:
+ ret = self._dom.blockCommit(drive.path, base, top, bandwidth,
+ flags)
+ if ret != 0:
+ raise RuntimeError("blockCommit failed rc:%i", ret)
+ except (RuntimeError, libvirt.libvirtError):
+ self.log.exception("Live merge failed (job: %s)", jobUUID)
+ self.untrackBlockJob(jobUUID)
+ return errCode['mergeErr']
# blockCommit will cause data to be written into the base volume.
# Perform an initial extension to ensure there is enough space to
@@ -5691,29 +5720,12 @@
if x['volumeID'] in volumes]
device['volumeChain'] = drive.volumeChain = newChain
- def _onBlockJobEvent(self, path, blockJobType, status):
- self.log.debug("Received block job event for path:%s, type:%s, "
- "status:%s", path, blockJobType, status)
- if blockJobType != libvirt.VIR_DOMAIN_BLOCK_JOB_TYPE_COMMIT:
- self.log.warning("Ignoring unrecognized block job type: '%s'",
- blockJobType)
- return
-
- drive = self._lookupDeviceByPath(path)
- try:
- jobID = self.getBlockJob(drive)['jobID']
- except LookupError:
- self.log.debug("Ignoring event for untracked block job on path "
- "'%s'", path)
- return
-
- if status == libvirt.VIR_DOMAIN_BLOCK_JOB_COMPLETED:
- self.log.info("Live merge completed (job:%s)", jobID)
+ def handleBlockJobEvent(self, jobID, drive, mode):
+ if mode == 'finished':
+ self.log.info("Live merge job completed (job %s)", jobID)
self._syncVolumeChain(drive)
else:
- self.log.warning("Block job %s did not complete successfully "
- "(status:%i)", jobID, status)
- self.untrackBlockJob(jobID)
+ raise RuntimeError("Invalid mode: '%s'" % mode)
def _initLegacyConf(self):
self.conf['displayPort'] = GraphicsDevice.LIBVIRT_PORT_AUTOSELECT
@@ -5739,6 +5751,45 @@
if dev.get('type') == GRAPHICS_DEVICES:
return dev
+ def getDiskDevices(self):
+ return self._devices[DISK_DEVICES]
+
+ def getNicDevices(self):
+ return self._devices[NIC_DEVICES]
+
+ def getBalloonDevicesConf(self):
+ for dev in self.conf['devices']:
+ if dev['type'] == BALLOON_DEVICES:
+ yield dev
+
+
+class LiveMergeCleanupThread(threading.Thread):
+ def __init__(self, vm, jobId, drive):
+ threading.Thread.__init__(self)
+ self.setDaemon(True)
+ self.vm = vm
+ self.jobId = jobId
+ self.drive = drive
+ self.success = False
+
+ @utils.traceback()
+ def run(self):
+ self.vm.log.info("Starting live merge cleanup for job %s",
+ self.jobId)
+ try:
+ self.vm.handleBlockJobEvent(self.jobId, self.drive, 'finished')
+ except Exception:
+ self.vm.log.warning("Cleanup failed for live merge job %s",
+ self.jobId)
+ raise
+ else:
+ self.success = True
+ self.vm.log.info("Cleanup completed for live merge job %s",
+ self.jobId)
+
+ def isSuccessful(self):
+ return self.success
+
def _getNetworkIp(network):
try:
--
To view, visit http://gerrit.ovirt.org/31049
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ibbcdca4c0c0e45e9323ecfef9ce2fce10d8451a5
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: ovirt-3.5
Gerrit-Owner: Adam Litke <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches