This is the implementation of docs/design-cluster-merger.rst. It allows the automatic merging of one or more clusters into the invoking cluster.
#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""

# pylint: disable-msg=C0103
# C0103: Invalid name cluster-merge

import logging
import os
import optparse
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils


def Flatten(unflatten_list):
  """Flattens a list.

  @param unflatten_list: A (possibly nested) list of objects
  @return: A flat list with all non-list items of the input

  """
  # NOTE: the previous sum(map(Flatten, ...)) implementation raised
  # TypeError on any nested list because sum() starts from the integer 0
  flat_list = []
  for item in unflatten_list:
    if isinstance(item, list):
      flat_list.extend(Flatten(item))
    else:
      flat_list.append(item)
  return flat_list


class Merger(object):
  """Handles the merge of one or more clusters into the invoking cluster."""

  # Human-readable descriptions of the manual rollback steps, indexed by
  # rollback_step and formatted by RollbackDescription
  ROLLBACK_STEPS = (
    "Remove our key from authorized_keys on nodes: %(nodes)s",
    "Start all instances again on the merging clusters: %(clusters)s",
    "Restore %s from another master candidate" % constants.CLUSTER_CONF_FILE)

  def __init__(self, clusters):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in

    """
    # Per-cluster data: {"key": ..., "nodes": ..., "instances": ..., "config"}
    self.ssh_map = {}
    # Index of the last completed rollback-relevant step; -1 = nothing to undo
    self.rollback_step = -1
    self.clusters = clusters
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    # The cluster name is irrelevant; we never use the cluster key itself
    self.ssh_runner = ssh.SshRunner("faked-cluster-merger-cluster")

  def Setup(self, remote_key_name="id_dsa"):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    @param remote_key_name: The name of remote private key to fetch,
        default: id_dsa
    @raise errors.RemoteError: for errors in communication/grabbing

    """
    # "~" is expanded by the remote shell, not by us
    remote_path = os.path.join("~", ".ssh", remote_key_name)

    # Fetch remotes private key
    for cluster in self.clusters:
      self.ssh_map[cluster] = {}

      # Define the closure (we need to reference outer variables); it is
      # invoked synchronously by utils.WriteFile below, so referencing the
      # loop variable "cluster" is safe
      def _WriteKey(key_file):
        """Writes the remote private key.

        @param key_file: The opened key_file

        """
        result = self._RunCmd(hostname=cluster,
                              command="cat %s" % remote_path,
                              batch=False, ask_key=False)
        if result.failed:
          raise errors.RemoteError("There was an error while grabbing key"
                                   " from %s. Exit: %i" %
                                   (cluster, result.exit_code))

        key_file.write(result.stdout)

      key_path = self.ssh_map[cluster]["key"] = os.path.join(self.work_dir,
                                                             cluster)
      utils.WriteFile(file_name=key_path, mode=0o600, fn=_WriteKey)

      cmd = self._RunCmd(hostname=cluster, private_key=key_path,
                         command="gnt-node list -o name --no-header")
      if cmd.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes."
                                 " Exit: %i" % cmd.exit_code)
      self.ssh_map[cluster]["nodes"] = cmd.stdout.splitlines()

      cmd = self._RunCmd(hostname=cluster, private_key=key_path,
                         command="gnt-instance list -o name --no-header")
      if cmd.failed:
        raise errors.RemoteError("Unable to retrieve list of instances."
                                 " Exit: %i" % cmd.exit_code)
      self.ssh_map[cluster]["instances"] = cmd.stdout.splitlines()

  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method add our public key to remotes authorized_key for further
    communication.

    @raise errors.RemoteError: If our public key could not be appended

    """
    pub_key_file = open(ssh.GetUserFiles("root")[1])
    try:
      pub_key = pub_key_file.read()
    finally:
      pub_key_file.close()

    for cluster, data in self.ssh_map.items():
      for node in data["nodes"]:
        # Append (never overwrite) our key; the quoted here-document avoids
        # shell interpretation of the key material
        cmd = self._RunCmd(hostname=node, private_key=data["key"],
                           command=("cat >> %s/.ssh/authorized_keys << '!EOF.'"
                                    "\n%s!EOF.\n" %
                                    (utils.GetHomeDir("root"), pub_key)))
        if cmd.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   % (node, cluster))

  def _RunCmd(self, *args, **kwargs):
    """Wrapping SshRunner.Run with default parameters.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    """
    # Callers may override any of these defaults via kwargs
    my_opts = {"user": "root",
               "use_cluster_key": False,
               "strict_host_check": False}
    my_opts.update(kwargs)
    # pylint: disable-msg=W0142
    # W0142: Merger._RunCmd: Used * or ** magic
    return self.ssh_runner.Run(*args, **my_opts)

  def _StopMergingInstances(self):
    """Stop instances on merging clusters."""
    for cluster in self.clusters:
      self._RunCmd(hostname=cluster,
                   command="gnt-instance shutdown --all --force-multiple")

  def _DisableWatcher(self):
    """Disable watcher on all merging clusters, including ourself."""
    for cluster in ["localhost"] + self.clusters:
      self._RunCmd(hostname=cluster,
                   command="gnt-cluster watcher pause 1800")

  # pylint: disable-msg=R0201
  # R0201: Method could be a function
  def _EnableWatcher(self):
    """Reenable watcher (locally)."""
    utils.RunCmd("gnt-cluster watcher continue")

  def _StopDaemons(self):
    """Stop all daemons on merging nodes."""
    for data in self.ssh_map.values():
      for node in data["nodes"]:
        self._RunCmd(hostname=node,
                     command="/etc/init.d/ganeti stop")

  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    @raise errors.RemoteError: If the remote config could not be retrieved

    """
    for cluster in self.clusters:
      cmd = self._RunCmd(hostname=cluster,
                         command="cat '%s'" % constants.CLUSTER_CONF_FILE)

      if cmd.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s." %
                                 cluster)

      config_path = self.ssh_map[cluster]["config"] = os.path.join(
        self.work_dir, "%s_config.data" % cluster)
      # Use utils.WriteFile for consistency with Setup
      utils.WriteFile(file_name=config_path, data=cmd.stdout)

  # pylint: disable-msg=R0201
  # R0201: Method could be a function
  def _KillMasterDaemon(self):
    """Kills the local master daemon.

    @raise errors.ProgrammerError: If unable to kill

    """
    if utils.RunCmd("%s stop_master" % constants.DAEMON_UTIL).failed:
      raise errors.ProgrammerError("Unable to kill ganeti-masterd.")

  def _MergeConfig(self):
    """Merges all foreign config into our own config."""
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0  # Needs to be uniq over the whole config merge

    for data in self.ssh_map.values():
      other_config = config.ConfigWriter(data["config"])

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Merged nodes come in as normal nodes; promotion happens later via
        # gnt-node add --readd in _ReaddMergedNodesAndRedist
        node_info.master_candidate = False
        my_config.AddNode(node_info, str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            port = my_config.AllocatePort()

            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info, str(fake_ec_id))
        fake_ec_id += 1

  # pylint: disable-msg=R0201
  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False):
    """Starts the local master daemon.

    @param no_vote: Should the masterd started without voting? default: False
    @raise errors.ProgrammerError: If unable to start daemon.

    """
    options = ""
    if no_vote:
      options = "--no-voting --yes-do-it"

    if utils.RunCmd("ganeti-masterd %s" % options).failed:
      raise errors.ProgrammerError("Couldn't start ganeti-masterd %s." %
                                   options)

  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    @raise errors.ProgrammerError: If anything fails.

    """
    for data in self.ssh_map.values():
      for node in data["nodes"]:
        cmd = utils.RunCmd("gnt-node add --readd --no-ssh-key-check %s" %
                           node)
        if cmd.failed:
          raise errors.ProgrammerError("Couldn't readd %s. Stderr is: %s" %
                                       (node, cmd.stderr))

    if utils.RunCmd("gnt-cluster redist-conf").failed:
      raise errors.ProgrammerError("Redistribution failed.")

  # pylint: disable-msg=R0201
  # R0201: Method could be a function
  def _StartupAllInstances(self):
    """Starts up all instances (locally).

    @raise errors.ProgrammerError: If unable to start instances

    """
    if utils.RunCmd("gnt-instance startup --all --force-multiple").failed:
      raise errors.ProgrammerError("Unable to start all instances.")

  # pylint: disable-msg=R0201
  # R0201: Method could be a function
  def _VerifyCluster(self):
    """Runs gnt-cluster verify to verify the health.

    @raise errors.ProgrammerError: If cluster fails on verification

    """
    if utils.RunCmd("gnt-cluster verify").failed:
      raise errors.ProgrammerError("Verification of cluster failed.")

  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_step to undo everything.

    """
    logging.info("Pre cluster verification")
    self._VerifyCluster()
    logging.info("Prepare authorized_keys")
    self._PrepareAuthorizedKeys()
    self.rollback_step += 1
    logging.info("Stopping merging instances (takes a while)")
    self._StopMergingInstances()
    self.rollback_step += 1
    logging.info("Disable watcher")
    self._DisableWatcher()
    logging.info("Stop daemons on merging nodes")
    self._StopDaemons()
    logging.info("Merging config")
    self._FetchRemoteConfig()
    self._KillMasterDaemon()
    self._MergeConfig()
    self.rollback_step += 1
    self._StartMasterDaemon(no_vote=True)
    logging.warning("We are at the point of no return. Merge can not easily"
                    " be undone after this point.")
    logging.info("Readd nodes and redistribute config")
    # Set it back to -1: Nothing to recover from
    self.rollback_step = -1
    self._ReaddMergedNodesAndRedist()
    self._KillMasterDaemon()
    self._StartMasterDaemon()
    logging.info("Starting instances again")
    self._StartupAllInstances()
    logging.info("Post cluster verification")
    self._VerifyCluster()

  def Cleanup(self):
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    for data in self.ssh_map.values():
      if "key" in data:
        os.unlink(data["key"])
      if "config" in data:
        os.unlink(data["config"])
    os.rmdir(self.work_dir)

  def RollbackDescription(self, step):
    """Returns the rollback description for given step.

    @param step: The step to describe
    @return: The described step

    """
    nodes = Flatten([data["nodes"] for data in self.ssh_map.values()])
    info = {"clusters": self.clusters,
            "nodes": nodes}
    return " * %s" % (self.ROLLBACK_STEPS[step] % info)


def SetupLogging(options):
  """Setting up logging infrastructure.

  @param options: Parsed command line options

  """
  utils.SetupLogging("/var/log/ganeti/cluster-merge.log",
                     stderr_logging=True, program="cluster-merge",
                     debug=options.debug)

  if options.verbose:
    # FIXME: Is there a better less hackish way?
    # Find the stderr stream handler installed by utils.SetupLogging (it is
    # identified by its stream name) and raise it to INFO level
    root_logger = logging.getLogger("")
    stderr_hndl = None
    for hndl in root_logger.handlers:
      if (isinstance(hndl, logging.StreamHandler) and
          hndl.stream.name == "<stderr>"):
        stderr_hndl = hndl
        break

    if stderr_hndl:
      stderr_hndl.setLevel(logging.INFO)


def main():
  """Main routine."""
  program = os.path.basename(sys.argv[0])

  # optparse.OptionParser takes "prog", not "program"; the latter raised
  # TypeError before any argument was parsed
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
                                        " <cluster> ..."),
                                 prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)

  (options, args) = parser.parse_args()

  SetupLogging(options)

  if not args:
    # A missing argument is a usage error, not an internal one; let optparse
    # print the usage and exit instead of raising ProgrammerError
    parser.error("No clusters specified.")

  cluster_merger = Merger(args)
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError as e:
      logging.critical(e)
      if cluster_merger.rollback_step < 0:
        logging.info("There is nothing which can/must be rolled back.")
      else:
        logging.info("In order to rollback do the following:")
        # + 1 because rollback_step is the index of the last completed step,
        # which itself needs to appear in the rollback instructions
        for step in range(cluster_merger.rollback_step + 1):
          logging.info(cluster_merger.RollbackDescription(step))

    # TODO: Keep track of steps done for a flawless resume?
  finally:
    cluster_merger.Cleanup()


if __name__ == "__main__":
  main()
