With this change, exports of several disks will occur in parallel. Error
handling has also been improved.

Signed-off-by: Michael Hanselmann <[email protected]>
---
 lib/cmdlib.py |  216 +++++++++++++++++++++++++++++++++++----------------------
 1 files changed, 132 insertions(+), 84 deletions(-)

diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 94267b5..a8423f2 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -9196,6 +9196,72 @@ class LUExportInstance(LogicalUnit):
         raise errors.OpPrereqError("Export not supported for instances with"
                                    " file-based disks", errors.ECODE_INVAL)
 
+  def _CreateSnapshots(self, feedback_fn):
+    instance = self.instance
+    src_node = instance.primary_node
+
+    vgname = self.cfg.GetVGName()
+
+    snap_disks = []
+
+    for idx, disk in enumerate(instance.disks):
+      feedback_fn("Creating a snapshot of disk/%s on node %s" %
+                  (idx, src_node))
+
+      # result.payload will be a snapshot of an lvm leaf of the one we
+      # passed
+      result = self.rpc.call_blockdev_snapshot(src_node, disk)
+      msg = result.fail_msg
+      if msg:
+        self.LogWarning("Could not snapshot disk/%s on node %s: %s",
+                        idx, src_node, msg)
+        snap_disks.append(False)
+      else:
+        disk_id = (vgname, result.payload)
+        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
+                               logical_id=disk_id, physical_id=disk_id,
+                               iv_name=disk.iv_name)
+        snap_disks.append(new_dev)
+
+    return snap_disks
+
+  def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index):
+    disk = snap_disks[disk_index]
+    if disk:
+      src_node = self.instance.primary_node
+
+      feedback_fn("Removing snapshot of disk/%s on node %s" %
+                  (disk_index, src_node))
+
+      result = self.rpc.call_blockdev_remove(src_node, disk)
+      if not result.fail_msg:
+        return True
+
+      self.LogWarning("Could not remove snapshot for disk/%d from node"
+                      " %s: %s", disk_index, src_node, result.fail_msg)
+
+    return False
+
+  def _CleanupExports(self, feedback_fn):
+    nodelist = self.cfg.GetNodeList()
+    nodelist.remove(self.dst_node.name)
+
+    # on one-node clusters nodelist will be empty after the removal
+    # if we proceed the backup would be removed because OpQueryExports
+    # substitutes an empty list with the full cluster node list.
+    iname = self.instance.name
+    if nodelist:
+      feedback_fn("Removing old exports for instance %s" % iname)
+      exportlist = self.rpc.call_export_list(nodelist)
+      for node in exportlist:
+        if exportlist[node].fail_msg:
+          continue
+        if iname in exportlist[node].payload:
+          msg = self.rpc.call_export_remove(node, iname).fail_msg
+          if msg:
+            self.LogWarning("Could not remove older export for instance %s"
+                            " on node %s: %s", iname, node, msg)
+
   def Exec(self, feedback_fn):
     """Export an instance to an image in the cluster.
 
@@ -9213,10 +9279,6 @@ class LUExportInstance(LogicalUnit):
       result.Raise("Could not shutdown instance %s on"
                    " node %s" % (instance.name, src_node))
 
-    vgname = self.cfg.GetVGName()
-
-    snap_disks = []
-
     # set the disks ID correctly since call_instance_start needs the
     # correct drbd minor to create the symlinks
     for disk in instance.disks:
@@ -9230,71 +9292,74 @@ class LUExportInstance(LogicalUnit):
       _StartInstanceDisks(self, instance, None)
 
     try:
-      # per-disk results
-      dresults = []
+      removed_snaps = [False] * len(instance.disks)
+
+      snap_disks = None
       try:
-        for idx, disk in enumerate(instance.disks):
-          feedback_fn("Creating a snapshot of disk/%s on node %s" %
-                      (idx, src_node))
-
-          # result.payload will be a snapshot of an lvm leaf of the one we
-          # passed
-          result = self.rpc.call_blockdev_snapshot(src_node, disk)
-          msg = result.fail_msg
-          if msg:
-            self.LogWarning("Could not snapshot disk/%s on node %s: %s",
-                            idx, src_node, msg)
-            snap_disks.append(False)
-          else:
-            disk_id = (vgname, result.payload)
-            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
-                                   logical_id=disk_id, physical_id=disk_id,
-                                   iv_name=disk.iv_name)
-            snap_disks.append(new_dev)
+        try:
+          snap_disks = self._CreateSnapshots(feedback_fn)
+        finally:
+          if (self.op.shutdown and instance.admin_up
+              and not self.remove_instance):
+            feedback_fn("Starting instance %s" % instance.name)
+            result = self.rpc.call_instance_start(src_node, instance,
+                                                  None, None)
+            msg = result.fail_msg
+            if msg:
+              _ShutdownInstanceDisks(self, instance)
+              raise errors.OpExecError("Could not start instance: %s" % msg)
+
+        assert len(snap_disks) == len(instance.disks)
+        assert len(removed_snaps) == len(instance.disks)
+
+        def _TransferFinished(idx):
+          if self._RemoveSnapshot(feedback_fn, snap_disks, idx):
+            removed_snaps[idx] = True
+
+        transfers = []
+
+        for idx, dev in enumerate(snap_disks):
+          if not dev:
+            transfers.append(None)
+            continue
 
-      finally:
-        if self.op.shutdown and instance.admin_up and not self.remove_instance:
-          feedback_fn("Starting instance %s" % instance.name)
-          result = self.rpc.call_instance_start(src_node, instance, None, None)
-          msg = result.fail_msg
-          if msg:
-            _ShutdownInstanceDisks(self, instance)
-            raise errors.OpExecError("Could not start instance: %s" % msg)
-
-      # TODO: check for size
-
-      cluster_name = self.cfg.GetClusterName()
-      for idx, dev in enumerate(snap_disks):
-        feedback_fn("Exporting snapshot %s from %s to %s" %
-                    (idx, src_node, dst_node.name))
-        if dev:
-          # FIXME: pass debug from opcode to backend
-          result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
-                                                 instance, cluster_name,
-                                                 idx, self.op.debug_level)
-          msg = result.fail_msg
-          if msg:
-            self.LogWarning("Could not export disk/%s from node %s to"
-                            " node %s: %s", idx, src_node, dst_node.name, msg)
-            dresults.append(False)
-          else:
-            dresults.append(True)
-          msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
-          if msg:
-            self.LogWarning("Could not remove snapshot for disk/%d from node"
-                            " %s: %s", idx, src_node, msg)
-        else:
-          dresults.append(False)
+          feedback_fn("Exporting snapshot %s from %s to %s" %
+                      (idx, src_node, dst_node.name))
 
-      feedback_fn("Finalizing export on %s" % dst_node.name)
-      result = self.rpc.call_finalize_export(dst_node.name, instance,
-                                             snap_disks)
-      fin_resu = True
-      msg = result.fail_msg
-      if msg:
-        self.LogWarning("Could not finalize export for instance %s"
-                        " on node %s: %s", instance.name, dst_node.name, msg)
-        fin_resu = False
+          path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
+                                dev.physical_id[1])
+
+          transfers.append(("snapshot/%s" % idx,
+                            constants.IEIO_SCRIPT, (dev, idx),
+                            constants.IEIO_FILE, (path, ),
+                            _TransferFinished, idx))
+
+        # Actually export data
+        dresults = TransferInstanceData(self, feedback_fn,
+                                        src_node, dst_node.name,
+                                        dst_node.secondary_ip,
+                                        instance, transfers)
+
+        assert len(dresults) == len(instance.disks)
+
+        # Check for backwards compatibility
+        assert utils.all([isinstance(i, bool) for i in dresults]), \
+               "Not all results are boolean: %r" % dresults
+
+        feedback_fn("Finalizing export on %s" % dst_node.name)
+        result = self.rpc.call_finalize_export(dst_node.name, instance,
+                                               snap_disks)
+        msg = result.fail_msg
+        fin_resu = not msg
+        if msg:
+          self.LogWarning("Could not finalize export for instance %s"
+                          " on node %s: %s", instance.name, dst_node.name, msg)
+
+      finally:
+        # Remove all snapshots
+        for idx, removed in enumerate(removed_snaps):
+          if not removed:
+            self._RemoveSnapshot(feedback_fn, snap_disks, idx)
 
     finally:
       if activate_disks:
@@ -9306,24 +9371,7 @@ class LUExportInstance(LogicalUnit):
       feedback_fn("Removing instance %s" % instance.name)
       _RemoveInstance(self, feedback_fn, instance, self.ignore_remove_failures)
 
-    nodelist = self.cfg.GetNodeList()
-    nodelist.remove(dst_node.name)
-
-    # on one-node clusters nodelist will be empty after the removal
-    # if we proceed the backup would be removed because OpQueryExports
-    # substitutes an empty list with the full cluster node list.
-    iname = instance.name
-    if nodelist:
-      feedback_fn("Removing old exports for instance %s" % iname)
-      exportlist = self.rpc.call_export_list(nodelist)
-      for node in exportlist:
-        if exportlist[node].fail_msg:
-          continue
-        if iname in exportlist[node].payload:
-          msg = self.rpc.call_export_remove(node, iname).fail_msg
-          if msg:
-            self.LogWarning("Could not remove older export for instance %s"
-                            " on node %s: %s", iname, node, msg)
+    self._CleanupExports(feedback_fn)
 
     return fin_resu, dresults
 
-- 
1.7.0.4



-- 
To unsubscribe, reply using "remove me" as the subject.

Reply via email to