Adam Litke has uploaded a new change for review. Change subject: Live Merge: Simplify pivot flow ......................................................................
Live Merge: Simplify pivot flow The pivot part of a Live Merge can be made a synchronous operation which simplifies our flows. We're already doing cleanup in a thread so just do the pivot and wait for it (very short) to complete and then continue directly to the volume chain sync part of the cleanup. This is much simpler than doing the pivot and waiting another sample interval to try the sync portion. Change-Id: I5e6a9ea7096b5d81f418754d411f135450bf572e Signed-off-by: Adam Litke <[email protected]> --- M vdsm/virt/vm.py 1 file changed, 44 insertions(+), 26 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/49/31849/1 diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py index 4cbcd2c..5890d6a 100644 --- a/vdsm/virt/vm.py +++ b/vdsm/virt/vm.py @@ -5356,8 +5356,8 @@ return False def queryBlockJobs(self): - def startCleanup(job, drive): - t = LiveMergeCleanupThread(self, job['jobID'], drive) + def startCleanup(job, drive, mode): + t = LiveMergeCleanupThread(self, job['jobID'], drive, mode) t.start() self._liveMergeCleanupThreads[job['jobID']] = t @@ -5370,9 +5370,9 @@ jobID = storedJob['jobID'] cleanThread = self._liveMergeCleanupThreads.get(jobID) if cleanThread and cleanThread.isSuccessful(): - # Handle successfully cleaned jobs early because the job - # just needs to be untracked and the stored disk info might - # be stale anyway (ie. after active layer commit). + # Handle successful jobs early because the job just needs + # to be untracked and the stored disk info might be stale + # anyway (ie. after active layer commit). self.untrackBlockJob(jobID) continue @@ -5391,32 +5391,31 @@ jobsRet[jobID] = entry continue + mode = None if liveInfo: entry['bandwidth'] = liveInfo['bandwidth'] entry['cur'] = str(liveInfo['cur']) entry['end'] = str(liveInfo['end']) if self._activeLayerCommitReady(liveInfo): - try: - self.handleBlockJobEvent(jobID, drive, 'pivot') - except Exception: - # Just log it. We will retry next time - self.log.error("Pivot failed for job %s", jobID) + mode = LiveMergeCleanupThread.MODE_PIVOT else: # Libvirt has stopped reporting this job so we know it will # never report it again. + mode = LiveMergeCleanupThread.MODE_CLEANUP storedJob['gone'] = True + if mode: if not cleanThread: # There is no cleanup thread so the job must have just # ended. Spawn an async cleanup. - startCleanup(storedJob, drive) + startCleanup(storedJob, drive, mode) elif cleanThread.isAlive(): # Let previously started cleanup thread continue self.log.debug("Still waiting for block job %s to be " - "cleaned up", jobID) + "synchronized", jobID) elif not cleanThread.isSuccessful(): # At this point we know the thread is not alive and the # cleanup failed. Retry it with a new thread. - startCleanup(storedJob, drive) + startCleanup(storedJob, drive, mode) jobsRet[jobID] = entry return jobsRet @@ -5618,14 +5617,26 @@ device['volumeChain'] = drive.volumeChain = newChain def handleBlockJobEvent(self, jobID, drive, mode): - if mode == 'finished': - self.log.info("Live merge job completed (job %s)", jobID) - self._syncVolumeChain(drive) - elif mode == 'pivot': + if mode == LiveMergeCleanupThread.MODE_PIVOT: + # We call imageSyncVolumeChain which will mark the current leaf + # ILLEGAL. We do this before requesting a pivot so that we can + # properly recover the VM in case we crash. At this point the + # active layer contains the same data as its parent so the ILLEGAL + # flag indicates that the VM should be restarted using the parent. + newVols = [vol['volumeID'] for vol in drive.volumeChain + if vol['volumeID'] != drive.volumeID] + self.cif.irs.imageSyncVolumeChain(drive.domainID, drive.imageID, + drive['volumeID'], newVols) + self.log.info("Requesting pivot to complete active layer commit " "(job %s)", jobID) flags = libvirt.VIR_DOMAIN_BLOCK_JOB_ABORT_PIVOT - self._dom.blockJobAbort(drive.name, flags) + if self._dom.blockJobAbort(drive.name, flags) != 0: + raise RuntimeError("pivot failed") + if mode in (LiveMergeCleanupThread.MODE_CLEANUP, + LiveMergeCleanupThread.MODE_PIVOT): + self.log.info("Live merge job completed (job %s)", jobID) + self._syncVolumeChain(drive) else: raise RuntimeError("Invalid mode: '%s'" % mode) @@ -5666,30 +5677,37 @@ class LiveMergeCleanupThread(threading.Thread): - def __init__(self, vm, jobId, drive): + MODE_CLEANUP = 'cleanup' + MODE_PIVOT = 'pivot' + + def __init__(self, vm, jobId, drive, mode): threading.Thread.__init__(self) self.setDaemon(True) self.vm = vm self.jobId = jobId self.drive = drive + self.mode = mode self.success = False @utils.traceback() def run(self): - self.vm.log.info("Starting live merge cleanup for job %s", - self.jobId) + self.vm.log.info("Starting live merge %s for job %s", + self.mode, self.jobId) try: - self.vm.handleBlockJobEvent(self.jobId, self.drive, 'finished') + self.vm.handleBlockJobEvent(self.jobId, self.drive, self.mode) except Exception: - self.vm.log.warning("Cleanup failed for live merge job %s", - self.jobId) + self.vm.log.warning("%s failed for live merge job %s", + self.mode, self.jobId) raise else: self.success = True - self.vm.log.info("Cleanup completed for live merge job %s", - self.jobId) + self.vm.log.info("%s completed for live merge job %s", + self.mode, self.jobId) def isSuccessful(self): + """ + Returns True if this phase completed successfully. + """ return self.success -- To view, visit http://gerrit.ovirt.org/31849 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I5e6a9ea7096b5d81f418754d411f135450bf572e Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Adam Litke <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
