---
 lib/cmdlib.py           |   84 +++++++++++++++++++++++++-
 lib/constants.py        |    7 ++
 lib/masterd/instance.py |  154 +++++++++++++++++++++++++++++++++++++++++++++++
 lib/opcodes.py          |   13 ++++-
 4 files changed, 254 insertions(+), 4 deletions(-)

diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 8dbdb27..aff2b4e 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -6103,8 +6103,7 @@ class LUCreateInstance(LogicalUnit):
     self.adopt_disks = has_adopt
 
     # verify creation mode
-    if self.op.mode not in (constants.INSTANCE_CREATE,
-                            constants.INSTANCE_IMPORT):
+    if self.op.mode not in constants.INSTANCE_CREATE_MODES:
       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                  self.op.mode, errors.ECODE_INVAL)
 
@@ -6114,6 +6113,9 @@ class LUCreateInstance(LogicalUnit):
       self.op.instance_name = self.hostname1.name
       # used in CheckPrereq for ip ping check
       self.check_ip = self.hostname1.ip
+    elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+      raise errors.OpPrereqError("Remote imports require names to be checked" %
+                                 errors.ECODE_INVAL)
     else:
       self.check_ip = None
 
@@ -6133,6 +6135,8 @@ class LUCreateInstance(LogicalUnit):
                                  " node must be given",
                                  errors.ECODE_INVAL)
 
+    self._cds = _GetClusterDomainSecret()
+
     if self.op.mode == constants.INSTANCE_IMPORT:
       # On import force_variant must be True, because if we forced it at
       # initial install, our only chance when importing it back is that it
@@ -6142,7 +6146,7 @@ class LUCreateInstance(LogicalUnit):
       if self.op.no_install:
         self.LogInfo("No-installation mode has no effect during import")
 
-    else: # INSTANCE_CREATE
+    elif self.op.mode == constants.INSTANCE_CREATE:
       if getattr(self.op, "os_type", None) is None:
         raise errors.OpPrereqError("No guest OS specified",
                                    errors.ECODE_INVAL)
@@ -6151,6 +6155,56 @@ class LUCreateInstance(LogicalUnit):
         raise errors.OpPrereqError("No disk template specified",
                                    errors.ECODE_INVAL)
 
+    elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+      # Check handshake to ensure both clusters have the same domain secret
+      src_handshake = getattr(self.op, "source_handshake", None)
+      if not src_handshake:
+        raise errors.OpPrereqError("Missing source handshake",
+                                   errors.ECODE_INVAL)
+
+      try:
+        (hmac_digest, hmac_salt) = src_handshake
+      except (TypeError, ValueError), err:
+        raise errors.OpPrereqError("Invalid data for handshake: %s" % err)
+
+      if not utils.VerifySha1Hmac(self._cds, constants.RIE_HANDSHAKE,
+                                  hmac_digest, salt=hmac_salt):
+        raise errors.OpPrereqError("Handshake didn't match, clusters don't"
+                                   " share the same domain secret",
+                                   errors.ECODE_INVAL)
+
+      # Load and check source CA
+      self.source_x509_ca_pem = getattr(self.op, "source_x509_ca", None)
+      if not self.source_x509_ca_pem:
+        raise errors.OpPrereqError("Missing source X509 CA",
+                                   errors.ECODE_INVAL)
+
+      try:
+        (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem,
+                                                    self._cds)
+      except OpenSSL.crypto.Error, err:
+        raise errors.OpPrereqError("Unable to load source X509 CA (%s)" %
+                                   (err, ), errors.ECODE_INVAL)
+
+      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
+      if errcode is not None:
+        raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ),
+                                   errors.ECODE_INVAL)
+
+      self.source_x509_ca = cert
+
+      src_instance_name = getattr(self.op, "source_instance_name", None)
+      if not src_instance_name:
+        raise errors.OpPrereqError("Missing source instance name",
+                                   errors.ECODE_INVAL)
+
+      self.source_instance_name = \
+        utils.GetHostInfo(utils.HostInfo.NormalizeName(src_instance_name)).name
+
+    else:
+      raise errors.OpPrereqError("Invalid instance creation mode %r" %
+                                 self.op.mode, errors.ECODE_INVAL)
+
   def ExpandNames(self):
     """ExpandNames for CreateInstance.
 
@@ -6829,6 +6883,30 @@ class LUCreateInstance(LogicalUnit):
           self.LogWarning("Some disks for instance %s on node %s were not"
                           " imported successfully" % (instance, pnode_name))
 
+      elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+        feedback_fn("* preparing remote import...")
+        connect_timeout = constants.RIE_CONNECT_TIMEOUT
+        timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
+
+        disk_results = masterd.instance.RemoteImport(self, feedback_fn, iobj,
+                                                     self.source_x509_ca,
+                                                     self._cds, timeouts)
+        if not compat.all(disk_results):
+          # TODO: Should the instance still be started, even if some disks
+          # failed to import (valid for local imports, too)?
+          self.LogWarning("Some disks for instance %s on node %s were not"
+                          " imported successfully" % (instance, pnode_name))
+
+        # Run rename script on newly imported instance
+        assert iobj.name == instance
+        feedback_fn("Running rename script on %s" % instance)
+        result = self.rpc.call_instance_run_rename(pnode_name, iobj,
+                                                   self.source_instance_name,
+                                                   self.op.debug_level)
+        if result.fail_msg:
+          self.LogWarning("Failed to run rename script for %s on node"
+                          " %s: %s" % (instance, pnode_name, result.fail_msg))
+
       else:
         # also checked in the prereq part
         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
diff --git a/lib/constants.py b/lib/constants.py
index fdba305..7a5943b 100644
--- a/lib/constants.py
+++ b/lib/constants.py
@@ -340,6 +340,12 @@ LOCKS_APPEND = 'append'
 # instance creation modes
 INSTANCE_CREATE = "create"
 INSTANCE_IMPORT = "import"
+INSTANCE_REMOTE_IMPORT = "remote-import"
+INSTANCE_CREATE_MODES = frozenset([
+  INSTANCE_CREATE,
+  INSTANCE_IMPORT,
+  INSTANCE_REMOTE_IMPORT,
+  ])
 
 # Remote import/export handshake message
 RIE_HANDSHAKE = "Hi, I'm Ganeti"
@@ -698,6 +704,7 @@ OPS_FINALIZED = frozenset([OP_STATUS_CANCELED,
 # Execution log types
 ELOG_MESSAGE = "message"
 ELOG_PROGRESS = "progress"
+ELOG_REMOTE_IMPORT = "remote-import"
 
 # max dynamic devices
 MAX_NICS = 8
diff --git a/lib/masterd/instance.py b/lib/masterd/instance.py
index ca244a1..a9f7a83 100644
--- a/lib/masterd/instance.py
+++ b/lib/masterd/instance.py
@@ -1187,3 +1187,157 @@ class ExportInstanceHelper:
     assert len(self._removed_snaps) == len(self._instance.disks)
     for idx in range(len(self._instance.disks)):
       self._RemoveSnapshot(idx)
+
+
+class _RemoteImportCb(ImportExportCbBase):
+  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
+               external_address):
+    """Initializes this class.
+
+    @type cds: string
+    @param cds: Cluster domain secret
+    @type x509_cert_pem: string
+    @param x509_cert_pem: CA used for signing import key
+    @type disk_count: number
+    @param disk_count: Number of disks
+    @type external_address: string
+    @param external_address: External address of destination node
+
+    """
+    ImportExportCbBase.__init__(self)
+    self._feedback_fn = feedback_fn
+    self._cds = cds
+    self._x509_cert_pem = x509_cert_pem
+    self._disk_count = disk_count
+    self._external_address = external_address
+
+    self._dresults = [None] * disk_count
+    self._daemon_port = [None] * disk_count
+
+    self._salt = utils.GenerateSecret(8)
+
+  @property
+  def disk_results(self):
+    """Returns per-disk results.
+
+    """
+    return self._dresults
+
+  def _CheckAllListening(self):
+    """Checks whether all daemons are listening.
+
+    If all daemons are listening, the information is sent to the client.
+
+    """
+    if not compat.all(dp is not None for dp in self._daemon_port):
+      return
+
+    host = self._external_address
+
+    disks = []
+    for idx, port in enumerate(self._daemon_port):
+      hmac_digest = utils.Sha1Hmac(self._cds, "%s:%s:%s" % (idx, host, port),
+                                   salt=self._salt)
+      disks.append((host, port, hmac_digest, self._salt))
+
+    assert len(disks) == self._disk_count
+
+    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
+      "disks": disks,
+      "x509_ca": self._x509_cert_pem,
+      })
+
+  def ReportListening(self, ie, private):
+    """Called when daemon started listening.
+
+    """
+    (idx, ) = private
+
+    self._feedback_fn("Disk %s is now listening" % idx)
+
+    assert self._daemon_port[idx] is None
+
+    self._daemon_port[idx] = ie.listen_port
+
+    self._CheckAllListening()
+
+  def ReportConnected(self, ie, private):
+    """Called when a connection has been established.
+
+    """
+    (idx, ) = private
+
+    self._feedback_fn("Disk %s is now receiving data" % idx)
+
+  def ReportFinished(self, ie, private):
+    """Called when a transfer has finished.
+
+    """
+    (idx, ) = private
+
+    # Daemon is certainly no longer listening
+    self._daemon_port[idx] = None
+
+    if ie.success:
+      self._feedback_fn("Disk %s finished receiving data" % idx)
+    else:
+      self._feedback_fn(("Disk %s failed to receive data: %s"
+                         " (recent output: %r)") %
+                        (idx, ie.final_message, ie.recent_output))
+
+    self._dresults[idx] = bool(ie.success)
+
+
+def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
+  """Imports an instance from another cluster.
+
+  @param lu: Logical unit instance
+  @param feedback_fn: Feedback function
+  @type instance: L{objects.Instance}
+  @param instance: Instance object
+  @type source_x509_ca: OpenSSL.crypto.X509
+  @param source_x509_ca: Import source's X509 CA
+  @type cds: string
+  @param cds: Cluster domain secret
+  @type timeouts: L{ImportExportTimeouts}
+  @param timeouts: Timeouts for this import
+
+  """
+  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                                  source_x509_ca)
+
+  # Create crypto key
+  result = lu.rpc.call_x509_cert_create(instance.primary_node,
+                                        constants.RIE_CERT_VALIDITY)
+  result.Raise("Can't create X509 key and certificate on %s" % result.node)
+
+  (x509_key_name, x509_cert_pem) = result.payload
+  try:
+    # Load certificate
+    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                                x509_cert_pem)
+
+    # Sign certificate
+    signed_x509_cert_pem = \
+      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
+
+    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
+                          len(instance.disks), instance.primary_node)
+
+    ieloop = ImportExportLoop(lu)
+    try:
+      for idx, dev in enumerate(instance.disks):
+        ieloop.Add(DiskImport(lu, instance.primary_node,
+                              x509_key_name, source_ca_pem, instance,
+                              constants.IEIO_SCRIPT, (dev, idx),
+                              timeouts, cbs, private=(idx, )))
+
+      ieloop.Run()
+    finally:
+      ieloop.FinalizeAll()
+  finally:
+    # Remove crypto key and certificate
+    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
+    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
+
+  return cbs.disk_results
diff --git a/lib/opcodes.py b/lib/opcodes.py
index 06ce78d..e05d2d8 100644
--- a/lib/opcodes.py
+++ b/lib/opcodes.py
@@ -460,7 +460,15 @@ class OpNodeEvacuationStrategy(OpCode):
 # instance opcodes
 
 class OpCreateInstance(OpCode):
-  """Create an instance."""
+  """Create an instance.
+
+  @ivar instance_name: Instance name
+  @ivar mode: Instance creation mode (one of 
L{constants.INSTANCE_CREATE_MODES})
+  @ivar source_handshake: Signed handshake from source (remote import only)
+  @ivar source_x509_ca: Source X509 CA in PEM format (remote import only)
+  @ivar source_instance_name: Previous name of instance (remote import only)
+
+  """
   OP_ID = "OP_INSTANCE_CREATE"
   OP_DSC_FIELD = "instance_name"
   __slots__ = [
@@ -473,6 +481,9 @@ class OpCreateInstance(OpCode):
     "file_storage_dir", "file_driver",
     "iallocator",
     "hypervisor", "hvparams", "beparams",
+    "source_handshake",
+    "source_x509_ca",
+    "source_instance_name",
     "dry_run",
     ]
 
-- 
1.7.0.4

Reply via email to