Shahar Havivi has uploaded a new change for review. Change subject: v2v: use new jobs infrastructure ......................................................................
v2v: use new jobs infrastructure Change-Id: Iac71559e02502580de9c2e537733ec7286682050 Signed-off-by: Shahar Havivi <[email protected]> --- M tests/v2vTests.py M vdsm/v2v.py 2 files changed, 25 insertions(+), 129 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/58/52858/1 diff --git a/tests/v2vTests.py b/tests/v2vTests.py index b3988c9..677ba25 100644 --- a/tests/v2vTests.py +++ b/tests/v2vTests.py @@ -29,7 +29,7 @@ from testlib import namedTemporaryDir, permutations, expandPermutations import v2v -from vdsm import libvirtconnection +from vdsm import jobs, libvirtconnection from vdsm.password import ProtectedPassword from vdsm.commands import execCmd from vdsm.utils import CommandPath @@ -290,7 +290,7 @@ self._vms = [MockVirDomain(*spec) for spec in VM_SPECS] def tearDown(self): - v2v._jobs.clear() + jobs._clear() def testGetExternalVMs(self): if not v2v.supported(): @@ -462,7 +462,7 @@ def testSuccessfulImportOVA(self): with temporary_ovf_dir() as ovapath: v2v.convert_ova(ovapath, self.vminfo, self.job_id, FakeIRS()) - job = v2v._jobs[self.job_id] + job = jobs.get(self.job_id) job.wait() self.assertEqual(job.status, v2v.STATUS.DONE) @@ -501,7 +501,7 @@ self.vminfo, self.job_id, FakeIRS()) - job = v2v._jobs[self.job_id] + job = jobs.get(self.job_id) job.wait() self.assertEqual(job.status, v2v.STATUS.DONE) diff --git a/vdsm/v2v.py b/vdsm/v2v.py index 0574fd3..23b5cf3 100644 --- a/vdsm/v2v.py +++ b/vdsm/v2v.py @@ -32,7 +32,6 @@ import re import signal import tarfile -import threading import xml.etree.ElementTree as ET import zipfile @@ -41,15 +40,14 @@ from vdsm.commands import execCmd from vdsm.constants import P_VDSM_RUN from vdsm.define import errCode, doneCode -from vdsm import libvirtconnection, response, concurrent +from vdsm import jobs, libvirtconnection, response, concurrent from vdsm.infra import zombiereaper from vdsm.utils import traceback, CommandPath, NICENESS, IOCLASS import caps -_lock = threading.Lock() -_jobs = {} +_V2V_JOB_TYPE = 'v2v' _V2V_DIR = os.path.join(P_VDSM_RUN, 'v2v') _VIRT_V2V = CommandPath('virt-v2v', '/usr/bin/virt-v2v') @@ -72,27 +70,17 @@ DiskProgress = namedtuple('DiskProgress', ['progress']) -class STATUS: +class STATUS(jobs.STATUS): ''' STARTING: request granted and starting the import process COPYING_DISK: copying disk in progress - ABORTED: user initiated aborted - FAILED: error during import process - DONE: convert process successfully finished ''' STARTING = 'starting' COPYING_DISK = 'copying_disk' - ABORTED = 'aborted' - FAILED = 'error' - DONE = 'done' class V2VError(Exception): ''' Base class for v2v errors ''' - - -class ClientError(Exception): - ''' Base class for client error ''' class InvalidVMConfiguration(ValueError): @@ -103,23 +91,8 @@ ''' Error while parsing virt-v2v output ''' -class JobExistsError(ClientError): - ''' Job already exists in _jobs collection ''' - err_name = 'JobExistsError' - - -class VolumeError(ClientError): +class VolumeError(jobs.ClientError): ''' Error preparing volume ''' - - -class NoSuchJob(ClientError): - ''' Job not exists in _jobs collection ''' - err_name = 'NoSuchJob' - - -class JobNotDone(ClientError): - ''' Import process still in progress ''' - err_name = 'JobNotDone' class NoSuchOvf(V2VError): @@ -131,7 +104,7 @@ ''' virt-v2v process had error in execution ''' -class InvalidInputError(ClientError): +class InvalidInputError(jobs.ClientError): ''' Invalid input received ''' @@ -167,7 +140,7 @@ command = LibvirtCommand(uri, username, password, vminfo, job_id, irs) job = ImportVm(job_id, command) job.start() - _add_job(job_id, job) + jobs.add(job) return {'status': doneCode} @@ -175,7 +148,7 @@ command = OvaCommand(ova_path, vminfo, job_id, irs) job = ImportVm(job_id, command) job.start() - _add_job(job_id, job) + jobs.add(job) return response.success() @@ -197,12 +170,12 @@ def get_converted_vm(job_id): try: - job = _get_job(job_id) - _validate_job_done(job) + job = jobs.get(job_id) + job.validate_done() ovf = _read_ovf(job_id) - except ClientError as e: + except jobs.ClientError as e: logging.info('Converted VM error %s', e) - return errCode[e.err_name] + return errCode[e.name] except V2VError as e: logging.error('Converted VM error %s', e) return errCode[e.err_name] @@ -210,68 +183,7 @@ def delete_job(job_id): - try: - job = _get_job(job_id) - _validate_job_finished(job) - _remove_job(job_id) - except ClientError as e: - logging.info('Cannot delete job, error: %s', e) - return errCode[e.err_name] - return {'status': doneCode} - - -def abort_job(job_id): - try: - job = _get_job(job_id) - job.abort() - except ClientError as e: - logging.info('Cannot abort job, error: %s', e) - return errCode[e.err_name] - return {'status': doneCode} - - -def get_jobs_status(): - ret = {} - with _lock: - items = tuple(_jobs.items()) - for job_id, job in items: - ret[job_id] = { - 'status': job.status, - 'description': job.description, - 'progress': job.progress - } - return ret - - -def _add_job(job_id, job): - with _lock: - if job_id in _jobs: - raise JobExistsError("Job %r exists" % job_id) - _jobs[job_id] = job - - -def _get_job(job_id): - with _lock: - if job_id not in _jobs: - raise NoSuchJob("No such job %r" % job_id) - return _jobs[job_id] - - -def _remove_job(job_id): - with _lock: - if job_id not in _jobs: - raise NoSuchJob("No such job %r" % job_id) - del _jobs[job_id] - - -def _validate_job_done(job): - if job.status != STATUS.DONE: - raise JobNotDone("Job %r is %s" % (job.id, job.status)) - - -def _validate_job_finished(job): - if job.status not in (STATUS.DONE, STATUS.FAILED, STATUS.ABORTED): - raise JobNotDone("Job %r is %s" % (job.id, job.status)) + return jobs.delete(job_id) def _read_ovf(job_id): @@ -555,17 +467,17 @@ return env -class ImportVm(object): +class ImportVm(jobs.Job): + _JOB_TYPE = _V2V_JOB_TYPE TERM_DELAY = 30 PROC_WAIT_TIMEOUT = 30 def __init__(self, job_id, command): - self._id = job_id + super(ImportVm, self).__init__(job_id) self._command = command self._thread = None self._status = STATUS.STARTING - self._description = '' self._disk_progress = 0 self._disk_count = 1 self._current_disk = 1 @@ -579,18 +491,6 @@ def wait(self): if self._thread is not None and self._thread.is_alive(): self._thread.join() - - @property - def id(self): - return self._id - - @property - def status(self): - return self._status - - @property - def description(self): - return self._description @property def progress(self): @@ -667,11 +567,6 @@ else: raise RuntimeError("Job %r got unexpected parser event: %s" % (self._id, event)) - - def abort(self): - self._status = STATUS.ABORTED - logging.info('Job %r aborting...', self._id) - self._abort() def _abort(self): self._aborted = True @@ -910,8 +805,8 @@ return _read_ovf_from_zip_ova(ova_path) elif tarfile.is_tarfile(ova_path): return _read_ovf_from_tar_ova(ova_path) - raise ClientError('Unknown ova format, supported formats:' - ' tar, zip or a directory') + raise jobs.ClientError('Unknown ova format, supported formats:' + ' tar, zip or a directory') def _find_ovf(entries): @@ -927,7 +822,8 @@ if name is not None: with open(os.path.join(ova_path, name), 'r') as ovf_file: return ovf_file.read() - raise ClientError('OVA directory %s does not contain ovf file' % ova_path) + raise jobs.ClientError('OVA directory %s does not contain ovf file' % + ova_path) def _read_ovf_from_zip_ova(ova_path): @@ -936,7 +832,7 @@ name = _find_ovf(zf.namelist()) if name is not None: return zf.read(name) - raise ClientError('OVA does not contains file with .ovf suffix') + raise jobs.ClientError('OVA does not contains file with .ovf suffix') def _read_ovf_from_tar_ova(ova_path): @@ -945,7 +841,7 @@ if member.name.endswith('.ovf'): with closing(tar.extractfile(member)) as ovf: return ovf.read() - raise ClientError('OVA does not contains file with .ovf suffix') + raise jobs.ClientError('OVA does not contains file with .ovf suffix') def _add_general_ovf_info(vm, node, ns): -- To view, visit https://gerrit.ovirt.org/52858 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Iac71559e02502580de9c2e537733ec7286682050 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Shahar Havivi <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
