Signed-off-by: Michael Hanselmann <[email protected]>
---
 lib/cmdlib.py           |  137 ++++++++++++++++++++++++++++++++++++++++------
 lib/constants.py        |   11 ++++
 lib/masterd/instance.py |   82 ++++++++++++++++++++++++++++
 lib/opcodes.py          |   21 +++++++-
 4 files changed, 232 insertions(+), 19 deletions(-)

diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index a621451..8dbdb27 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -8905,23 +8905,41 @@ class LUExportInstance(LogicalUnit):
     self.remove_instance = getattr(self.op, "remove_instance", False)
     self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures",
                                           False)
+    self.export_mode = getattr(self.op, "mode", constants.EXPORT_MODE_LOCAL)
+    self.x509_key_name = getattr(self.op, "x509_key_name", None)
+    self.dest_x509_ca_pem = getattr(self.op, "destination_x509_ca", None)
 
     if self.remove_instance and not self.op.shutdown:
       raise errors.OpPrereqError("Can not remove instance without shutting it"
                                  " down before")
 
+    if self.export_mode not in constants.EXPORT_MODES:
+      raise errors.OpPrereqError("Invalid export mode %r" % self.export_mode,
+                                 errors.ECODE_INVAL)
+
+    if self.export_mode == constants.EXPORT_MODE_REMOTE:
+      if not self.x509_key_name:
+        raise errors.OpPrereqError("Missing X509 key name for encryption",
+                                   errors.ECODE_INVAL)
+
+      if not self.dest_x509_ca_pem:
+        raise errors.OpPrereqError("Missing destination X509 CA",
+                                   errors.ECODE_INVAL)
+
   def ExpandNames(self):
     self._ExpandAndLockInstance()
 
-    # FIXME: lock only instance primary and destination node
-    #
-    # Sad but true, for now we have do lock all nodes, as we don't know where
-    # the previous export might be, and and in this LU we search for it and
-    # remove it from its current node. In the future we could fix this by:
-    #  - making a tasklet to search (share-lock all), then create the new one,
-    #    then one to remove, after
-    #  - removing the removal operation altogether
-    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    # Lock all nodes for local exports
+    if self.export_mode == constants.EXPORT_MODE_LOCAL:
+      # FIXME: lock only instance primary and destination node
+      #
+      # Sad but true, for now we have to lock all nodes, as we don't know where
+      # the previous export might be, and in this LU we search for it and
+      # remove it from its current node. In the future we could fix this by:
+      #  - making a tasklet to search (share-lock all), then create the new one,
+      #    then one to remove, after
+      #  - removing the removal operation altogether
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
 
   def DeclareLocks(self, level):
     """Last minute lock declaration."""
@@ -8934,15 +8952,21 @@ class LUExportInstance(LogicalUnit):
 
     """
     env = {
+      "EXPORT_MODE": self.export_mode,
       "EXPORT_NODE": self.op.target_node,
       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
       "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
       # TODO: Generic function for boolean env variables
       "REMOVE_INSTANCE": str(bool(self.remove_instance)),
       }
+
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
-          self.op.target_node]
+
+    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
+
+    if self.export_mode == constants.EXPORT_MODE_LOCAL:
+      nl.append(self.op.target_node)
+
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -8952,17 +8976,79 @@ class LUExportInstance(LogicalUnit):
 
     """
     instance_name = self.op.instance_name
+
     self.instance = self.cfg.GetInstanceInfo(instance_name)
     assert self.instance is not None, \
           "Cannot retrieve locked instance %s" % self.op.instance_name
     _CheckNodeOnline(self, self.instance.primary_node)
 
-    self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
-    self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
-    assert self.dst_node is not None
+    if self.export_mode == constants.EXPORT_MODE_LOCAL:
+      self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
+      self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
+      assert self.dst_node is not None
+
+      _CheckNodeOnline(self, self.dst_node.name)
+      _CheckNodeNotDrained(self, self.dst_node.name)
+
+      self._cds = None
+      self.dest_x509_ca = None
 
-    _CheckNodeOnline(self, self.dst_node.name)
-    _CheckNodeNotDrained(self, self.dst_node.name)
+    elif self.export_mode == constants.EXPORT_MODE_REMOTE:
+      self.dst_node = None
+
+      if len(self.op.target_node) != len(self.instance.disks):
+        raise errors.OpPrereqError(("Received destination information for %s"
+                                    " disks, but instance %s has %s disks") %
+                                   (len(self.op.target_node), instance_name,
+                                    len(self.instance.disks)),
+                                   errors.ECODE_INVAL)
+
+      cds = _GetClusterDomainSecret()
+
+      # Check X509 key name
+      try:
+        (key_name, hmac_digest, hmac_salt) = self.x509_key_name
+      except (TypeError, ValueError), err:
+        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err)
+
+      if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
+        raise errors.OpPrereqError("HMAC for X509 key name is wrong",
+                                   errors.ECODE_INVAL)
+
+      # Load and verify CA
+      try:
+        (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
+      except OpenSSL.crypto.Error, err:
+        raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
+                                   (err, ), errors.ECODE_INVAL)
+
+      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
+      if errcode is not None:
+        raise errors.OpPrereqError("Invalid destination X509 CA (%s)" % (msg, ),
+                                   errors.ECODE_INVAL)
+
+      self.dest_x509_ca = cert
+
+      # Verify target information: one (host, port, HMAC, salt) tuple per disk
+      for idx, disk_data in enumerate(self.op.target_node):
+        try:
+          (host, port, hmac_digest, hmac_salt) = disk_data
+        except (TypeError, ValueError), err:
+          raise errors.OpPrereqError("Invalid data for disk %s: %s" %
+                                     (idx, err), errors.ECODE_INVAL)
+
+        if not (host and port):
+          raise errors.OpPrereqError("Disk %s is missing destination host or"
+                                     " port" % idx, errors.ECODE_INVAL)
+
+        if not utils.VerifySha1Hmac(cds, "%s:%s:%s" % (idx, host, port),
+                                    hmac_digest, salt=hmac_salt):
+          raise errors.OpPrereqError("HMAC for disk %s is wrong" % idx,
+                                     errors.ECODE_INVAL)
+
+    else:
+      raise errors.ProgrammerError("Unhandled export mode %r" %
+                                   self.export_mode)
 
     # instance disk type verification
     # TODO: Implement export support for file-based disks
@@ -8978,6 +9064,8 @@ class LUExportInstance(LogicalUnit):
     exports will be removed from the nodes A, B and D.
 
     """
+    assert self.export_mode != constants.EXPORT_MODE_REMOTE
+
     nodelist = self.cfg.GetNodeList()
     nodelist.remove(self.dst_node.name)
 
@@ -9001,6 +9089,8 @@ class LUExportInstance(LogicalUnit):
     """Export an instance to an image in the cluster.
 
     """
+    assert self.export_mode in constants.EXPORT_MODES
+
     instance = self.instance
     src_node = instance.primary_node
 
@@ -9031,7 +9121,17 @@ class LUExportInstance(LogicalUnit):
 
       helper.CreateSnapshots()
       try:
-        (fin_resu, dresults) = helper.LocalExport(self.dst_node)
+        if self.export_mode == constants.EXPORT_MODE_LOCAL:
+          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
+        elif self.export_mode == constants.EXPORT_MODE_REMOTE:
+          connect_timeout = constants.RIE_CONNECT_TIMEOUT
+          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
+
+          (key_name, _, _) = self.x509_key_name
+          (fin_resu, dresults) = helper.RemoteExport(key_name,
+                                                     self.dest_x509_ca,
+                                                     self.op.target_node,
+                                                     timeouts)
       finally:
         helper.Cleanup()
 
@@ -9055,7 +9155,8 @@ class LUExportInstance(LogicalUnit):
         _RemoveInstance(self, feedback_fn, instance,
                         self.ignore_remove_failures)
 
-    self._CleanupExports(feedback_fn)
+    if self.export_mode == constants.EXPORT_MODE_LOCAL:
+      self._CleanupExports(feedback_fn)
 
     return fin_resu, dresults
 
diff --git a/lib/constants.py b/lib/constants.py
index cf6a656..fdba305 100644
--- a/lib/constants.py
+++ b/lib/constants.py
@@ -325,6 +325,14 @@ REPLACE_DISK_SEC = "replace_on_secondary"  # replace disks on secondary
 REPLACE_DISK_CHG = "replace_new_secondary" # change secondary node
 REPLACE_DISK_AUTO = "replace_auto"
 
+# Instance export mode
+EXPORT_MODE_LOCAL = "local"
+EXPORT_MODE_REMOTE = "remote"
+EXPORT_MODES = frozenset([
+  EXPORT_MODE_LOCAL,
+  EXPORT_MODE_REMOTE,
+  ])
+
 # lock recalculate mode
 LOCKS_REPLACE = 'replace'
 LOCKS_APPEND = 'append'
@@ -339,6 +347,9 @@ RIE_HANDSHAKE = "Hi, I'm Ganeti"
 # Remote import/export certificate validity in seconds
 RIE_CERT_VALIDITY = 24 * 60 * 60
 
+# Remote import/export connect timeout for socat
+RIE_CONNECT_TIMEOUT = 60
+
 DISK_TEMPLATES = frozenset([DT_DISKLESS, DT_PLAIN,
                             DT_DRBD8, DT_FILE])
 
diff --git a/lib/masterd/instance.py b/lib/masterd/instance.py
index 7e95b0d..ca244a1 100644
--- a/lib/masterd/instance.py
+++ b/lib/masterd/instance.py
@@ -25,6 +25,7 @@
 
 import logging
 import time
+import OpenSSL
 
 from ganeti import constants
 from ganeti import errors
@@ -966,6 +967,48 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
   return [bool(dtp.success) for dtp in all_dtp]
 
 
+class _RemoteExportCb(ImportExportCbBase):
+  def __init__(self, feedback_fn, disk_count):
+    """Initializes this class.
+
+    """
+    ImportExportCbBase.__init__(self)
+    self._feedback_fn = feedback_fn
+    self._dresults = [None] * disk_count
+
+  @property
+  def disk_results(self):
+    """Returns per-disk results.
+
+    """
+    return self._dresults
+
+  def ReportConnected(self, ie, private):
+    """Called when a connection has been established.
+
+    """
+    (idx, _) = private
+
+    self._feedback_fn("Disk %s is now sending data" % idx)
+
+  def ReportFinished(self, ie, private):
+    """Called when a transfer has finished.
+
+    """
+    (idx, finished_fn) = private
+
+    if ie.success:
+      self._feedback_fn("Disk %s finished sending data" % idx)
+    else:
+      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
+                        (idx, ie.final_message, ie.recent_output))
+
+    self._dresults[idx] = bool(ie.success)
+
+    if finished_fn:
+      finished_fn()
+
+
 class ExportInstanceHelper:
   def __init__(self, lu, feedback_fn, instance):
     """Initializes this class.
@@ -1088,6 +1131,45 @@ class ExportInstanceHelper:
 
     return (fin_resu, dresults)
 
+  def RemoteExport(self, x509_key_name, dest_x509_ca, disk_info, timeouts):
+    """Inter-cluster instance export.
+
+    @type x509_key_name: string
+    @param x509_key_name: X509 key name for encrypting data
+    @type dest_x509_ca: OpenSSL.crypto.X509
+    @param dest_x509_ca: Remote peer X509 CA object
+    @type disk_info: list
+    @param disk_info: Per-disk destination information
+    @type timeouts: L{ImportExportTimeouts}
+    @param timeouts: Timeouts for this export
+
+    """
+    instance = self._instance
+
+    assert len(disk_info) == len(instance.disks)
+
+    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
+
+    dest_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                                  dest_x509_ca)
+
+    ieloop = ImportExportLoop(self._lu)
+    try:
+      for idx, (dev, (host, port, _, _)) in enumerate(zip(instance.disks,
+                                                          disk_info)):
+        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
+        finished_fn = compat.partial(self._TransferFinished, idx)
+        ieloop.Add(DiskExport(self._lu, instance.primary_node,
+                              x509_key_name, dest_ca_pem, host, port, instance,
+                              constants.IEIO_SCRIPT, (dev, idx),
+                              timeouts, cbs, private=(idx, finished_fn)))
+
+      ieloop.Run()
+    finally:
+      ieloop.FinalizeAll()
+
+    return (True, cbs.disk_results)
+
   def _TransferFinished(self, idx):
     """Called once a transfer has finished.
 
diff --git a/lib/opcodes.py b/lib/opcodes.py
index 149dbab..06ce78d 100644
--- a/lib/opcodes.py
+++ b/lib/opcodes.py
@@ -668,13 +668,32 @@ class OpPrepareExport(OpCode):
 
 
 class OpExportInstance(OpCode):
-  """Export an instance."""
+  """Export an instance.
+
+  For local exports, the export destination is the node name. For remote
+  exports, the export destination is a list of tuples, each consisting of
+  hostname/IP address, port, HMAC and HMAC salt. The HMAC is calculated using
+  the cluster domain secret over the value "${index}:${hostname}:${port}". The
+  destination X509 CA must be a signed certificate.
+
+  @ivar mode: Export mode (one of L{constants.EXPORT_MODES})
+  @ivar target_node: Export destination
+  @ivar x509_key_name: X509 key to use (remote export only)
+  @ivar destination_x509_ca: Destination X509 CA in PEM format (remote export
+                             only)
+
+  """
   OP_ID = "OP_BACKUP_EXPORT"
   OP_DSC_FIELD = "instance_name"
   __slots__ = [
+    # TODO: Rename target_node as it changes meaning for different export modes
+    # (e.g. "destination")
     "instance_name", "target_node", "shutdown", "shutdown_timeout",
     "remove_instance",
     "ignore_remove_failures",
+    "mode",
+    "x509_key_name",
+    "destination_x509_ca",
     ]
 
 
-- 
1.7.0.4

Reply via email to