This patch implements docs/design-cluster-merger.rst. It allows the automatic merging of one or more clusters into the invoking cluster.
While this version is tested and working it still needs some tweaking here and there for error handling and user experience. Signed-off-by: René Nussbaumer <[email protected]> --- Makefile.am | 1 + lib/ssh.py | 12 +- tools/cluster-merge | 407 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 417 insertions(+), 3 deletions(-) create mode 100644 tools/cluster-merge diff --git a/Makefile.am b/Makefile.am index b5c385f..786b2fd 100644 --- a/Makefile.am +++ b/Makefile.am @@ -235,6 +235,7 @@ dist_tools_SCRIPTS = \ tools/burnin \ tools/cfgshell \ tools/cfgupgrade \ + tools/cluster-merge \ tools/lvmstrap pkglib_SCRIPTS = \ diff --git a/lib/ssh.py b/lib/ssh.py index 31fbdbb..b2cdab0 100644 --- a/lib/ssh.py +++ b/lib/ssh.py @@ -75,7 +75,7 @@ class SshRunner: self.cluster_name = cluster_name def _BuildSshOptions(self, batch, ask_key, use_cluster_key, - strict_host_check): + strict_host_check, private_key=None): """Builds a list with needed SSH options. @param batch: same as ssh's batch option @@ -84,6 +84,7 @@ class SshRunner: @param use_cluster_key: if True, use the cluster name as the HostKeyAlias name @param strict_host_check: this makes the host key checking strict + @param private_key: use this private key instead of the default @rtype: list @return: the list of options ready to use in L{utils.RunCmd} @@ -99,6 +100,9 @@ class SshRunner: if use_cluster_key: options.append("-oHostKeyAlias=%s" % self.cluster_name) + if private_key: + options.append('-i%s' % private_key) + # TODO: Too many boolean options, maybe convert them to more descriptive # constants. @@ -122,7 +126,8 @@ class SshRunner: return options def BuildCmd(self, hostname, user, command, batch=True, ask_key=False, - tty=False, use_cluster_key=True, strict_host_check=True): + tty=False, use_cluster_key=True, strict_host_check=True, + private_key=None): """Build an ssh command to execute a command on a remote node. 
@param hostname: the target host, string @@ -135,13 +140,14 @@ class SshRunner: @param use_cluster_key: whether to expect and use the cluster-global SSH key @param strict_host_check: whether to check the host's SSH key at all + @param private_key: use this private key instead of the default @return: the ssh call to run 'command' on the remote host. """ argv = [constants.SSH, "-q"] argv.extend(self._BuildSshOptions(batch, ask_key, use_cluster_key, - strict_host_check)) + strict_host_check, private_key)) if tty: argv.append("-t") argv.extend(["%...@%s" % (user, hostname), command]) diff --git a/tools/cluster-merge b/tools/cluster-merge new file mode 100644 index 0000000..77ec286 --- /dev/null +++ b/tools/cluster-merge @@ -0,0 +1,407 @@ +#!/usr/bin/python +# + +# Copyright (C) 2010 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + +"""Tool to merge two or more clusters together. + +The clusters have to run the same version of Ganeti! +""" + +import logging +import os +import optparse +import sys +import tempfile + +from ganeti import cli +from ganeti import config +from ganeti import constants +from ganeti import errors +from ganeti import ssh +from ganeti import utils + + +def flatten(unflatten_list): + """Flattens a list. + + @param unflatten_list: A list of unflatten list objects. 
+ @return: A flatten list + """ + if isinstance(unflatten_list, list): + return sum(map(flatten, unflatten_list)) + else: + return unflatten_list + + +class Merger(object): + ROLLBACK_STEPS = ( + 'Remove our key from authorized_keys on nodes: %(nodes)s', + 'Start all instances again on the merging clusters: %(clusters)s', + 'Restore /var/lib/ganeti/config.data from another master candidate') + + def __init__(self, clusters): + """Initialize object with sane defaults and infos required. + + @param clusters: The list of clusters to merge in + """ + self.ssh_map = {} + self.rollback_step = -1 + self.clusters = clusters + self.work_dir = tempfile.mkdtemp(suffix="cluster-merger") + self.ssh_runner = ssh.SshRunner("faked-cluster-merger-cluster") + + def Setup(self, remote_key_name="id_dsa"): + """Sets up our end so we can do the merger. + + This method is setting us up as a preparation for the merger. + It makes the initial contact and gathers information needed. + + @param remote_key_name: The name of remote private key to fetch + default: id_dsa + @raise errors.RemoteError: for errors in communication/grabbing + """ + remote_path = os.path.join("~", ".ssh", remote_key_name) + + # Fetch remotes private key + for cluster in self.clusters: + self.ssh_map[cluster] = {} + key_path = self.ssh_map[cluster]["key"] = os.path.join(self.work_dir, + cluster) + key_file = open(key_path, "w") + try: + os.chmod(key_path, 0600) + cmd = self._RunCmd(hostname=cluster, command="cat %s" % remote_path, + batch=False, ask_key=False) + if cmd.failed: + raise errors.RemoteError( + "There was an error while grabbing key from %s. Exit: %i" % + (cluster, cmd.exit_code)) + + key_file.write(cmd.stdout) + finally: + key_file.close() + + cmd = self._RunCmd(hostname=cluster, private_key=key_path, + command="gnt-node list -o name --no-header") + if not cmd.failed: + self.ssh_map[cluster]["nodes"] = cmd.stdout.splitlines() + else: + raise errors.RemoteError( + "Unable to retrieve list of nodes. 
Exit: %i" % + cmd.exit_code) + + cmd = self._RunCmd(hostname=cluster, private_key=key_path, + command="gnt-instance list -o name --no-header") + if not cmd.failed: + self.ssh_map[cluster]["instances"] = cmd.stdout.splitlines() + else: + raise errors.RemoteError( + "Unable to retrieve list of instances. Exit: %i" % + cmd.exit_code) + + def _PrepareAuthorizedKeys(self): + """Prepare the authorized_keys on every merging node. + + This method add our public key to remotes authorized_key for further + communication. + """ + pub_key_file = open(ssh.GetUserFiles("root")[1]) + try: + pub_key = pub_key_file.read() + finally: + pub_key_file.close() + + for cluster, data in self.ssh_map.iteritems(): + for node in data["nodes"]: + cmd = self._RunCmd( + hostname=node, private_key=data["key"], + command=("cat >> /root/.ssh/authorized_keys << '!EOF.'\n%s!EOF.\n" % + pub_key)) + if cmd.failed: + raise errors.RemoteError( + "Unable to add our public key to %s in %s." % (node, cluster)) + + def _RunCmd(self, *args, **kwargs): + """Wrapping SshRunner.Run with default parameters. + + For explanation of parameters see L{SshRunner.Run}. 
+ """ + my_opts = { "user": "root", + "use_cluster_key": False, + "strict_host_check": False} + my_opts.update(kwargs) + return self.ssh_runner.Run(*args, **my_opts) + + def _StopMergingInstances(self): + """Stop instances on merging clusters.""" + for cluster in self.clusters: + cmd = self._RunCmd(hostname=cluster, + command="gnt-instance shutdown --all --force-multiple") + + def _DisableWatcher(self): + """Disable watch on all merging clusters, including ourself.""" + for cluster in ["localhost"] + self.clusters: + cmd = self._RunCmd(hostname=cluster, + command="gnt-cluster watcher pause 1800") + + def _EnableWatcher(self): + """Reenable watcher (locally).""" + cmd = utils.RunCmd("gnt-cluster watcher continue") + + def _StopDaemons(self): + """Stop all daemons on merging nodes.""" + for cluster, data in self.ssh_map.iteritems(): + for node in data["nodes"]: + cmd = self._RunCmd(hostname=node, + command="/etc/init.d/ganeti stop") + + def _FetchRemoteConfig(self): + """Fetches and stores remote cluster config from the master. + + This step is needed before we can merge the config. + """ + for cluster in self.clusters: + cmd = self._RunCmd(hostname=cluster, + command="cat '%s'" % constants.CLUSTER_CONF_FILE) + + if cmd.failed: + raise errors.RemoteError("Unable to retrieve remote config on %s." % + cluster) + + config_path = self.ssh_map[cluster]["config"] = os.path.join( + self.work_dir, '%s_config.data' % cluster) + config_file = open(config_path, "w") + try: + config_file.write(cmd.stdout) + finally: + config_file.close() + + def _KillMasterDaemon(self, max_retries=10): + """Kills the local master daemon. 
+ + @raise errors.ProgrammerError: If unable to kill + """ + for retry in xrange(max_retries): + # Due to async nature we don't care about the return value here and + # check later if the process was really killed + utils.RunCmd("pkill ganeti-masterd") + if utils.RunCmd("pgrep ganeti-masterd").failed: + # This means it's not running anymore + break + + if retry >= (max_retries - 1): + raise errors.ProgrammerError("Unable to kill local ganeti-masterd.") + + def _MergeConfig(self): + """Merges all foreign config into our own config.""" + my_config = config.ConfigWriter(offline=True) + fake_ec_id = 0 # Needs to be uniq over the whole config merge + + for cluster, data in self.ssh_map.iteritems(): + other_config = config.ConfigWriter(data["config"]) + + for node in other_config.GetNodeList(): + node_info = other_config.GetNodeInfo(node) + node_info.master_candidate = False + my_config.AddNode(node_info, str(fake_ec_id)) + fake_ec_id += 1 + + for instance in other_config.GetInstanceList(): + instance_info = other_config.GetInstanceInfo(instance) + + # Update the DRBD port assignments + # This is a little bit hackish + for dsk in instance_info.disks: + if dsk.dev_type in constants.LDS_DRBD: + port = my_config.AllocatePort() + + logical_id = list(dsk.logical_id) + logical_id[2] = port + dsk.logical_id = tuple(logical_id) + + physical_id = list(dsk.physical_id) + physical_id[1] = physical_id[3] = port + dsk.physical_id = tuple(physical_id) + + my_config.AddInstance(instance_info, str(fake_ec_id)) + fake_ec_id += 1 + + def _StartMasterDaemon(self, no_vote=False): + """Starts the local master daemon. + + @param no_vote: Should the masterd started without voting? default: False + @raise errors.ProgrammerError: If unable to start daemon. + """ + options = "" + if no_vote: + options = "--no-voting --yes-do-it" + + if utils.RunCmd("ganeti-masterd %s" % options).failed: + raise errors.ProgrammerError( + "Couldn't start ganeti-masterd %s." 
% options) + + def _ReaddMergedNodesAndRedist(self): + """Readds all merging nodes and make sure their config is up-to-date. + + @raise errors.ProgrammerError: If anything fails. + """ + for _, data in self.ssh_map.iteritems(): + for node in data["nodes"]: + cmd = utils.RunCmd("gnt-node add --readd --no-ssh-key-check %s" % node) + if cmd.failed: + raise errors.ProgrammerError( + "Couldn't readd %s. Stderr is: %s" % (node, cmd.stderr)) + + if utils.RunCmd("gnt-cluster redist-conf").failed: + raise errors.ProgrammerError("Redistribution failed.") + + def _StartupAllInstances(self): + """Starts up all instances (locally). + + @raise errors.ProgrammerError: If unable to start clusters + """ + if utils.RunCmd("gnt-instance startup --all --force-multiple").failed: + raise errors.ProgrammerError("Unable to start all instances.") + + def _VerifyCluster(self): + """Runs gnt-cluster verify to verify the health. + + @raise errors.ProgrammError: If cluster fails on verification + """ + if utils.RunCmd("gnt-cluster verify").failed: + raise errors.ProgrammerError("Verification of cluster failed.") + + def Merge(self): + """Does the actual merge. + + It runs all the steps in the right order and updates the user about steps + taken. Also it keeps track of rollback_steps to undo everything. + """ + logging.info("Pre cluster verification") + self._VerifyCluster() + logging.info("Prepare authorized_keys") + self._PrepareAuthorizedKeys() + self.rollback_step += 1 + logging.info("Stopping merging instances (takes a while)") + self._StopMergingInstances() + self.rollback_step += 1 + logging.info("Disable watcher") + self._DisableWatcher() + logging.info("Stop daemons on merging nodes") + self._StopDaemons() + logging.info("Merging config") + self._FetchRemoteConfig() + self._KillMasterDaemon() + self._MergeConfig() + self.rollback_step += 1 + self._StartMasterDaemon(no_vote=True) + logging.warning("We are at the point of no return. 
Merge can not easily" + " be undone after this point.") + logging.info("Readd nodes and redistribute config") + # Set it back to -1: Nothing to recover from + self.rollback_step = -1 + self._ReaddMergedNodesAndRedist() + self._KillMasterDaemon() + self._StartMasterDaemon() + logging.info("Starting instances again") + self._StartupAllInstances() + logging.info("Post cluster verification") + self._VerifyCluster() + + def Cleanup(self): + """Clean up our environment. + + This cleans up remote private keys and configs and after that + deletes the temporary directory. + """ + for data in self.ssh_map.values(): + if 'key' in data: + os.unlink(data["key"]) + if 'config' in data: + os.unlink(data["config"]) + os.rmdir(self.work_dir) + + def RollbackDescription(self, step): + """Returns the rollback description for given step. + + @param step: The step to describe + @return: The described step + """ + nodes = flatten([data['nodes'] for _, data in self.ssh_map.iteritems()]) + info = {'clusters': self.clusters, + 'nodes': nodes} + return " * %s" % (cluster_merger.ROLLBACK_STEPS[step] % info) + +def SetupLogging(options): + """Setting up logging infrastructure. + + @param options: Parsed command line options + """ + utils.SetupLogging("/var/log/ganeti/cluster-merge.log", + stderr_logging=True, program="cluster-merge", + debug=options.debug) + + if options.verbose: + # FIXME: Is there a better less hackish way? 
+ root_logger = logging.getLogger("") + for hndl in root_logger.handlers: + if isinstance(hndl, logging.StreamHandler): + if hndl.stream.name == '<stderr>': + break + hndl = None + + if hndl: + hndl.setLevel(logging.INFO) + + +def main(): + """Main routine.""" + program = os.path.basename(sys.argv[0]) + + parser = optparse.OptionParser(usage="%prog [--debug|--verbose] <cluster> ...") + parser.add_option(cli.DEBUG_OPT) + parser.add_option(cli.VERBOSE_OPT) + + (options, args) = parser.parse_args() + + SetupLogging(options) + + if not args: + raise errors.ProgrammerError("No clusters specified.") + + cluster_merger = Merger(args) + try: + try: + cluster_merger.Setup() + cluster_merger.Merge() + except errors.GenericError, e: + logging.critical(e) + if cluster_merger.rollback_step < 0: + logging.info("There is nothing which can/must be rolled back.") + else: + logging.info("In order to rollback do the following:") + for step in xrange(cluster_merger.rollback_step): + logging.info(cluster_merger.RollbackDescription(step)) + + # TODO: Keep track of steps done for a flawless resume? + finally: + cluster_merger.Cleanup() + +if __name__ == "__main__": + main() -- 1.6.6.2
