With this change, an instance's disks are exported in parallel instead of one after another. Error handling has also been improved.
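For illustration only, the new control flow in Exec() roughly follows the sketch below. The names ExportDisks, transfer_all and remove_snapshot are made up for this sketch; in the patch the work is done by TransferInstanceData together with a per-disk "finished" callback and a final cleanup loop.

  # Illustrative sketch, not the patched cmdlib.py code. transfer_all() is
  # assumed to run all transfers in parallel and to call finished_fn(idx)
  # as soon as disk "idx" has been copied to the destination.
  def ExportDisks(snap_disks, transfer_all, remove_snapshot):
    removed = [False] * len(snap_disks)

    def _Finished(idx):
      # Free each snapshot as soon as its own export is done, instead of
      # keeping all snapshots around until the last disk has been copied
      if remove_snapshot(idx):
        removed[idx] = True

    try:
      # One transfer per disk; disks whose snapshot failed are skipped
      transfers = [(idx, dev, _Finished)
                   for idx, dev in enumerate(snap_disks) if dev]
      return transfer_all(transfers)
    finally:
      # Whatever happened above, remove any snapshot that was not already
      # cleaned up by its callback
      for idx, done in enumerate(removed):
        if not done and snap_disks[idx]:
          remove_snapshot(idx)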
Signed-off-by: Michael Hanselmann <[email protected]>
---
 lib/cmdlib.py |  216 +++++++++++++++++++++++++++++++++++----------------------
 1 files changed, 132 insertions(+), 84 deletions(-)

diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 94267b5..a8423f2 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -9196,6 +9196,72 @@ class LUExportInstance(LogicalUnit):
       raise errors.OpPrereqError("Export not supported for instances with"
                                  " file-based disks", errors.ECODE_INVAL)
 
+  def _CreateSnapshots(self, feedback_fn):
+    instance = self.instance
+    src_node = instance.primary_node
+
+    vgname = self.cfg.GetVGName()
+
+    snap_disks = []
+
+    for idx, disk in enumerate(instance.disks):
+      feedback_fn("Creating a snapshot of disk/%s on node %s" %
+                  (idx, src_node))
+
+      # result.payload will be a snapshot of an lvm leaf of the one we
+      # passed
+      result = self.rpc.call_blockdev_snapshot(src_node, disk)
+      msg = result.fail_msg
+      if msg:
+        self.LogWarning("Could not snapshot disk/%s on node %s: %s",
+                        idx, src_node, msg)
+        snap_disks.append(False)
+      else:
+        disk_id = (vgname, result.payload)
+        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
+                               logical_id=disk_id, physical_id=disk_id,
+                               iv_name=disk.iv_name)
+        snap_disks.append(new_dev)
+
+    return snap_disks
+
+  def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index):
+    disk = snap_disks[disk_index]
+    if disk:
+      src_node = self.instance.primary_node
+
+      feedback_fn("Removing snapshot of disk/%s on node %s" %
+                  (disk_index, src_node))
+
+      result = self.rpc.call_blockdev_remove(src_node, disk)
+      if not result.fail_msg:
+        return True
+
+      self.LogWarning("Could not remove snapshot for disk/%d from node"
+                      " %s: %s", disk_index, src_node, result.fail_msg)
+
+    return False
+
+  def _CleanupExports(self, feedback_fn):
+    nodelist = self.cfg.GetNodeList()
+    nodelist.remove(self.dst_node.name)
+
+    # on one-node clusters nodelist will be empty after the removal
+    # if we proceed the backup would be removed because OpQueryExports
+    # substitutes an empty list with the full cluster node list.
+    iname = self.instance.name
+    if nodelist:
+      feedback_fn("Removing old exports for instance %s" % iname)
+      exportlist = self.rpc.call_export_list(nodelist)
+      for node in exportlist:
+        if exportlist[node].fail_msg:
+          continue
+        if iname in exportlist[node].payload:
+          msg = self.rpc.call_export_remove(node, iname).fail_msg
+          if msg:
+            self.LogWarning("Could not remove older export for instance %s"
+                            " on node %s: %s", iname, node, msg)
+
   def Exec(self, feedback_fn):
     """Export an instance to an image in the cluster.
 
@@ -9213,10 +9279,6 @@ class LUExportInstance(LogicalUnit):
       result.Raise("Could not shutdown instance %s on"
                    " node %s" % (instance.name, src_node))
 
-    vgname = self.cfg.GetVGName()
-
-    snap_disks = []
-
     # set the disks ID correctly since call_instance_start needs the
     # correct drbd minor to create the symlinks
     for disk in instance.disks:
@@ -9230,71 +9292,74 @@ class LUExportInstance(LogicalUnit):
     _StartInstanceDisks(self, instance, None)
 
     try:
-      # per-disk results
-      dresults = []
+      removed_snaps = [False] * len(instance.disks)
+
+      snap_disks = None
       try:
-        for idx, disk in enumerate(instance.disks):
-          feedback_fn("Creating a snapshot of disk/%s on node %s" %
-                      (idx, src_node))
-
-          # result.payload will be a snapshot of an lvm leaf of the one we
-          # passed
-          result = self.rpc.call_blockdev_snapshot(src_node, disk)
-          msg = result.fail_msg
-          if msg:
-            self.LogWarning("Could not snapshot disk/%s on node %s: %s",
-                            idx, src_node, msg)
-            snap_disks.append(False)
-          else:
-            disk_id = (vgname, result.payload)
-            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
-                                   logical_id=disk_id, physical_id=disk_id,
-                                   iv_name=disk.iv_name)
-            snap_disks.append(new_dev)
+        try:
+          snap_disks = self._CreateSnapshots(feedback_fn)
+        finally:
+          if (self.op.shutdown and instance.admin_up
+              and not self.remove_instance):
+            feedback_fn("Starting instance %s" % instance.name)
+            result = self.rpc.call_instance_start(src_node, instance,
+                                                  None, None)
+            msg = result.fail_msg
+            if msg:
+              _ShutdownInstanceDisks(self, instance)
+              raise errors.OpExecError("Could not start instance: %s" % msg)
+
+        assert len(snap_disks) == len(instance.disks)
+        assert len(removed_snaps) == len(instance.disks)
+
+        def _TransferFinished(idx):
+          if self._RemoveSnapshot(feedback_fn, snap_disks, idx):
+            removed_snaps[idx] = True
+
+        transfers = []
+
+        for idx, dev in enumerate(snap_disks):
+          if not dev:
+            transfers.append(None)
+            continue
 
-      finally:
-        if self.op.shutdown and instance.admin_up and not self.remove_instance:
-          feedback_fn("Starting instance %s" % instance.name)
-          result = self.rpc.call_instance_start(src_node, instance, None, None)
-          msg = result.fail_msg
-          if msg:
-            _ShutdownInstanceDisks(self, instance)
-            raise errors.OpExecError("Could not start instance: %s" % msg)
-
-      # TODO: check for size
-
-      cluster_name = self.cfg.GetClusterName()
-      for idx, dev in enumerate(snap_disks):
-        feedback_fn("Exporting snapshot %s from %s to %s" %
-                    (idx, src_node, dst_node.name))
-        if dev:
-          # FIXME: pass debug from opcode to backend
-          result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
-                                                 instance, cluster_name,
-                                                 idx, self.op.debug_level)
-          msg = result.fail_msg
-          if msg:
-            self.LogWarning("Could not export disk/%s from node %s to"
-                            " node %s: %s", idx, src_node, dst_node.name, msg)
-            dresults.append(False)
-          else:
-            dresults.append(True)
-          msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
-          if msg:
-            self.LogWarning("Could not remove snapshot for disk/%d from node"
-                            " %s: %s", idx, src_node, msg)
-        else:
-          dresults.append(False)
+          feedback_fn("Exporting snapshot %s from %s to %s" %
+                      (idx, src_node, dst_node.name))
 
-      feedback_fn("Finalizing export on %s" % dst_node.name)
-      result = self.rpc.call_finalize_export(dst_node.name, instance,
-                                             snap_disks)
-      fin_resu = True
-      msg = result.fail_msg
-      if msg:
-        self.LogWarning("Could not finalize export for instance %s"
-                        " on node %s: %s", instance.name, dst_node.name, msg)
-        fin_resu = False
+          path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
+                                dev.physical_id[1])
+
+          transfers.append(("snapshot/%s" % idx,
+                            constants.IEIO_SCRIPT, (dev, idx),
+                            constants.IEIO_FILE, (path, ),
+                            _TransferFinished, idx))
+
+        # Actually export data
+        dresults = TransferInstanceData(self, feedback_fn,
+                                        src_node, dst_node.name,
+                                        dst_node.secondary_ip,
+                                        instance, transfers)
+
+        assert len(dresults) == len(instance.disks)
+
+        # Check for backwards compatibility
+        assert utils.all([isinstance(i, bool) for i in dresults]), \
+               "Not all results are boolean: %r" % dresults
+
+        feedback_fn("Finalizing export on %s" % dst_node.name)
+        result = self.rpc.call_finalize_export(dst_node.name, instance,
+                                               snap_disks)
+        msg = result.fail_msg
+        fin_resu = not msg
+        if msg:
+          self.LogWarning("Could not finalize export for instance %s"
+                          " on node %s: %s", instance.name, dst_node.name, msg)
+
+      finally:
+        # Remove all snapshots
+        for idx, removed in enumerate(removed_snaps):
+          if not removed:
+            self._RemoveSnapshot(feedback_fn, snap_disks, idx)
 
     finally:
       if activate_disks:
@@ -9306,24 +9371,7 @@ class LUExportInstance(LogicalUnit):
       feedback_fn("Removing instance %s" % instance.name)
       _RemoveInstance(self, feedback_fn, instance, self.ignore_remove_failures)
 
-    nodelist = self.cfg.GetNodeList()
-    nodelist.remove(dst_node.name)
-
-    # on one-node clusters nodelist will be empty after the removal
-    # if we proceed the backup would be removed because OpQueryExports
-    # substitutes an empty list with the full cluster node list.
-    iname = instance.name
-    if nodelist:
-      feedback_fn("Removing old exports for instance %s" % iname)
-      exportlist = self.rpc.call_export_list(nodelist)
-      for node in exportlist:
-        if exportlist[node].fail_msg:
-          continue
-        if iname in exportlist[node].payload:
-          msg = self.rpc.call_export_remove(node, iname).fail_msg
-          if msg:
-            self.LogWarning("Could not remove older export for instance %s"
-                            " on node %s: %s", iname, node, msg)
+    self._CleanupExports(feedback_fn)
 
     return fin_resu, dresults
 
-- 
1.7.0.4
