Adam Litke has uploaded a new change for review.

Change subject: Update v2v to use new jobs infrastructure
......................................................................

Update v2v to use new jobs infrastructure

With the general-purpose jobs infrastructure now available in jobs.py,
we can remove the redundant job-tracking code in v2v.py and use the
common implementation instead. The v2v API exposed to ovirt-engine is
unchanged.

Change-Id: I9118e0fe4aaeceab2109afa393dd45fbd97e070f
Signed-off-by: Adam Litke <[email protected]>
---
M vdsm/v2v.py
1 file changed, 24 insertions(+), 105 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/82/45382/1

diff --git a/vdsm/v2v.py b/vdsm/v2v.py
index e150b86..e7fa388 100644
--- a/vdsm/v2v.py
+++ b/vdsm/v2v.py
@@ -43,11 +43,10 @@
 from vdsm.utils import traceback, CommandPath, execCmd
 
 import caps
+import jobs
 
 
-_lock = threading.Lock()
-_jobs = {}
-
+_V2V_JOB_TYPE = 'v2v'
 _V2V_DIR = os.path.join(P_VDSM_RUN, 'v2v')
 _VIRT_V2V = CommandPath('virt-v2v', '/usr/bin/virt-v2v')
 _OVF_RESOURCE_CPU = 3
@@ -65,27 +64,17 @@
 DiskProgress = namedtuple('DiskProgress', ['progress'])
 
 
-class STATUS:
+class V2VSTATUS(jobs.STATUS):
     '''
     STARTING: request granted and starting the import process
     COPYING_DISK: copying disk in progress
-    ABORTED: user initiated aborted
-    FAILED: error during import process
-    DONE: convert process successfully finished
     '''
     STARTING = 'starting'
     COPYING_DISK = 'copying_disk'
-    ABORTED = 'aborted'
-    FAILED = 'error'
-    DONE = 'done'
 
 
 class V2VError(Exception):
     ''' Base class for v2v errors '''
-
-
-class ClientError(Exception):
-    ''' Base class for client error '''
 
 
 class InvalidVMConfiguration(ValueError):
@@ -96,21 +85,21 @@
     ''' Error while parsing virt-v2v output '''
 
 
-class JobExistsError(ClientError):
+class JobExistsError(jobs.ClientError):
     ''' Job already exists in _jobs collection '''
     err_name = 'JobExistsError'
 
 
-class VolumeError(ClientError):
+class VolumeError(jobs.ClientError):
     ''' Error preparing volume '''
 
 
-class NoSuchJob(ClientError):
+class NoSuchJob(jobs.ClientError):
     ''' Job not exists in _jobs collection '''
     err_name = 'NoSuchJob'
 
 
-class JobNotDone(ClientError):
+class JobNotDone(jobs.ClientError):
     ''' Import process still in progress '''
     err_name = 'JobNotDone'
 
@@ -124,7 +113,7 @@
     ''' virt-v2v process had error in execution '''
 
 
-class InvalidInputError(ClientError):
+class InvalidInputError(jobs.ClientError):
     ''' Invalid input received '''
 
 
@@ -169,14 +158,14 @@
 def convert_external_vm(uri, username, password, vminfo, job_id, irs):
     job = ImportVm.from_libvirt(uri, username, password, vminfo, job_id, irs)
     job.start()
-    _add_job(job_id, job)
+    jobs.add(job)
     return {'status': doneCode}
 
 
 def convert_ova(ova_path, vminfo, job_id, irs):
     job = ImportVm.from_ova(ova_path, vminfo, job_id, irs)
     job.start()
-    _add_job(job_id, job)
+    jobs.add(job)
     return response.success()
 
 
@@ -198,10 +187,10 @@
 
 def get_converted_vm(job_id):
     try:
-        job = _get_job(job_id)
-        _validate_job_done(job)
+        job = jobs.get(job_id)
+        job.validate_done()
         ovf = _read_ovf(job_id)
-    except ClientError as e:
+    except jobs.ClientError as e:
         logging.info('Converted VM error %s', e)
         return errCode[e.err_name]
     except V2VError as e:
@@ -211,68 +200,15 @@
 
 
 def delete_job(job_id):
-    try:
-        job = _get_job(job_id)
-        _validate_job_finished(job)
-        _remove_job(job_id)
-    except ClientError as e:
-        logging.info('Cannot delete job, error: %s', e)
-        return errCode[e.err_name]
-    return {'status': doneCode}
+    return jobs.delete(job_id)
 
 
 def abort_job(job_id):
-    try:
-        job = _get_job(job_id)
-        job.abort()
-    except ClientError as e:
-        logging.info('Cannot abort job, error: %s', e)
-        return errCode[e.err_name]
-    return {'status': doneCode}
+    return jobs.abort(job_id)
 
 
 def get_jobs_status():
-    ret = {}
-    with _lock:
-        items = tuple(_jobs.items())
-    for job_id, job in items:
-        ret[job_id] = {
-            'status': job.status,
-            'description': job.description,
-            'progress': job.progress
-        }
-    return ret
-
-
-def _add_job(job_id, job):
-    with _lock:
-        if job_id in _jobs:
-            raise JobExistsError("Job %r exists" % job_id)
-        _jobs[job_id] = job
-
-
-def _get_job(job_id):
-    with _lock:
-        if job_id not in _jobs:
-            raise NoSuchJob("No such job %r" % job_id)
-        return _jobs[job_id]
-
-
-def _remove_job(job_id):
-    with _lock:
-        if job_id not in _jobs:
-            raise NoSuchJob("No such job %r" % job_id)
-        del _jobs[job_id]
-
-
-def _validate_job_done(job):
-    if job.status != STATUS.DONE:
-        raise JobNotDone("Job %r is %s" % (job.id, job.status))
-
-
-def _validate_job_finished(job):
-    if job.status not in (STATUS.DONE, STATUS.FAILED, STATUS.ABORTED):
-        raise JobNotDone("Job %r is %s" % (job.id, job.status))
+    return jobs.info(_V2V_JOB_TYPE)
 
 
 def _read_ovf(job_id):
@@ -311,7 +247,8 @@
                               job_id, file_name)
 
 
-class ImportVm(object):
+class ImportVm(jobs.Job):
+    _JOB_TYPE = _V2V_JOB_TYPE
     TERM_DELAY = 30
     PROC_WAIT_TIMEOUT = 30
 
@@ -319,12 +256,11 @@
         '''
         do not use directly, use a factory method instead!
         '''
+        super(ImportVm, self).__init__(job_id)
         self._vminfo = vminfo
-        self._id = job_id
         self._irs = irs
 
-        self._status = STATUS.STARTING
-        self._description = ''
+        self._status = V2VSTATUS.STARTING
         self._disk_progress = 0
         self._disk_count = 1
         self._current_disk = 1
@@ -367,18 +303,6 @@
         t.start()
 
     @property
-    def id(self):
-        return self._id
-
-    @property
-    def status(self):
-        return self._status
-
-    @property
-    def description(self):
-        return self._description
-
-    @property
     def progress(self):
         '''
         progress is part of multiple disk_progress its
@@ -402,7 +326,7 @@
                 logging.debug("Job %r was aborted", self._id)
             else:
                 logging.exception("Job %r failed", self._id)
-                self._status = STATUS.FAILED
+                self._status = V2VSTATUS.FAILED
                 self._description = ex.message
                 try:
                     self._abort()
@@ -431,8 +355,8 @@
                                   (self._id, self._proc.returncode,
                                    self._proc.stderr.read(1024)))
 
-        if self._status != STATUS.ABORTED:
-            self._status = STATUS.DONE
+        if self._status != V2VSTATUS.ABORTED:
+            self._status = V2VSTATUS.DONE
             logging.info('Job %r finished import successfully', self._id)
 
     def _execution_environments(self):
@@ -453,7 +377,7 @@
         parser = OutputParser()
         for event in parser.parse(self._proc.stdout):
             if isinstance(event, ImportProgress):
-                self._status = STATUS.COPYING_DISK
+                self._status = V2VSTATUS.COPYING_DISK
                 logging.info("Job %r copying disk %d/%d",
                              self._id, event.current_disk, event.disk_count)
                 self._disk_progress = 0
@@ -503,11 +427,6 @@
                get_storage_domain_path(self._prepared_volumes[0]['path'])]
         cmd.extend(self._generate_disk_parameters())
         return cmd
-
-    def abort(self):
-        self._status = STATUS.ABORTED
-        logging.info('Job %r aborting...', self._id)
-        self._abort()
 
     def _abort(self):
         self._aborted = True


-- 
To view, visit https://gerrit.ovirt.org/45382
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I9118e0fe4aaeceab2109afa393dd45fbd97e070f
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Adam Litke <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to