Adam Litke has uploaded a new change for review. Change subject: Live Merge: Ignore libvirt block job events ......................................................................
Live Merge: Ignore libvirt block job events Since VDSM needs to handle missed block job completion events that may have been emitted while the daemon was stopped, it is best to not rely on the events at all and initiate all cleanup operations from the existing polling function. Since a volume chain sync can be a relatively expensive operation involving HSM calls, run it in a separate thread. Depending on the current state of the storage, the sync may need to be retried. Change-Id: Ibbcdca4c0c0e45e9323ecfef9ce2fce10d8451a5 Signed-off-by: Adam Litke <[email protected]> --- M vdsm/clientIF.py M vdsm/virt/vm.py 2 files changed, 88 insertions(+), 78 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/46/30046/1 diff --git a/vdsm/clientIF.py b/vdsm/clientIF.py index d5372f3..a2df805 100644 --- a/vdsm/clientIF.py +++ b/vdsm/clientIF.py @@ -608,9 +608,6 @@ elif eventid == libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG: action, = args[:-1] v._onWatchdogEvent(action) - elif eventid == libvirt.VIR_DOMAIN_EVENT_ID_BLOCK_JOB: - path, jobType, status = args[:-1] - v._onBlockJobEvent(path, jobType, status) else: v.log.warning('unknown eventid %s args %s', eventid, args) except: diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py index 5a36a66..5dab406 100644 --- a/vdsm/virt/vm.py +++ b/vdsm/virt/vm.py @@ -1800,11 +1800,6 @@ (SMARTCARD_DEVICES, SmartCardDevice), (TPM_DEVICES, TpmDevice)) - BlockJobTypeMap = {libvirt.VIR_DOMAIN_BLOCK_JOB_TYPE_UNKNOWN: 'unknown', - libvirt.VIR_DOMAIN_BLOCK_JOB_TYPE_PULL: 'pull', - libvirt.VIR_DOMAIN_BLOCK_JOB_TYPE_COPY: 'copy', - libvirt.VIR_DOMAIN_BLOCK_JOB_TYPE_COMMIT: 'commit'} - def _makeDeviceDict(self): return dict((dev, []) for dev, _ in self.DeviceMapping) @@ -5588,7 +5583,7 @@ except LookupError: newJob = {'jobID': jobID, 'disk': driveSpec, 'baseVolume': base, 'topVolume': top, - 'strategy': strategy, + 'strategy': strategy, 'blockJobType': 'commit', 'chain': self._driveGetActualVolumeChain(drive)} self.conf['_blockJobs'][jobID] = newJob 
else: @@ -5610,30 +5605,42 @@ return True def queryBlockJobs(self): - jobs = {} - for jobID, job in self.conf['_blockJobs'].iteritems(): - drive = self._findDriveByUUIDs(job['disk']) - ret = self._dom.blockJobInfo(drive.name, 0) - if not ret: - self.log.info("Block Job %s has ended", jobID) - jobs[jobID] = None - continue - - jobs[jobID] = {'id': jobID, 'jobType': 'block', - 'blockJobType': Vm.BlockJobTypeMap[ret['type']], - 'bandwidth': ret['bandwidth'], - 'cur': str(ret['cur']), 'end': str(ret['end']), - 'imgUUID': job['disk']['imageID']} - - # This function is meant to be called from multiple threads (ie. - # VMStatsThread and API calls. The _jobsLock ensures that a cohesive - # data set is returned by serializing each call. + jobsRet = {} + # We need to take the jobs lock here to ensure that we don't race with + # another call to merge() where the job has been recorded but not yet + # started. with self._jobsLock: - for jobID in jobs.keys(): - if jobs[jobID] is None: - self.untrackBlockJob(jobID) - del jobs[jobID] - return jobs + for storedJob in self.conf['_blockJobs'].values(): + jobID = storedJob['jobID'] + drive = self._findDriveByUUIDs(storedJob['disk']) + entry = {'id': jobID, 'jobType': 'block', + 'blockJobType': storedJob['blockJobType'], + 'bandwidth': 0, 'cur': '0', 'end': '0', + 'imgUUID': storedJob['disk']['imageID']} + + if 'done' in storedJob: + self.log.debug("Still waiting for block job %s to be " + "cleaned up", jobID) + jobsRet[jobID] = entry + continue + + liveInfo = None + try: + liveInfo = self._dom.blockJobInfo(drive.name, 0) + except libvirt.libvirtError: + self.log.exception("Error getting block job info") + + if liveInfo: + entry['bandwidth'] = liveInfo['bandwidth'] + entry['cur'] = str(liveInfo['cur']) + entry['end'] = str(liveInfo['end']) + else: + self.log.info("Block Job %s has ended", jobID) + storedJob['done'] = True + LiveMergeCleanupThread(self, jobID, drive, + mode='finished').start() + jobsRet[jobID] = entry + return jobsRet 
def merge(self, driveSpec, baseVolUUID, topVolUUID, bandwidth, jobUUID): if not caps.getLiveMergeSupport(): @@ -5685,33 +5692,26 @@ # visible from any host even if the mountpoint is different. flags = libvirt.VIR_DOMAIN_BLOCK_COMMIT_RELATIVE - try: - self.trackBlockJob(jobUUID, drive, baseVolUUID, topVolUUID, - 'commit') - except BlockJobExistsError: - self.log.error("Another block job is already active on this disk") - return errCode['mergeErr'] - self.log.debug("Starting merge with jobUUID='%s'", jobUUID) + # Take the jobs lock here to protect the new job we are tracking from + # being cleaned up by queryBlockJobs() since it won't exist right away + with self._jobsLock: + try: + self.trackBlockJob(jobUUID, drive, baseVolUUID, topVolUUID, + 'commit') + except BlockJobExistsError: + self.log.error("A block job is already active on this disk") + return errCode['mergeErr'] + self.log.info("Starting merge with jobUUID='%s'", jobUUID) - flags = 0 - # Indicate that we expect libvirt to maintain the relative paths of - # backing files. This is necessary to ensure that a volume chain is - # visible from any host even if the mountpoint is different. - try: - flags |= libvirt.VIR_DOMAIN_BLOCK_COMMIT_RELATIVE - except AttributeError: - self.log.error("Libvirt missing VIR_DOMAIN_BLOCK_COMMIT_RELATIVE. 
" - "Unable to perform live merge.") - return errCode['mergeErr'] - try: - ret = self._dom.blockCommit(drive.path, base, top, bandwidth, - flags) - if ret != 0: - raise RuntimeError("blockCommit operation failed rc:%i", ret) - except (RuntimeError, libvirt.libvirtError): - self.log.exception("Live merge failed (job: %s)", jobUUID) - self.untrackBlockJob(jobUUID) - return errCode['mergeErr'] + try: + ret = self._dom.blockCommit(drive.path, base, top, bandwidth, + flags) + if ret != 0: + raise RuntimeError("blockCommit failed rc:%i", ret) + except (RuntimeError, libvirt.libvirtError): + self.log.exception("Live merge failed (job: %s)", jobUUID) + self.untrackBlockJob(jobUUID) + return errCode['mergeErr'] # blockCommit will cause data to be written into the base volume. # Perform an initial extension to ensure there is enough space to @@ -5816,27 +5816,18 @@ if x['volumeID'] in volumes] device['volumeChain'] = drive.volumeChain = newChain - def _onBlockJobEvent(self, path, blockJobType, status): - if blockJobType != libvirt.VIR_DOMAIN_BLOCK_JOB_TYPE_COMMIT: - self.log.warning("Ignoring unrecognized block job type: '%s'", - blockJobType) - return - - drive = self._lookupDeviceByPath(path) - try: - jobID = self.getBlockJob(drive)['jobID'] - except LookupError: - self.log.debug("Ignoring event for untracked block job on path " - "'%s'", path) - return - - if status == libvirt.VIR_DOMAIN_BLOCK_JOB_COMPLETED: - self.log.info("Live merge completed (job:%s)", jobID) - self._syncVolumeChain(drive) + def handleBlockJobEvent(self, jobID, drive, mode): + if mode == 'finished': + self.log.info("Live merge job completed (job %s)", jobID) + try: + self._syncVolumeChain(drive) + self.untrackBlockJob(jobID) + except Exception: + self.log.exception("Live merge cleanup failed") + return False else: - self.log.warning("Block job %s did not complete successfully " - "(status:%i)", jobID, status) - self.untrackBlockJob(jobID) + raise RuntimeError("Invalid mode: '%s'" % mode) + return True 
def _initLegacyConf(self): self.conf['displayPort'] = GraphicsDevice.LIBVIRT_PORT_AUTOSELECT @@ -5874,6 +5865,28 @@ yield dev +class LiveMergeCleanupThread(threading.Thread): + def __init__(self, vm, jobId, drive, mode): + threading.Thread.__init__(self) + self.vm = vm + self.jobId = jobId + self.drive = drive + self.mode = mode + + def run(self): + ret = False + while not ret: + self.vm.log.info("Starting live merge cleanup for job %s", + self.jobId) + ret = self.vm.handleBlockJobEvent(self.jobId, self.drive, + self.mode) + if not ret: + self.vm.log.warning("live merge cleanup failed for job %s " + "will retry", self.jobId) + time.sleep(10) + self.vm.log.info("Cleanup completed for live merge job %s", self.jobId) + + def _getNetworkIp(network): try: nets = netinfo.networks() -- To view, visit http://gerrit.ovirt.org/30046 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ibbcdca4c0c0e45e9323ecfef9ce2fce10d8451a5 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Adam Litke <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
