Adam Litke has uploaded a new change for review.

Change subject: Update v2v to use new jobs infrastructure
......................................................................
Update v2v to use new jobs infrastructure With the general-purpose jobs infrastructure now in jobs.py, we can remove redundant code in v2v.py by moving to the common implementation. This does not change the v2v API to ovirt-engine at all. Change-Id: I9118e0fe4aaeceab2109afa393dd45fbd97e070f Signed-off-by: Adam Litke <[email protected]> --- M vdsm/v2v.py 1 file changed, 24 insertions(+), 105 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/82/45382/1 diff --git a/vdsm/v2v.py b/vdsm/v2v.py index e150b86..e7fa388 100644 --- a/vdsm/v2v.py +++ b/vdsm/v2v.py @@ -43,11 +43,10 @@ from vdsm.utils import traceback, CommandPath, execCmd import caps +import jobs -_lock = threading.Lock() -_jobs = {} - +_V2V_JOB_TYPE = 'v2v' _V2V_DIR = os.path.join(P_VDSM_RUN, 'v2v') _VIRT_V2V = CommandPath('virt-v2v', '/usr/bin/virt-v2v') _OVF_RESOURCE_CPU = 3 @@ -65,27 +64,17 @@ DiskProgress = namedtuple('DiskProgress', ['progress']) -class STATUS: +class V2VSTATUS(jobs.STATUS): ''' STARTING: request granted and starting the import process COPYING_DISK: copying disk in progress - ABORTED: user initiated aborted - FAILED: error during import process - DONE: convert process successfully finished ''' STARTING = 'starting' COPYING_DISK = 'copying_disk' - ABORTED = 'aborted' - FAILED = 'error' - DONE = 'done' class V2VError(Exception): ''' Base class for v2v errors ''' - - -class ClientError(Exception): - ''' Base class for client error ''' class InvalidVMConfiguration(ValueError): @@ -96,21 +85,21 @@ ''' Error while parsing virt-v2v output ''' -class JobExistsError(ClientError): +class JobExistsError(jobs.ClientError): ''' Job already exists in _jobs collection ''' err_name = 'JobExistsError' -class VolumeError(ClientError): +class VolumeError(jobs.ClientError): ''' Error preparing volume ''' -class NoSuchJob(ClientError): +class NoSuchJob(jobs.ClientError): ''' Job not exists in _jobs collection ''' err_name = 'NoSuchJob' -class JobNotDone(ClientError): +class 
JobNotDone(jobs.ClientError): ''' Import process still in progress ''' err_name = 'JobNotDone' @@ -124,7 +113,7 @@ ''' virt-v2v process had error in execution ''' -class InvalidInputError(ClientError): +class InvalidInputError(jobs.ClientError): ''' Invalid input received ''' @@ -169,14 +158,14 @@ def convert_external_vm(uri, username, password, vminfo, job_id, irs): job = ImportVm.from_libvirt(uri, username, password, vminfo, job_id, irs) job.start() - _add_job(job_id, job) + jobs.add(job) return {'status': doneCode} def convert_ova(ova_path, vminfo, job_id, irs): job = ImportVm.from_ova(ova_path, vminfo, job_id, irs) job.start() - _add_job(job_id, job) + jobs.add(job) return response.success() @@ -198,10 +187,10 @@ def get_converted_vm(job_id): try: - job = _get_job(job_id) - _validate_job_done(job) + job = jobs.get(job_id) + job.validate_done() ovf = _read_ovf(job_id) - except ClientError as e: + except jobs.ClientError as e: logging.info('Converted VM error %s', e) return errCode[e.err_name] except V2VError as e: @@ -211,68 +200,15 @@ def delete_job(job_id): - try: - job = _get_job(job_id) - _validate_job_finished(job) - _remove_job(job_id) - except ClientError as e: - logging.info('Cannot delete job, error: %s', e) - return errCode[e.err_name] - return {'status': doneCode} + return jobs.delete(job_id) def abort_job(job_id): - try: - job = _get_job(job_id) - job.abort() - except ClientError as e: - logging.info('Cannot abort job, error: %s', e) - return errCode[e.err_name] - return {'status': doneCode} + return jobs.abort(job_id) def get_jobs_status(): - ret = {} - with _lock: - items = tuple(_jobs.items()) - for job_id, job in items: - ret[job_id] = { - 'status': job.status, - 'description': job.description, - 'progress': job.progress - } - return ret - - -def _add_job(job_id, job): - with _lock: - if job_id in _jobs: - raise JobExistsError("Job %r exists" % job_id) - _jobs[job_id] = job - - -def _get_job(job_id): - with _lock: - if job_id not in _jobs: - 
raise NoSuchJob("No such job %r" % job_id) - return _jobs[job_id] - - -def _remove_job(job_id): - with _lock: - if job_id not in _jobs: - raise NoSuchJob("No such job %r" % job_id) - del _jobs[job_id] - - -def _validate_job_done(job): - if job.status != STATUS.DONE: - raise JobNotDone("Job %r is %s" % (job.id, job.status)) - - -def _validate_job_finished(job): - if job.status not in (STATUS.DONE, STATUS.FAILED, STATUS.ABORTED): - raise JobNotDone("Job %r is %s" % (job.id, job.status)) + return jobs.info(_V2V_JOB_TYPE) def _read_ovf(job_id): @@ -311,7 +247,8 @@ job_id, file_name) -class ImportVm(object): +class ImportVm(jobs.Job): + _JOB_TYPE = _V2V_JOB_TYPE TERM_DELAY = 30 PROC_WAIT_TIMEOUT = 30 @@ -319,12 +256,11 @@ ''' do not use directly, use a factory method instead! ''' + super(ImportVm, self).__init__(job_id) self._vminfo = vminfo - self._id = job_id self._irs = irs - self._status = STATUS.STARTING - self._description = '' + self._status = V2VSTATUS.STARTING self._disk_progress = 0 self._disk_count = 1 self._current_disk = 1 @@ -367,18 +303,6 @@ t.start() @property - def id(self): - return self._id - - @property - def status(self): - return self._status - - @property - def description(self): - return self._description - - @property def progress(self): ''' progress is part of multiple disk_progress its @@ -402,7 +326,7 @@ logging.debug("Job %r was aborted", self._id) else: logging.exception("Job %r failed", self._id) - self._status = STATUS.FAILED + self._status = V2VSTATUS.FAILED self._description = ex.message try: self._abort() @@ -431,8 +355,8 @@ (self._id, self._proc.returncode, self._proc.stderr.read(1024))) - if self._status != STATUS.ABORTED: - self._status = STATUS.DONE + if self._status != V2VSTATUS.ABORTED: + self._status = V2VSTATUS.DONE logging.info('Job %r finished import successfully', self._id) def _execution_environments(self): @@ -453,7 +377,7 @@ parser = OutputParser() for event in parser.parse(self._proc.stdout): if isinstance(event, 
ImportProgress): - self._status = STATUS.COPYING_DISK + self._status = V2VSTATUS.COPYING_DISK logging.info("Job %r copying disk %d/%d", self._id, event.current_disk, event.disk_count) self._disk_progress = 0 @@ -503,11 +427,6 @@ get_storage_domain_path(self._prepared_volumes[0]['path'])] cmd.extend(self._generate_disk_parameters()) return cmd - - def abort(self): - self._status = STATUS.ABORTED - logging.info('Job %r aborting...', self._id) - self._abort() def _abort(self): self._aborted = True -- To view, visit https://gerrit.ovirt.org/45382 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I9118e0fe4aaeceab2109afa393dd45fbd97e070f Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Adam Litke <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
