Nir Soffer has posted comments on this change.

Change subject: v2v: Convert VM from external source to Data Domain
......................................................................

Patch Set 9:

(32 comments)

Reviewed only v2v, will review the rest when it's done.

https://gerrit.ovirt.org/#/c/37509/9/vdsm/v2v.py
File vdsm/v2v.py:

Line 134: def abort_job(jobId):
Line 135:     global _jobs
Line 136:     with _lock:
Line 137:         if jobId not in _jobs:
Line 138:             return errCode['invalidV2VJob']
This code is repeated:

    with lock:
        if no such job:
            return invalid job error

Let's add helpers for adding, getting and removing jobs:

    def _get_job(id):
        with _lock:
            if id not in _jobs:
                raise NoSuchJob(id)
            return _jobs[id]

    def _add_job(id, job):
        with _lock:
            if id in _jobs:
                raise JobExists(id)
            _jobs[id] = job

    def _remove_job(id):
        with _lock:
            if id not in _jobs:
                raise NoSuchJob(id)
            del _jobs[id]

The upper layer should catch these errors and return the proper error code - for example:

    try:
        whatever...
    except V2VError as e:
        return response.error(code=e.code, message=e.message)

See https://gerrit.ovirt.org/37912

This may need too much work for this patch, so consider doing this later.
Line 139:         t = _jobs[jobId]
Line 140:         t.abort_job()
Line 141:         del _jobs[jobId]
Line 142:

Line 135:     global _jobs
Line 136:     with _lock:
Line 137:         if jobId not in _jobs:
Line 138:             return errCode['invalidV2VJob']
Line 139:         t = _jobs[jobId]
Please rename "t" to "job"
Line 140:         t.abort_job()
Line 141:         del _jobs[jobId]
Line 142:
Line 143:     return {'status': doneCode}

Line 151:             ret[jobId] = {
Line 152:                 'status': jobValue.status,
Line 153:                 'status_msg': jobValue.status_msg,
Line 154:                 'progress': jobValue.progress
Line 155:             }
You are again holding the lock longer than needed. Copy the items under the lock and build the result outside it:

    with _lock:
        jobs = _jobs.items()
    for jobId, job in jobs:
        ...
Line 156:     return ret
Line 157:
Line 158:
Line 159: class ImportVm():

Line 167:     process (ie job) is via getVdsStats() with the fields progress and status.
Line 168:     progress is a number which represent percentage of a single disk copy,
Line 169:     status is a way to feedback information on the job (init, error etc)
Line 170:     when the job is done a call to get_converted_vm will return the VMs OVF and
Line 171:     delete the job.
Nice - but I think this should be at the module level, not in this class.
Line 172:     """
Line 173:     def __init__(self, uri, username, password, vmProperties, jobId, cif):
Line 174:         self._thread = threading.Thread(target=self._run)
Line 175:         self._thread.daemon = True

Line 178:         self._password = password
Line 179:         self._vmProperties = vmProperties
Line 180:         self._job_id = jobId
Line 181:         self._cif = cif
Line 182:         self._status = _STATUS_INITIALIZING
The job is not really initializing anything in this state - right? Maybe NOT_STARTED.
Line 183:         self._progress = 0
Line 184:         self._abort = False
Line 185:         self._status_msg = ''
Line 186:

Line 179:         self._vmProperties = vmProperties
Line 180:         self._job_id = jobId
Line 181:         self._cif = cif
Line 182:         self._status = _STATUS_INITIALIZING
Line 183:         self._progress = 0
This is not a good name - the user expects this to be the ImportVm progress, but it is actually the copy progress of a single disk, and we may have multiple disks. Let's call this _copy_progress?
Line 184:         self._abort = False
Line 185:         self._status_msg = ''
Line 186:
Line 187:     def start(self):

Line 180:         self._job_id = jobId
Line 181:         self._cif = cif
Line 182:         self._status = _STATUS_INITIALIZING
Line 183:         self._progress = 0
Line 184:         self._abort = False
I would call this _aborted
Line 185:         self._status_msg = ''
Line 186:
Line 187:     def start(self):
Line 188:         self._thread.start()

Line 184:         self._abort = False
Line 185:         self._status_msg = ''
Line 186:
Line 187:     def start(self):
Line 188:         self._thread.start()
The status should be set to _STATUS_STARTING or _STATUS_STARTED here.
Line 189:
Line 190:     @property
Line 191:     def job_id(self):
Line 192:         return self._job_id

Line 188:         self._thread.start()
Line 189:
Line 190:     @property
Line 191:     def job_id(self):
Line 192:         return self._job_id
Calling job.job_id feels too repetitive. Does a job have multiple ids? It should be:

    job.id
    job.status
Line 193:
Line 194:     @property
Line 195:     def status(self):
Line 196:         return self._status

Line 195:     def status(self):
Line 196:         return self._status
Line 197:
Line 198:     @property
Line 199:     def status_msg(self):
description?
Line 200:         return self._status_msg
Line 201:
Line 202:     @property
Line 203:     def progress(self):

Line 224:         cmd.extend(self._generate_disk_parameters())
Line 225:         cmd.extend(['--password-file', self._pass_file, '--vdsm-vm-uuid',
Line 226:                     self._job_id, '--vdsm-ovf-output', _P_V2V_DIR,
Line 227:                     '--machine-readable', '-os', self._get_domain_path(),
Line 228:                     self._vmProperties['vmName']])
Please use this format:

    cmd.extend([
        "--unary-option",
        "--binary-option", "and its value",
        ...
    ])
Line 229:
Line 230:         logging.info('import vm, (jobId %s) started, cmd: %r', self._job_id,
Line 231:                      cmd)
Line 232:

Line 226:                     self._job_id, '--vdsm-ovf-output', _P_V2V_DIR,
Line 227:                     '--machine-readable', '-os', self._get_domain_path(),
Line 228:                     self._vmProperties['vmName']])
Line 229:
Line 230:         logging.info('import vm, (jobId %s) started, cmd: %r', self._job_id,
- import -> Import
- cmd: %s is better.
Line 231:                      cmd)
Line 232:
Line 233:         proc = execCmd(cmd, sync=False, deathSignal=15,
Line 234:                        env={'LIBGUESTFS_BACKEND': 'direct'})

Line 233:         proc = execCmd(cmd, sync=False, deathSignal=15,
Line 234:                        env={'LIBGUESTFS_BACKEND': 'direct'})
Line 235:
Line 236:         proc.blocking = True
Line 237:         self._handle_process_output(proc)
You are using _handle for two different things - watching the process output and handling the process exit code. Let's make it clearer - _watch_process_output, _handle_errors?
Line 238:
Line 239:         if self._abort:
Line 240:             if proc.returncode is None:
Line 241:                 proc.terminate()

Line 239:         if self._abort:
Line 240:             if proc.returncode is None:
Line 241:                 proc.terminate()
Line 242:             self._status = _STATUS_ABORT
Line 243:         elif proc.returncode != 0:
Let's make this more consistent and set self._status here in all cases.
Line 244:             self._handle_process_errors(proc)
Line 245:         else:
Line 246:             self._status = _STATUS_DONE
Line 247:

Line 245:         else:
Line 246:             self._status = _STATUS_DONE
Line 247:
Line 248:     def _handle_process_output(self, proc):
Line 249:         re_copy_disk = re.compile(r'.*(Copying disk \d+/\d+).*')
Better to move this to a class constant. Also the name is strange; typically this would be called copy_disk_re.
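
For illustration only, a minimal sketch of the class-constant form. The pattern is the one from the patch; the method name `parse_copy_disk` and the choice of ValueError are assumptions made for this example, not part of the change under review:

```python
import re


class ImportVm(object):
    # Compiled once when the class body is executed, not on every call.
    _COPY_DISK_RE = re.compile(r'.*(Copying disk \d+/\d+).*')

    @classmethod
    def parse_copy_disk(cls, line):
        # Hypothetical helper: return the "Copying disk N/M" text,
        # or raise if the line does not have the expected shape.
        m = cls._COPY_DISK_RE.match(line)
        if m is None:
            raise ValueError("unexpected 'Copying disk' line: %r" % line)
        return m.group(1)
```

The error case is handled first, so the caller gets either a parsed value or an exception and never has to inspect a flag.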
Line 250:         while not self._abort and proc.returncode is None:
Line 251:             line = proc.stdout.readline()
Line 252:
Line 253:             if 'Copying disk' in line:

Line 258:                     self._progress = 0
Line 259:                     logging.info('convert status: %s', self._status_msg)
Line 260:                 else:
Line 261:                     self._abort_msg("error parsing 'Copying disk' section: %s"
Line 262:                                     % line)
Raise an error here, and abort the process in _run.

Also you should handle the error first, simplifying the normal flow:

    for line in stdout:
        m = copy_disk_re.match(line)
        if m is None:
            raise ...
        start watching progress of a new disk
Line 263:                     break
Line 264:                 self._handle_progress(proc, line)
Line 265:
Line 266:     def _handle_progress(self, proc, line):

Line 260:                 else:
Line 261:                     self._abort_msg("error parsing 'Copying disk' section: %s"
Line 262:                                     % line)
Line 263:                     break
Line 264:                 self._handle_progress(proc, line)
Let's be more specific and call this _watch_copy_progress()
Line 265:
Line 266:     def _handle_progress(self, proc, line):
Line 267:         re_progress = re.compile(r'\s+\((\d+).*')
Line 268:         while not self._abort:

Line 262:                                     % line)
Line 263:                     break
Line 264:                 self._handle_progress(proc, line)
Line 265:
Line 266:     def _handle_progress(self, proc, line):
We don't need line here - you use it only for reporting errors, which should be done here anyway.

This feels more complicated than needed, but maybe I don't understand the output of the tool you are trying to parse. Can you point us to an example output of the v2v tool?
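
To make the question concrete, here is a rough sketch of how the progress watcher could look if the tool emits '\r'-terminated chunks such as "    (45.00/100%)". That format is only inferred from the regex in the patch and is an assumption; the names `iter_chunks`, `watch_copy_progress` and `ProgressParseError` are hypothetical:

```python
import re

# Same pattern as in the patch; the sample format it implies is an assumption.
PROGRESS_RE = re.compile(r'\s+\((\d+).*')


class ProgressParseError(Exception):
    """Raised when a progress chunk does not match PROGRESS_RE."""


def iter_chunks(stream, sep='\r'):
    # Yield the text between separators, reading one character at a time.
    # Stops at end of stream; an unterminated trailing chunk is dropped.
    buf = ''
    while True:
        char = stream.read(1)
        if not char:
            return
        if char == sep:
            yield buf
            buf = ''
        else:
            buf += char


def watch_copy_progress(stream):
    # Error first: anything that does not parse raises immediately.
    for chunk in iter_chunks(stream):
        m = PROGRESS_RE.match(chunk)
        if m is None:
            raise ProgressParseError('error parsing progress: %r' % chunk)
        yield int(m.group(1))
```

This version ends when the stream ends instead of waiting for 100, so a copy that finishes without printing 100% does not hang. The real output mixes progress with other lines, so this only shows the shape of the loop.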
Line 267:         re_progress = re.compile(r'\s+\((\d+).*')
Line 268:         while not self._abort:
Line 269:             buf = proc.stdout.read(1)
Line 270:             while '\r' not in buf:

Line 265:
Line 266:     def _handle_progress(self, proc, line):
Line 267:         re_progress = re.compile(r'\s+\((\d+).*')
Line 268:         while not self._abort:
Line 269:             buf = proc.stdout.read(1)
You can start with an empty buffer:

    buf = ""
Line 270:             while '\r' not in buf:
Line 271:                 buf += proc.stdout.read(1)
Line 272:             m = re_progress.match(buf)
Line 273:             if m is not None:

Line 273:             if m is not None:
Line 274:                 try:
Line 275:                     self._progress = int(m.groups()[0])
Line 276:                     if self._progress == 100:
Line 277:                         break
This is a little fragile - is it possible that we sometimes don't get this value? For example, we read 98%, 99%, and then the copy finishes without printing 100%? Seeing the output from v2v will make this clear.
Line 278:                 except ValueError:
Line 279:                     self._abort_msg('error converting progress: %r' % line)
Line 280:                     raise
Line 281:             else:

Line 279:                     self._abort_msg('error converting progress: %r' % line)
Line 280:                     raise
Line 281:             else:
Line 282:                 self._abort_msg('error parsing progress: %r' % line)
Line 283:                 raise
Handle the error first and raise instead of setting the abort flag.
Line 284:
Line 285:     def _handle_process_errors(self, proc):
Line 286:         self._status = _STATUS_ERROR
Line 287:         error = proc.stderr.readlines()

Line 289:         msg = 'error process ended with errors: %s' % error
Line 290:         self._status_msg = msg
Line 291:         logging.error(msg)
Line 292:
Line 293:     def _create_passwd_file(self):
Can we pipe this into v2v instead of keeping plaintext passwords on disk during the import?
Line 294:         if not os.path.exists(_P_V2V_DIR):
Line 295:             try:
Line 296:                 os.mkdir(_P_V2V_DIR, 0700)
Line 297:             except OSError as e:

Line 290:         self._status_msg = msg
Line 291:         logging.error(msg)
Line 292:
Line 293:     def _create_passwd_file(self):
Line 294:         if not os.path.exists(_P_V2V_DIR):
No need to check if the directory exists, just create it and handle EEXIST.
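
A minimal sketch of that pattern, assuming a hypothetical helper named `ensure_private_dir` (not part of the patch). It also resets the mode when the directory is already there, since an existing directory may have looser permissions:

```python
import errno
import os


def ensure_private_dir(path, mode=0o700):
    # Try to create the directory; there is no exists() check, so no window
    # between checking and creating.
    try:
        os.mkdir(path, mode)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
        # Already present: enforce the expected permissions explicitly.
        os.chmod(path, mode)
```

Note that this sketch does not verify that an existing path is actually a directory or who owns it; real code may want those checks too.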
Line 295:             try:
Line 296:                 os.mkdir(_P_V2V_DIR, 0700)
Line 297:             except OSError as e:
Line 298:                 if e.errno != os.errno.EEXIST:

Line 292:
Line 293:     def _create_passwd_file(self):
Line 294:         if not os.path.exists(_P_V2V_DIR):
Line 295:             try:
Line 296:                 os.mkdir(_P_V2V_DIR, 0700)
Please use the new-style octal literal - 0o700
Line 297:             except OSError as e:
Line 298:                 if e.errno != os.errno.EEXIST:
Line 299:                     raise
Line 300:

Line 295:             try:
Line 296:                 os.mkdir(_P_V2V_DIR, 0700)
Line 297:             except OSError as e:
Line 298:                 if e.errno != os.errno.EEXIST:
Line 299:                     raise
But if the directory exists, you must check that it has the correct permissions, or chmod it - so:

    os.chmod(...)
Line 300:
Line 301:         self._pass_file = os.path.join(_P_V2V_DIR, "%s.tmp" % self._job_id)
Line 302:         f = os.fdopen(os.open(self._pass_file,
Line 303:                               os.O_WRONLY | os.O_CREAT, 0600), 'w')

Line 305:             f.write(self._password)
Line 306:
Line 307:     def _delete_passwd_file(self):
Line 308:         if os.path.exists(self._pass_file):
Line 309:             os.remove(self._pass_file)
This is racy, use:

    try:
        os.remove(path)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
Line 310:
Line 311:     def abort_job(self):
Line 312:         self._abort = True
Line 313:         self._status = _STATUS_ABORT

Line 307:     def _delete_passwd_file(self):
Line 308:         if os.path.exists(self._pass_file):
Line 309:             os.remove(self._pass_file)
Line 310:
Line 311:     def abort_job(self):
Why not call this abort()?
Line 312:         self._abort = True
Line 313:         self._status = _STATUS_ABORT
Line 314:         logging.info('aborting job id: %r', self._job_id)
Line 315:

Line 309:             os.remove(self._pass_file)
Line 310:
Line 311:     def abort_job(self):
Line 312:         self._abort = True
Line 313:         self._status = _STATUS_ABORT
Let's separate setting self._status from signaling an abort. Set the flag here; the status will be set later in _import().

Also, why not terminate the process here? I don't see a need to wait for the next line.
You can do:

    self._aborted = True
    if self._proc.returncode is None:
        self._proc.terminate()

The code watching the output would stop because stdout will be closed, and it can check self._aborted to detect an abort.
Line 314:         logging.info('aborting job id: %r', self._job_id)
Line 315:
Line 316:     def _abort_msg(self, msg):
Line 317:         self._status = _STATUS_ERROR

Line 316:     def _abort_msg(self, msg):
Line 317:         self._status = _STATUS_ERROR
Line 318:         self._status_msg = msg
Line 319:         logging.error(msg)
Line 320:         self._abort = True
This looks strange - setting the error status but signaling an abort. I guess this can be solved without it, by checking the correct state in the place that calls this.
Line 321:
Line 322:     def _generate_disk_parameters(self):
Line 323:         images = []
Line 324:         for disk in self._vmProperties['disks']:

Line 329:             images.append('--vdsm-vol-uuid')
Line 330:             images.append(disk['volumeID'])
Line 331:         return images
Line 332:
Line 333:     def _get_domain_path(self):
Why do you need to prepare multiple images for getting the domain path?

Please do not hide side effects like preparing images in what looks like a simple getter. I guess that calling it prepareVolumes would solve the issue.
Line 334:         ''' method prepare the images and return storage domain mounted path
Line 335:         since all images are in the same domain we return arbitrary image
Line 336:         for the return path (res['path']) '''
Line 337:         for disk in self._vmProperties['disks']:

Line 344:     def _teardownVolumes(self):
Line 345:         try:
Line 346:             drive = {'device': 'disk',
Line 347:                      'poolID': self._vmProperties['poolID'],
Line 348:                      'domainID': self._vmProperties['domainID']}
Do you expect self._vmProperties to be missing poolID or domainID? I think you should validate that they are present when creating the object, and then you can leave this code outside the try-except block.
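
One possible shape for such a check, as a sketch only. The key names are the ones used in the patch; `validate_vm_properties` and `InvalidVmProperties` are hypothetical names chosen for this example:

```python
class InvalidVmProperties(Exception):
    """Raised when required keys are missing from the VM properties."""


_REQUIRED_KEYS = ('poolID', 'domainID', 'disks')
_REQUIRED_DISK_KEYS = ('imageID', 'volumeID')


def validate_vm_properties(props):
    # Fail early, before any thread or process is started.
    missing = [key for key in _REQUIRED_KEYS if key not in props]
    if missing:
        raise InvalidVmProperties('missing keys: %s' % ', '.join(missing))
    for disk in props['disks']:
        missing = [key for key in _REQUIRED_DISK_KEYS if key not in disk]
        if missing:
            raise InvalidVmProperties(
                'disk is missing keys: %s' % ', '.join(missing))
```

Called from __init__, this would let later code index the dict without guarding every access.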
Line 349:             for disk in self._vmProperties['disks']:
Line 350:                 drive['imageID'] = disk['imageID']
Line 351:                 drive['volumeID'] = disk['volumeID']
Line 352:                 self._cif.teardownVolumePath(drive)

Line 350:                 drive['imageID'] = disk['imageID']
Line 351:                 drive['volumeID'] = disk['volumeID']
Line 352:                 self._cif.teardownVolumePath(drive)
Line 353:         except Exception as e:
Line 354:             logging.error('error while trying to teardownVolumePath: %r', e)
You have multiple images to tear down, but this will leave prepared images behind if one operation raises. Also you hide the exception details although you don't have a clue why it failed.

Must be:

    for disk in disks:
        try:
            teardown disk
        except Exception:
            log.exception(...)
Line 355:
Line 356:
Line 357: def _mem_to_mib(size, unit):
Line 358:     lunit = unit.lower()

--
To view, visit https://gerrit.ovirt.org/37509

Gerrit-MessageType: comment
Gerrit-Change-Id: I34bd86d5a87ea8c42113c4a732f87ddd4ceab9ea
Gerrit-PatchSet: 9
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Shahar Havivi
Gerrit-Reviewer: Dan Kenigsberg
Gerrit-Reviewer: Federico Simoncelli
Gerrit-Reviewer: Francesco Romani
Gerrit-Reviewer: Michal Skrivanek
Gerrit-Reviewer: Nir Soffer
Gerrit-Reviewer: Piotr Kliczewski
Gerrit-Reviewer: Saggi Mizrahi
Gerrit-Reviewer: Shahar Havivi
Gerrit-Reviewer: Yaniv Bronhaim
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
