---
lib/cmdlib.py | 84 +++++++++++++++++++++++++-
lib/constants.py | 7 ++
lib/masterd/instance.py | 154 +++++++++++++++++++++++++++++++++++++++++++++++
lib/opcodes.py | 13 ++++-
4 files changed, 254 insertions(+), 4 deletions(-)
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 8dbdb27..aff2b4e 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -6103,8 +6103,7 @@ class LUCreateInstance(LogicalUnit):
self.adopt_disks = has_adopt
# verify creation mode
- if self.op.mode not in (constants.INSTANCE_CREATE,
- constants.INSTANCE_IMPORT):
+ if self.op.mode not in constants.INSTANCE_CREATE_MODES:
raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
self.op.mode, errors.ECODE_INVAL)
@@ -6114,6 +6113,9 @@ class LUCreateInstance(LogicalUnit):
self.op.instance_name = self.hostname1.name
# used in CheckPrereq for ip ping check
self.check_ip = self.hostname1.ip
+ elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+ raise errors.OpPrereqError("Remote imports require names to be checked" %
+ errors.ECODE_INVAL)
else:
self.check_ip = None
@@ -6133,6 +6135,8 @@ class LUCreateInstance(LogicalUnit):
" node must be given",
errors.ECODE_INVAL)
+ self._cds = _GetClusterDomainSecret()
+
if self.op.mode == constants.INSTANCE_IMPORT:
# On import force_variant must be True, because if we forced it at
# initial install, our only chance when importing it back is that it
@@ -6142,7 +6146,7 @@ class LUCreateInstance(LogicalUnit):
if self.op.no_install:
self.LogInfo("No-installation mode has no effect during import")
- else: # INSTANCE_CREATE
+ elif self.op.mode == constants.INSTANCE_CREATE:
if getattr(self.op, "os_type", None) is None:
raise errors.OpPrereqError("No guest OS specified",
errors.ECODE_INVAL)
@@ -6151,6 +6155,56 @@ class LUCreateInstance(LogicalUnit):
raise errors.OpPrereqError("No disk template specified",
errors.ECODE_INVAL)
+ elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+ # Check handshake to ensure both clusters have the same domain secret
+ src_handshake = getattr(self.op, "source_handshake", None)
+ if not src_handshake:
+ raise errors.OpPrereqError("Missing source handshake",
+ errors.ECODE_INVAL)
+
+ try:
+ (hmac_digest, hmac_salt) = src_handshake
+ except (TypeError, ValueError), err:
+ raise errors.OpPrereqError("Invalid data for handshake: %s" % err)
+
+ if not utils.VerifySha1Hmac(self._cds, constants.RIE_HANDSHAKE,
+ hmac_digest, salt=hmac_salt):
+ raise errors.OpPrereqError("Handshake didn't match, clusters don't"
+ " share the same domain secret",
+ errors.ECODE_INVAL)
+
+ # Load and check source CA
+ self.source_x509_ca_pem = getattr(self.op, "source_x509_ca", None)
+ if not self.source_x509_ca_pem:
+ raise errors.OpPrereqError("Missing source X509 CA",
+ errors.ECODE_INVAL)
+
+ try:
+ (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem,
+ self._cds)
+ except OpenSSL.crypto.Error, err:
+ raise errors.OpPrereqError("Unable to load source X509 CA (%s)" %
+ (err, ), errors.ECODE_INVAL)
+
+ (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
+ if errcode is not None:
+ raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ),
+ errors.ECODE_INVAL)
+
+ self.source_x509_ca = cert
+
+ src_instance_name = getattr(self.op, "source_instance_name", None)
+ if not src_instance_name:
+ raise errors.OpPrereqError("Missing source instance name",
+ errors.ECODE_INVAL)
+
+ self.source_instance_name = \
+ utils.GetHostInfo(utils.HostInfo.NormalizeName(src_instance_name)).name
+
+ else:
+ raise errors.OpPrereqError("Invalid instance creation mode %r" %
+ self.op.mode, errors.ECODE_INVAL)
+
def ExpandNames(self):
"""ExpandNames for CreateInstance.
@@ -6829,6 +6883,30 @@ class LUCreateInstance(LogicalUnit):
self.LogWarning("Some disks for instance %s on node %s were not"
" imported successfully" % (instance, pnode_name))
+ elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+ feedback_fn("* preparing remote import...")
+ connect_timeout = constants.RIE_CONNECT_TIMEOUT
+ timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
+
+ disk_results = masterd.instance.RemoteImport(self, feedback_fn, iobj,
+ self.source_x509_ca,
+ self._cds, timeouts)
+ if not compat.all(disk_results):
+ # TODO: Should the instance still be started, even if some disks
+ # failed to import (valid for local imports, too)?
+ self.LogWarning("Some disks for instance %s on node %s were not"
+ " imported successfully" % (instance, pnode_name))
+
+ # Run rename script on newly imported instance
+ assert iobj.name == instance
+ feedback_fn("Running rename script on %s" % instance)
+ result = self.rpc.call_instance_run_rename(pnode_name, iobj,
+ self.source_instance_name,
+ self.op.debug_level)
+ if result.fail_msg:
+ self.LogWarning("Failed to run rename script for %s on node"
+ " %s: %s" % (instance, pnode_name, result.fail_msg))
+
else:
# also checked in the prereq part
raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
diff --git a/lib/constants.py b/lib/constants.py
index fdba305..7a5943b 100644
--- a/lib/constants.py
+++ b/lib/constants.py
@@ -340,6 +340,12 @@ LOCKS_APPEND = 'append'
# instance creation modes
INSTANCE_CREATE = "create"
INSTANCE_IMPORT = "import"
+INSTANCE_REMOTE_IMPORT = "remote-import"
+INSTANCE_CREATE_MODES = frozenset([
+ INSTANCE_CREATE,
+ INSTANCE_IMPORT,
+ INSTANCE_REMOTE_IMPORT,
+ ])
# Remote import/export handshake message
RIE_HANDSHAKE = "Hi, I'm Ganeti"
@@ -698,6 +704,7 @@ OPS_FINALIZED = frozenset([OP_STATUS_CANCELED,
# Execution log types
ELOG_MESSAGE = "message"
ELOG_PROGRESS = "progress"
+ELOG_REMOTE_IMPORT = "remote-import"
# max dynamic devices
MAX_NICS = 8
diff --git a/lib/masterd/instance.py b/lib/masterd/instance.py
index ca244a1..a9f7a83 100644
--- a/lib/masterd/instance.py
+++ b/lib/masterd/instance.py
@@ -1187,3 +1187,157 @@ class ExportInstanceHelper:
assert len(self._removed_snaps) == len(self._instance.disks)
for idx in range(len(self._instance.disks)):
self._RemoveSnapshot(idx)
+
+
+class _RemoteImportCb(ImportExportCbBase):
+ def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
+ external_address):
+ """Initializes this class.
+
+ @type cds: string
+ @param cds: Cluster domain secret
+ @type x509_cert_pem: string
+ @param x509_cert_pem: CA used for signing import key
+ @type disk_count: number
+ @param disk_count: Number of disks
+ @type external_address: string
+ @param external_address: External address of destination node
+
+ """
+ ImportExportCbBase.__init__(self)
+ self._feedback_fn = feedback_fn
+ self._cds = cds
+ self._x509_cert_pem = x509_cert_pem
+ self._disk_count = disk_count
+ self._external_address = external_address
+
+ self._dresults = [None] * disk_count
+ self._daemon_port = [None] * disk_count
+
+ self._salt = utils.GenerateSecret(8)
+
+ @property
+ def disk_results(self):
+ """Returns per-disk results.
+
+ """
+ return self._dresults
+
+ def _CheckAllListening(self):
+ """Checks whether all daemons are listening.
+
+ If all daemons are listening, the information is sent to the client.
+
+ """
+ if not compat.all(dp is not None for dp in self._daemon_port):
+ return
+
+ host = self._external_address
+
+ disks = []
+ for idx, port in enumerate(self._daemon_port):
+ hmac_digest = utils.Sha1Hmac(self._cds, "%s:%s:%s" % (idx, host, port),
+ salt=self._salt)
+ disks.append((host, port, hmac_digest, self._salt))
+
+ assert len(disks) == self._disk_count
+
+ self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
+ "disks": disks,
+ "x509_ca": self._x509_cert_pem,
+ })
+
+ def ReportListening(self, ie, private):
+ """Called when daemon started listening.
+
+ """
+ (idx, ) = private
+
+ self._feedback_fn("Disk %s is now listening" % idx)
+
+ assert self._daemon_port[idx] is None
+
+ self._daemon_port[idx] = ie.listen_port
+
+ self._CheckAllListening()
+
+ def ReportConnected(self, ie, private):
+ """Called when a connection has been established.
+
+ """
+ (idx, ) = private
+
+ self._feedback_fn("Disk %s is now receiving data" % idx)
+
+ def ReportFinished(self, ie, private):
+ """Called when a transfer has finished.
+
+ """
+ (idx, ) = private
+
+ # Daemon is certainly no longer listening
+ self._daemon_port[idx] = None
+
+ if ie.success:
+ self._feedback_fn("Disk %s finished receiving data" % idx)
+ else:
+ self._feedback_fn(("Disk %s failed to receive data: %s"
+ " (recent output: %r)") %
+ (idx, ie.final_message, ie.recent_output))
+
+ self._dresults[idx] = bool(ie.success)
+
+
+def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
+ """Imports an instance from another cluster.
+
+ @param lu: Logical unit instance
+ @param feedback_fn: Feedback function
+ @type instance: L{objects.Instance}
+ @param instance: Instance object
+ @type source_x509_ca: OpenSSL.crypto.X509
+ @param source_x509_ca: Import source's X509 CA
+ @type cds: string
+ @param cds: Cluster domain secret
+ @type timeouts: L{ImportExportTimeouts}
+ @param timeouts: Timeouts for this import
+
+ """
+ source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
+ source_x509_ca)
+
+ # Create crypto key
+ result = lu.rpc.call_x509_cert_create(instance.primary_node,
+ constants.RIE_CERT_VALIDITY)
+ result.Raise("Can't create X509 key and certificate on %s" % result.node)
+
+ (x509_key_name, x509_cert_pem) = result.payload
+ try:
+ # Load certificate
+ x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+ x509_cert_pem)
+
+ # Sign certificate
+ signed_x509_cert_pem = \
+ utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
+
+ cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
+ len(instance.disks), instance.primary_node)
+
+ ieloop = ImportExportLoop(lu)
+ try:
+ for idx, dev in enumerate(instance.disks):
+ ieloop.Add(DiskImport(lu, instance.primary_node,
+ x509_key_name, source_ca_pem, instance,
+ constants.IEIO_SCRIPT, (dev, idx),
+ timeouts, cbs, private=(idx, )))
+
+ ieloop.Run()
+ finally:
+ ieloop.FinalizeAll()
+ finally:
+ # Remove crypto key and certificate
+ result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
+ result.Raise("Can't remove X509 key and certificate on %s" % result.node)
+
+ return cbs.disk_results
diff --git a/lib/opcodes.py b/lib/opcodes.py
index 06ce78d..e05d2d8 100644
--- a/lib/opcodes.py
+++ b/lib/opcodes.py
@@ -460,7 +460,15 @@ class OpNodeEvacuationStrategy(OpCode):
# instance opcodes
class OpCreateInstance(OpCode):
- """Create an instance."""
+ """Create an instance.
+
+ @ivar instance_name: Instance name
+ @ivar mode: Instance creation mode (one of
L{constants.INSTANCE_CREATE_MODES})
+ @ivar source_handshake: Signed handshake from source (remote import only)
+ @ivar source_x509_ca: Source X509 CA in PEM format (remote import only)
+ @ivar source_instance_name: Previous name of instance (remote import only)
+
+ """
OP_ID = "OP_INSTANCE_CREATE"
OP_DSC_FIELD = "instance_name"
__slots__ = [
@@ -473,6 +481,9 @@ class OpCreateInstance(OpCode):
"file_storage_dir", "file_driver",
"iallocator",
"hypervisor", "hvparams", "beparams",
+ "source_handshake",
+ "source_x509_ca",
+ "source_instance_name",
"dry_run",
]
--
1.7.0.4