This is yet another wrapper around the instance import/export utility classes, providing an even simpler API for instance imports/exports within the same cluster.
# Signed-off-by: Michael Hanselmann <[email protected]>
#
# Patch content for lib/masterd/instance.py: the definitions below are
# appended to the module right after class ImportExportLoop.


class _TransferInstCbBase(ImportExportCbBase):
  """Common state shared by the source and destination transfer callbacks.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Stores the transfer parameters on the callback object.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @param instance: Instance object
    @param timeouts: Timeouts object handed on to the disk export
    @type src_node: string
    @param src_node: Source node name
    @param src_cbs: Callbacks for the source side; C{None} when this object
      is itself used as the source-side callback
    @type dest_node: string
    @param dest_node: Destination node name
    @type dest_ip: string
    @param dest_ip: IP address of destination node

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts

    # Source side
    self.src_node = src_node
    self.src_cbs = src_cbs

    # Destination side
    self.dest_node = dest_node
    self.dest_ip = dest_ip


class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the exporting (source) side of a disk transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Tells the user that the export has connected and is sending.

    @param ie: Export object the callback is invoked for
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Per-transfer bookkeeping object

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    message = "%s is sending data on %s" % (dtp.data.name, ie.node_name)
    self.feedback_fn(message)

  def ReportFinished(self, ie, dtp):
    """Handles the end of an export, successful or not.

    Reports the outcome, records it on C{dtp}, invokes the transfer's
    C{finished_fn} (if any) and aborts the matching import on failure.

    @param ie: Export object the callback is invoked for
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Per-transfer bookkeeping object

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      message = "%s finished sending data" % dtp.data.name
    else:
      message = ("%s failed to send data: %s (recent output: %r)" %
                 (dtp.data.name, ie.final_message, ie.recent_output))
    self.feedback_fn(message)

    dtp.RecordResult(ie.success)

    # The callback is invoked regardless of the transfer's outcome
    finished_fn = dtp.data.finished_fn
    if finished_fn:
      finished_fn()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()


class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the importing (destination) side of a disk transfer.

  """
  def ReportListening(self, ie, dtp):
    """Starts the export once the import daemon is listening.

    @param ie: Import object the callback is invoked for
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Per-transfer bookkeeping object

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import

    transfer = dtp.data

    self.feedback_fn("%s is now listening, starting export" % transfer.name)

    # The export on the source node connects to the port the import daemon
    # reported
    export = DiskExport(self.lu, self.src_node, None, None, self.dest_ip,
                        ie.listen_port, self.instance,
                        transfer.src_io, transfer.src_ioargs,
                        self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(export)

    dtp.src_export = export

  def ReportConnected(self, ie, dtp):
    """Tells the user that the import has connected and is receiving.

    @param ie: Import object the callback is invoked for
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Per-transfer bookkeeping object

    """
    message = "%s is receiving data on %s" % (dtp.data.name, self.dest_node)
    self.feedback_fn(message)

  def ReportFinished(self, ie, dtp):
    """Handles the end of an import, successful or not.

    Reports the outcome, records it on C{dtp} and aborts the matching export
    on failure.

    @param ie: Import object the callback is invoked for
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Per-transfer bookkeeping object

    """
    if ie.success:
      message = "%s finished receiving data" % dtp.data.name
    else:
      message = ("%s failed to receive data: %s (recent output: %r)" %
                 (dtp.data.name, ie.final_message, ie.recent_output))
    self.feedback_fn(message)

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()


class DiskTransfer(object):
  """Description of a single disk transfer requested by the caller.

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Stores the description of one transfer.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name
    self.finished_fn = finished_fn

    # Reading end
    self.src_io = src_io
    self.src_ioargs = src_ioargs

    # Writing end
    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs


class _DiskTransferPrivate(object):
  """Bookkeeping for one transfer while L{TransferInstanceData} is running.

  """
  def __init__(self, data, success):
    """Creates the bookkeeping object with no import/export attached yet.

    @type data: L{DiskTransfer}
    @param data: Transfer description
    @type success: bool
    @param success: Initial result value

    """
    self.data = data
    self.success = success

    # Filled in once the respective import/export object exists
    self.src_export = None
    self.dest_import = None

  def RecordResult(self, success):
    """Merges another partial result into the overall result.

    An overall result which is already false is left untouched.

    @param success: Result reported by one side of the transfer

    """
    if self.success:
      self.success = success


def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Copies an instance's data from one node to another.

  For every entry in C{all_transfers} an import is started on the destination
  node; the matching export is started by the destination callbacks as soon
  as the import reports that it is listening. False entries in
  C{all_transfers} are not transferred and are reported as failed.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  def _IsFinalized(ie):
    """Checks whether an import/export is absent or has a final result.

    """
    return ie is None or ie.success is not None

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  all_dtp = []

  ieloop = ImportExportLoop(lu)
  try:
    for transfer in all_transfers:
      if not transfer:
        # Placeholder keeping the result list aligned with the input list
        all_dtp.append(_DiskTransferPrivate(None, False))
        continue

      feedback_fn("Exporting %s from %s to %s" %
                  (transfer.name, src_node, dest_node))

      private = _DiskTransferPrivate(transfer, True)

      disk_import = DiskImport(lu, dest_node, None, None, instance,
                               transfer.dest_io, transfer.dest_ioargs,
                               timeouts, dest_cbs, private=private)
      ieloop.Add(disk_import)

      private.dest_import = disk_import

      all_dtp.append(private)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all([_IsFinalized(entry.src_export) and
                     _IsFinalized(entry.dest_import)
                     for entry in all_dtp]), \
         "Not all imports/exports are finalized"

  return [bool(entry.success) for entry in all_dtp]
