Shahar Havivi has uploaded a new change for review.

Change subject: v2v: Convert VM from external source to Data Domain
......................................................................

v2v: Convert VM from external source to Data Domain

new verb: convert VM from external source (non-kvm) to Data Domain.
The convertVmFromExternalSystem() is implemented in v2v module and use
the virt-v2v external tool to do actual VM conversion.

Change-Id: I34bd86d5a87ea8c42113c4a732f87ddd4ceab9ea
Signed-off-by: Shahar Havivi <[email protected]>
---
M client/vdsClient.py
M configure.ac
M debian/vdsm.dirs
M debian/vdsm.postinst
M lib/vdsm/constants.py.in
M lib/vdsm/define.py
M vdsm.spec.in
M vdsm/API.py
M vdsm/Makefile.am
M vdsm/clientIF.py
M vdsm/rpc/BindingXMLRPC.py
M vdsm/rpc/Bridge.py
M vdsm/rpc/vdsmapi-schema.json
M vdsm/sudoers.vdsm.in
M vdsm/v2v.py
M vdsm/virt/sampling.py
16 files changed, 377 insertions(+), 1 deletion(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/09/37509/1

diff --git a/client/vdsClient.py b/client/vdsClient.py
index a3e2f72..f65aef2 100644
--- a/client/vdsClient.py
+++ b/client/vdsClient.py
@@ -1911,6 +1911,29 @@
 
         return status['status']['code'], status['status']['message']
 
+    def convertVmFromExternalSystem(self, args):
+        if len(args) != 5:
+            raise ValueError('Wrong number of arguments')
+        uri, username, password, vmProperties, jobId = args
+        status = self.s.convertVmFromExternalSystem(uri, username, password,
+                                                    vmProperties, jobId)
+        if status['status']['code'] == 0:
+            print "v2v job started"
+        return status['status']['code'], status['status']['message']
+
+    def getConvertedVm(self, args):
+        if len(args) != 1:
+            raise ValueError('Wrong number of arguments')
+        status = self.s.getConvertedVm(args[0])
+        if status['status']['code'] == 0:
+            print status['ovf']
+        return status['status']['code'], status['status']['message']
+
+    def abortV2VJob(self, args):
+        if len(args) != 1:
+            raise ValueError('Wrong number of arguments')
+        status = self.s.abortV2VJob(args)
+        return status['status']['code'], status['status']['message']
 
 if __name__ == '__main__':
     if _glusterEnabled:
@@ -2760,6 +2783,21 @@
                 '<uri> <username> <password>',
                 'get VMs from external hypervisor'
             )),
+        'convertVmFromExternalSystem': (
+            serv.convertVmFromExternalSystem, (
+                '<uri> <username> <password> <vmProperties> <jobId>',
+                'Import and convert VM from external system'
+            )),
+        'getConvertedVm': (
+            serv.getConvertedVm, (
+                '<jobId>',
+                'Return converted VMs OVF'
+            )),
+        'abortV2VJob': (
+            serv.abortV2VJob, (
+                '<jobId>',
+                'Abort running V2V convert process'
+            )),
     }
     if _glusterEnabled:
         commands.update(ge.getGlusterCmdDict(serv))
diff --git a/configure.ac b/configure.ac
index c3dfea4..a8507fd 100644
--- a/configure.ac
+++ b/configure.ac
@@ -335,6 +335,7 @@
 AC_PATH_PROG([TUNE2FS_PATH], [tune2fs], [/sbin/tune2fs])
 AC_PATH_PROG([UDEVADM_PATH], [udevadm], [/sbin/udevadm])
 AC_PATH_PROG([UMOUNT_PATH], [umount], [/bin/umount])
+AC_PATH_PROG([VIRT_V2V_PATH], [virt-v2v], [/bin/bin/virt-v2v])
 AC_PATH_PROG([WGET_PATH], [wget], [/usr/bin/wget])
 AC_PATH_PROG([YUM_PATH], [yum], [/usr/bin/yum])
 
diff --git a/debian/vdsm.dirs b/debian/vdsm.dirs
index 7b5db1b..9965a68 100644
--- a/debian/vdsm.dirs
+++ b/debian/vdsm.dirs
@@ -68,6 +68,7 @@
 var/log/vdsm/backup
 var/run/vdsm
 var/run/vdsm/payload
+var/run/vdsm/v2v
 var/run/vdsm/sourceRoutes
 var/run/vdsm/trackedInterfaces
 var/log/core
diff --git a/debian/vdsm.postinst b/debian/vdsm.postinst
index c00a1a6..5458974 100644
--- a/debian/vdsm.postinst
+++ b/debian/vdsm.postinst
@@ -61,6 +61,7 @@
         $INSTALLDIR /var/lib/vdsm/netconfback
         $INSTALLDIR /var/run/vdsm
         $INSTALLDIR /var/run/vdsm/payload
+        $INSTALLDIR /var/run/vdsm/v2v
         $INSTALLDIR /var/run/vdsm/sourceRoutes
         $INSTALLDIR /var/run/vdsm/trackedInterfaces
         $INSTALLDIR /var/log/vdsm
diff --git a/lib/vdsm/constants.py.in b/lib/vdsm/constants.py.in
index 89bdebd..6ee4f29 100644
--- a/lib/vdsm/constants.py.in
+++ b/lib/vdsm/constants.py.in
@@ -155,6 +155,8 @@
 
 EXT_UMOUNT = '@UMOUNT_PATH@'
 
+EXT_VIRT_V2V = '@VIRT_V2V_PATH@'
+
 EXT_VDSM_RESTORE_NET_CONFIG = '@VDSMDIR@/vdsm-restore-net-config'
 EXT_VDSM_STORE_NET_CONFIG = '@VDSMDIR@/vdsm-store-net-config'
 
diff --git a/lib/vdsm/define.py b/lib/vdsm/define.py
index 34f423a..4c714b6 100644
--- a/lib/vdsm/define.py
+++ b/lib/vdsm/define.py
@@ -147,6 +147,12 @@
     'updateIoTuneErr': {'status': {
         'code': 64,
         'message': 'Failed to update ioTune values'}},
+    'invalidV2VJob': {'status': {
+        'code': 65,
+        'message': 'Job Id is not exists'}},
+    'invalidV2VOvfPath': {'status': {
+        'code': 66,
+        'message': 'OVF file is not exists'}},
     'recovery': {'status': {
         'code': 99,
         'message': 'Recovering from crash or Initializing'}},
diff --git a/vdsm.spec.in b/vdsm.spec.in
index 79a44c0..76f0171 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -1278,6 +1278,7 @@
 %dir %{_localstatedir}/run/%{vdsm_name}/sourceRoutes
 %dir %{_localstatedir}/run/%{vdsm_name}/trackedInterfaces
 %ghost %dir %{_localstatedir}/run/%{vdsm_name}/payload
+%dir %{_localstatedir}/run/%{vdsm_name}/v2v
 %dir %{_localstatedir}/log/%{vdsm_name}
 %dir %{_localstatedir}/log/%{vdsm_name}/backup
 
diff --git a/vdsm/API.py b/vdsm/API.py
index c8d2759..ee3a264 100644
--- a/vdsm/API.py
+++ b/vdsm/API.py
@@ -1403,6 +1403,19 @@
         vms = v2v.get_external_vms(uri, username, password)
         return {'status': doneCode, 'vmList': vms}
 
+    def convertVmFromExternalSystem(self, uri, username, password,
+                                    vmProperties, jobId):
+        v2v.convert(uri, username, password, vmProperties, jobId, self._cif)
+        return {'status': doneCode}
+
+    def getConvertedVm(self, jobId):
+        ovf = v2v.get_converted_vm(jobId)
+        return {'status': doneCode, 'ovf': ovf}
+
+    def abortJob(self, jobId):
+        v2v.abort_job(jobId)
+        return {'status': doneCode}
+
     # Networking-related functions
     def setupNetworks(self, networks, bondings, options):
         """Add a new network to this vds, replacing an old one."""
diff --git a/vdsm/Makefile.am b/vdsm/Makefile.am
index 6d7e4ee..fc98612 100644
--- a/vdsm/Makefile.am
+++ b/vdsm/Makefile.am
@@ -153,6 +153,7 @@
        $(MKDIR_P) $(DESTDIR)$(vdsmrundir)/sourceRoutes
        $(MKDIR_P) $(DESTDIR)$(vdsmrundir)/trackedInterfaces
        $(MKDIR_P) $(DESTDIR)$(vdsmrundir)/payload
+       $(MKDIR_P) $(DESTDIR)$(vdsmrundir)/v2v
        $(MKDIR_P) $(DESTDIR)$(vdsmlibdir)/netconfback
        $(MKDIR_P) $(DESTDIR)$(vdsmlibdir)/persistence
        $(MKDIR_P) $(DESTDIR)$(vdsmlibdir)/upgrade
diff --git a/vdsm/clientIF.py b/vdsm/clientIF.py
index 1487d62..a61af26 100644
--- a/vdsm/clientIF.py
+++ b/vdsm/clientIF.py
@@ -39,6 +39,7 @@
 import caps
 import blkid
 import supervdsm
+import v2v
 from protocoldetector import MultiProtocolAcceptor
 
 from virt import migration
@@ -112,6 +113,7 @@
             self._createAcceptor(host, port)
             self._prepareXMLRPCBinding()
             self._prepareJSONRPCBinding()
+            v2v.kill_zombie_jobs()
         except:
             self.log.error('failed to init clientIF, '
                            'shutting down storage dispatcher')
diff --git a/vdsm/rpc/BindingXMLRPC.py b/vdsm/rpc/BindingXMLRPC.py
index a6b75f3..c6bb184 100644
--- a/vdsm/rpc/BindingXMLRPC.py
+++ b/vdsm/rpc/BindingXMLRPC.py
@@ -364,6 +364,20 @@
         api = API.Global()
         return api.getExternalVMs(uri, username, password)
 
+    def convertVmFromExternalSystem(self, uri, username, password,
+                                    vmProperties, jobId):
+        api = API.Global()
+        return api.convertVmFromExternalSystem(uri, username, password,
+                                               vmProperties, jobId)
+
+    def getConvertedVm(self, jobId):
+        api = API.Global()
+        return api.getConvertedVm(jobId)
+
+    def abortV2VJob(self, jobId):
+        api = API.Global()
+        return api.abortV2VJob(jobId)
+
     def vmPause(self, vmId):
         vm = API.VM(vmId)
         return vm.pause()
@@ -1031,7 +1045,11 @@
                 (self.vmUpdateVmPolicy, 'updateVmPolicy'),
                 (self.vmSetIoTune, 'setIoTune'),
                 (self.vmGetIoTunePolicy, 'getIoTunePolicy'),
-                (self.getExternalVMs, 'getExternalVMs'))
+                (self.getExternalVMs, 'getExternalVMs'),
+                (self.convertVmFromExternalSystem,
+                 'convertVmFromExternalSystem'),
+                (self.getConvertedVm, 'getConvertedVm'),
+                (self.abortV2VJob, 'abortV2VJob'))
 
     def getIrsMethods(self):
         return ((self.domainActivate, 'activateStorageDomain'),
diff --git a/vdsm/rpc/Bridge.py b/vdsm/rpc/Bridge.py
index f1f6f31..2a62145 100644
--- a/vdsm/rpc/Bridge.py
+++ b/vdsm/rpc/Bridge.py
@@ -406,6 +406,9 @@
     'Host_getDeviceList': {'ret': 'devList'},
     'Host_getDevicesVisibility': {'ret': 'visible'},
     'Host_getExternalVMs': {'ret': 'vmList'},
+    'Host_convertVmFromExternalSystem': {},
+    'Host_getConvertedVm': {'ret': 'ovf'},
+    'Host_abortV2VJob': {},
     'Host_getHardwareInfo': {'ret': 'info'},
     'Host_getLVMVolumeGroups': {'ret': 'vglist'},
     'Host_getRoute': {'ret': 'info'},
diff --git a/vdsm/rpc/vdsmapi-schema.json b/vdsm/rpc/vdsmapi-schema.json
index 40f4b98..dec8606 100644
--- a/vdsm/rpc/vdsmapi-schema.json
+++ b/vdsm/rpc/vdsmapi-schema.json
@@ -3722,6 +3722,53 @@
   'returns': ['ExternalVmInfo']}
 
 ##
+# @Host.convertVmFromExternalSystem:
+#
+# Convert VM from external source to data domain
+#
+# @uri:          libvirt connection uri
+#
+# @username:     libvirt connection user name
+#
+# @password:     libvirt connection password
+#
+# @vmProperties: information of the VM such as name, id etc
+#
+# @jobId:        Assign a UUID to this operation which can be used
+#                to identify it in @VmStats
+#
+# Since: 4.17.0
+##
+{'command': {'class': 'Host', 'name': 'convertVmFromExternalSystem'},
+  'data': {'uri': 'str', 'username': 'str', 'password': 'str',
+  'vmProperties': 'ExternalVmInfo', 'jobId': 'UUID'}}
+
+##
+# @Host.getConvertedVm:
+#
+# Return OVF string that created via convertVmFromExternalSystem call
+#
+# @jobId:  UUID to identify the job that created in convertVmFromExternalSystem
+#
+# Since: 4.17.0
+##
+{'command': {'class': 'Host', 'name': 'getConvertedVm'},
+  'data': {'jobId': 'UUID'}
+  'returns': 'str'}
+
+##
+# @Host.abortV2VJob:
+#
+# Abort V2V importing process
+#
+# @jobId:  UUID to identify the job that created in convertVmFromExternalSystem
+#
+# Since: 4.17.0
+##
+{'command': {'class': 'Host', 'name': 'abortV2VJob'},
+  'data': {'jobId': 'UUID'}}
+
+##
 # @VMFullInfo:
 #
 # Full information about VM.
diff --git a/vdsm/sudoers.vdsm.in b/vdsm/sudoers.vdsm.in
index da88ff2..52b0eb7 100644
--- a/vdsm/sudoers.vdsm.in
+++ b/vdsm/sudoers.vdsm.in
@@ -25,6 +25,7 @@
     @CP_PATH@ /etc/multipath.conf *, \
     @CP_PATH@ * /etc/multipath.conf, \
     @MULTIPATH_PATH@, \
+    @PGREP_PATH@, \
     @SETSID_PATH@ @IONICE_PATH@ -c ? -n ? @SU_PATH@ vdsm -s /bin/sh -c 
/usr/libexec/vdsm/spmprotect.sh*, \
     @SERVICE_PATH@ vdsmd *, \
     @REBOOT_PATH@ -f
diff --git a/vdsm/v2v.py b/vdsm/v2v.py
index 46596c9..bd3b2cf 100644
--- a/vdsm/v2v.py
+++ b/vdsm/v2v.py
@@ -18,14 +18,25 @@
 #
 from contextlib import closing
 import logging
+import os
+import re
+import subprocess
+import threading
 import xml.etree.ElementTree as ET
 
 import libvirt
 
+from vdsm.constants import EXT_VIRT_V2V, EXT_PGREP, EXT_KILL
+from vdsm.constants import P_VDSM_RUN
 from vdsm.define import errCode
 from vdsm import libvirtconnection
+from vdsm.utils import execCmd
 
 import caps
+
+
+_jobs = {}
+_P_V2V_DIR = os.path.join(P_VDSM_RUN, 'v2v')
 
 
 class InvalidVMConfiguration(ValueError):
@@ -64,6 +75,233 @@
         return ret
 
 
+def convert(uri, username, password, vmProperties, jobId, cif):
+    global _jobs
+    t = _import_vm_thread(uri, username, password, vmProperties, jobId, cif)
+    _jobs[jobId] = t
+    t.start()
+
+
+def get_converted_vm(jobId):
+    global _jobs
+    if jobId not in _jobs.keys():
+        return errCode['invalidV2VJob']
+
+    file_name = os.path.join(_P_V2V_DIR, "%s.ovf" % jobId)
+    if not os.path.exists(file_name):
+        return errCode['invalidV2VOvfPath']
+
+    f = open(file_name, 'r')
+    with closing(f):
+        ovf = '\n'.join(f.readlines())
+    os.remove(file_name)
+
+    _jobs.pop(jobId)
+    if len(_jobs) > 0:
+        kill_zombie_jobs()
+    return ovf
+
+
+def kill_zombie_jobs():
+    ret, out, err = execCmd([EXT_PGREP, 'virt-v2v'], sudo=True)
+    if len(err) > 0:
+        logging.error('error while trying to grep virt-v2v processes: %r', err)
+        return
+
+    for pid in out:
+        ret, out, err = execCmd([EXT_KILL, '-9', pid], sudo=True)
+        if len(err) > 0:
+            logging.error('error while trying to kill virt-v2v process %r: %r',
+                          pid, err)
+
+    ret, out, err = execCmd([EXT_PGREP, 'virt-v2v'], sudo=True)
+    if len(err) > 0:
+        logging.error('error while trying to grep virt-v2v'
+                      ' processes (second time): %r', err)
+        return
+    if out != '':
+        logging.error('cannot kill the following virt-v2v processes: %r', out)
+
+
+def abort_job(jobId):
+    global _jobs
+    if jobId not in _jobs.keys():
+        return errCode['invalidV2VJob']
+    t = _jobs[jobId]
+    t.abort_job()
+
+
+def jobs():
+    global _jobs
+    ret = {}
+    for jobId in _jobs.keys():
+        ret[jobId] = {}
+        ret[jobId]['status'] = _jobs[jobId].status()
+        ret[jobId]['progress'] = _jobs[jobId].progress()
+    return ret
+
+
+class _import_vm_thread(threading.Thread):
+    """
+    Importing VM to data domain
+    Running virt-v2v command
+    Parsing the stdout
+    """
+    def __init__(self, uri, username, password, vmProperties, jobId, cif):
+        threading.Thread.__init__(self)
+        self.setDaemon(True)
+        self._uri = uri
+        self._username = username
+        self._password = password
+        self._vmProperties = vmProperties
+        self._jobId = jobId
+        self._cif = cif
+        self._status = 'initializing'
+        self._progress = 0
+        self._abort = False
+
+    def job_id(self):
+        return self._jobId
+
+    def status(self):
+        return self._status
+
+    def progress(self):
+        return self._progress
+
+    def run(self):
+        try:
+            self._import_vm()
+        except Exception as ex:
+            logging.info('error in importing external vm: %r', ex)
+        finally:
+            self._delete_passwd_file()
+
+    def _import_vm(self):
+        self._create_passwd_file()
+        self._status = 'starting'
+        cmd = ('%s -ic %s -o vdsm -of qcow2 %s --password-file %s ' +
+               '--vdsm-vm-uuid %s --vdsm-ovf-output %s --machine-readable ' +
+               '-os %s %s ') % (EXT_VIRT_V2V, self._uri,
+                                self._generate_disk_parameters(),
+                                self._pass_file, self._jobId, _P_V2V_DIR,
+                                self._get_domain_path(),
+                                self._vmProperties['vmName'])
+
+        logging.info('import vm, (jobId %s) started, cmd: %r', self._jobId,
+                     cmd)
+        proc = subprocess.Popen(cmd, env={'LIBGUESTFS_BACKEND': 'direct'},
+                                shell=True,
+                                stdout=subprocess.PIPE,
+                                stderr=subprocess.PIPE)
+
+        self._handle_process_input(proc)
+
+        if self._abort:
+            if proc.returncode is None:
+                proc.terminate()
+            return
+
+        logging.info('import vm waiting for exit code')
+        proc.wait()
+        logging.info('import vm ended, exit status %d', proc.returncode)
+        if proc.returncode != 0:
+            self._status = 'ended with error: %r' % proc.returncode
+        else:
+            self._status = 'done'
+
+    def _handle_process_input(self, proc):
+        re_copy_disk = re.compile(r'.*(Copying disk \d+/\d+).*')
+        while not self._abort:
+            line = proc.stdout.readline()
+
+            if line == '':
+                self._handle_process_errors(proc)
+                break
+
+            if 'Copying disk' in line:
+                m = re_copy_disk.match(line)
+                if m is not None:
+                    self._status = m.groups()[0]
+                    self._progress = 0
+                    logging.info('convert status: %s', self._status)
+                else:
+                    self._abort_msg("error parsing 'Copying disk' section: %s"
+                                    % line)
+                    break
+                self._handle_progress_input(proc, line)
+
+    def _handle_progress_input(self, proc, line):
+        re_progress = re.compile(r'\s+\((\d+).*')
+        while not self._abort:
+            buf = proc.stdout.read(1)
+            while '\r' not in buf:
+                buf += proc.stdout.read(1)
+            m = re_progress.match(buf)
+            if m is not None:
+                try:
+                    self._progress = int(m.groups()[0])
+                    if self._progress == 100:
+                        break
+                except ValueError:
+                    self._abort_msg('error converting progress: %r' % line)
+                    break
+            else:
+                self._abort_msg('error parsing progress: %r' % line)
+                break
+
+    def _handle_process_errors(self, proc):
+        error = proc.stderr.readlines()
+        if len(error) > 0:
+            self._abort = True
+            msg = 'error process ended with errors: %s' % error
+            self._status = msg
+            logging.error(msg)
+
+    def _create_passwd_file(self):
+        self._pass_file = os.path.join(_P_V2V_DIR, "%s.tmp" % self._jobId)
+        f = os.fdopen(os.open(self._pass_file,
+                              os.O_WRONLY | os.O_CREAT, 0600), 'w')
+        with closing(f):
+            f.write(self._password)
+
+    def _delete_passwd_file(self):
+        if os.path.exists(self._pass_file):
+            os.remove(self._pass_file)
+
+    def abort_job(self):
+        self._abort = True
+        self._status = 'abort'
+        logging.info('aborting job id: %r', self._jobId)
+
+    def _abort_msg(self, msg):
+        self._status = msg
+        logging.error(msg)
+        self._abort = True
+
+    def _generate_disk_parameters(self):
+        images = ''
+        disks = self._vmProperties['disks']
+        for disk in disks:
+            if 'imageID' in disk:
+                images = '%s --vdsm-image-uuid %s ' % (images, disk['imageID'])
+            if 'volumeID' in disk:
+                images = '%s --vdsm-vol-uuid %s ' % (images, disk['volumeID'])
+        return images
+
+    def _get_domain_path(self):
+        disk = self._vmProperties['disks'][0]
+        drive = {'device': 'disk',
+                 'poolID': self._vmProperties['poolID'],
+                 'domainID': self._vmProperties['domainID'],
+                 'imageID': disk['imageID'],
+                 'volumeID': disk['volumeID']}
+
+        volPath = self._cif.getInstance().prepareVolumePath(drive)
+        volPath = volPath.split('/images/')[0]
+        return volPath
+
+
 def _mem_to_mib(size, unit):
     lunit = unit.lower()
     if lunit in ('bytes', 'b'):
diff --git a/vdsm/virt/sampling.py b/vdsm/virt/sampling.py
index 87d9bed..a92d638 100644
--- a/vdsm/virt/sampling.py
+++ b/vdsm/virt/sampling.py
@@ -34,6 +34,7 @@
 from vdsm import ipwrapper
 from vdsm import netinfo
 from vdsm import utils
+import v2v
 
 import caps
 
@@ -604,6 +605,8 @@
 
         stats['numaNodeMemFree'] = hs1.numaNodeMem.nodesMemSample
         stats['cpuStatistics'] = self._getCpuCoresStats()
+
+        stats['v2vJobs'] = v2v.jobs()
         return stats
 
     def _getCpuCoresStats(self):


-- 
To view, visit http://gerrit.ovirt.org/37509
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I34bd86d5a87ea8c42113c4a732f87ddd4ceab9ea
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Shahar Havivi <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to