This is yet another wrapper around the instance import/export utility
classes, providing an even simpler API for instance imports/exports within
the same cluster.

Signed-off-by: Michael Hanselmann <[email protected]>
---
 lib/masterd/instance.py |  210 +++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 210 insertions(+), 0 deletions(-)

diff --git a/lib/masterd/instance.py b/lib/masterd/instance.py
index edf5117..6fd9d02 100644
--- a/lib/masterd/instance.py
+++ b/lib/masterd/instance.py
@@ -741,3 +741,213 @@ class ImportExportLoop:
       success = diskie.Finalize() and success
 
     return success
+
+
+class _TransferInstCbBase(ImportExportCbBase):
+  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
+               dest_node, dest_ip):
+    """Initializes this class.
+
+    """
+    ImportExportCbBase.__init__(self)
+
+    self.lu = lu
+    self.feedback_fn = feedback_fn
+    self.instance = instance
+    self.timeouts = timeouts
+    self.src_node = src_node
+    self.src_cbs = src_cbs
+    self.dest_node = dest_node
+    self.dest_ip = dest_ip
+
+
+class _TransferInstSourceCb(_TransferInstCbBase):
+  def ReportConnected(self, ie, dtp):
+    """Called when a connection has been established.
+
+    """
+    assert self.src_cbs is None
+    assert dtp.src_export == ie
+    assert dtp.dest_import
+
+    self.feedback_fn("%s is sending data on %s" %
+                     (dtp.data.name, ie.node_name))
+
+  def ReportFinished(self, ie, dtp):
+    """Called when a transfer has finished.
+
+    """
+    assert self.src_cbs is None
+    assert dtp.src_export == ie
+    assert dtp.dest_import
+
+    if ie.success:
+      self.feedback_fn("%s finished sending data" % dtp.data.name)
+    else:
+      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
+                       (dtp.data.name, ie.final_message, ie.recent_output))
+
+    dtp.RecordResult(ie.success)
+
+    cb = dtp.data.finished_fn
+    if cb:
+      cb()
+
+    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
+    # give the daemon a moment to sort things out
+    if dtp.dest_import and not ie.success:
+      dtp.dest_import.Abort()
+
+
+class _TransferInstDestCb(_TransferInstCbBase):
+  def ReportListening(self, ie, dtp):
+    """Called when daemon started listening.
+
+    """
+    assert self.src_cbs
+    assert dtp.src_export is None
+    assert dtp.dest_import
+
+    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
+
+    # Start export on source node
+    de = DiskExport(self.lu, self.src_node, None, None, self.dest_ip,
+                    ie.listen_port, self.instance,
+                    dtp.data.src_io, dtp.data.src_ioargs,
+                    self.timeouts, self.src_cbs, private=dtp)
+    ie.loop.Add(de)
+
+    dtp.src_export = de
+
+  def ReportConnected(self, ie, dtp):
+    """Called when a connection has been established.
+
+    """
+    self.feedback_fn("%s is receiving data on %s" %
+                     (dtp.data.name, self.dest_node))
+
+  def ReportFinished(self, ie, dtp):
+    """Called when a transfer has finished.
+
+    """
+    if ie.success:
+      self.feedback_fn("%s finished receiving data" % dtp.data.name)
+    else:
+      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
+                       (dtp.data.name, ie.final_message, ie.recent_output))
+
+    dtp.RecordResult(ie.success)
+
+    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
+    # give the daemon a moment to sort things out
+    if dtp.src_export and not ie.success:
+      dtp.src_export.Abort()
+
+
+class DiskTransfer(object):
+  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
+               finished_fn):
+    """Initializes this class.
+
+    @type name: string
+    @param name: User-visible name for this transfer (e.g. "disk/0")
+    @param src_io: Source I/O type
+    @param src_ioargs: Source I/O arguments
+    @param dest_io: Destination I/O type
+    @param dest_ioargs: Destination I/O arguments
+    @type finished_fn: callable
+    @param finished_fn: Function called once transfer has finished
+
+    """
+    self.name = name
+
+    self.src_io = src_io
+    self.src_ioargs = src_ioargs
+
+    self.dest_io = dest_io
+    self.dest_ioargs = dest_ioargs
+
+    self.finished_fn = finished_fn
+
+
+class _DiskTransferPrivate(object):
+  def __init__(self, data, success):
+    """Initializes this class.
+
+    @type data: L{DiskTransfer}
+    @type success: bool
+
+    """
+    self.data = data
+
+    self.src_export = None
+    self.dest_import = None
+
+    self.success = success
+
+  def RecordResult(self, success):
+    self.success = self.success and success
+
+
+def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
+                         instance, all_transfers):
+  """Transfers an instance's data from one node to another.
+
+  @param lu: Logical unit instance
+  @param feedback_fn: Feedback function
+  @type src_node: string
+  @param src_node: Source node name
+  @type dest_node: string
+  @param dest_node: Destination node name
+  @type dest_ip: string
+  @param dest_ip: IP address of destination node
+  @type instance: L{objects.Instance}
+  @param instance: Instance object
+  @type all_transfers: list of L{DiskTransfer} instances
+  @param all_transfers: List of all disk transfers to be made
+  @rtype: list
+  @return: List with a boolean (True=successful, False=failed) for success for
+           each transfer
+
+  """
+  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
+  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
+                                  src_node, None, dest_node, dest_ip)
+  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
+                                 src_node, src_cbs, dest_node, dest_ip)
+
+  all_dtp = []
+
+  ieloop = ImportExportLoop(lu)
+  try:
+    for transfer in all_transfers:
+      if transfer:
+        feedback_fn("Exporting %s from %s to %s" %
+                    (transfer.name, src_node, dest_node))
+
+        dtp = _DiskTransferPrivate(transfer, True)
+
+        di = DiskImport(lu, dest_node, None, None, instance,
+                        transfer.dest_io, transfer.dest_ioargs,
+                        timeouts, dest_cbs, private=dtp)
+        ieloop.Add(di)
+
+        dtp.dest_import = di
+      else:
+        dtp = _DiskTransferPrivate(None, False)
+
+      all_dtp.append(dtp)
+
+    ieloop.Run()
+  finally:
+    ieloop.FinalizeAll()
+
+  assert len(all_dtp) == len(all_transfers)
+  assert compat.all([(dtp.src_export is None or
+                      dtp.src_export.success is not None) and
+                     (dtp.dest_import is None or
+                      dtp.dest_import.success is not None)
+                     for dtp in all_dtp]), \
+         "Not all imports/exports are finalized"
+
+  return [bool(dtp.success) for dtp in all_dtp]
-- 
1.7.0.4



-- 
Subscription settings: 
http://groups.google.com/group/ganeti-devel/subscribe?hl=en

Reply via email to