LGTM. Thanks, Jose
On Fri, Oct 04, 2013 at 02:20:34PM +0200, Klaus Aehlig wrote: > This command will coordinate the switching to a new > Ganeti version across the cluster. This has become > possible by the new layout that allows several Ganeti > versions to be present at the same time. > > Signed-off-by: Klaus Aehlig <[email protected]> > --- > lib/client/gnt_cluster.py | 283 > ++++++++++++++++++++++++++++++++++++++++++++++ > 1 file changed, 283 insertions(+) > > diff --git a/lib/client/gnt_cluster.py b/lib/client/gnt_cluster.py > index 8d81519..225cd56 100644 > --- a/lib/client/gnt_cluster.py > +++ b/lib/client/gnt_cluster.py > @@ -43,7 +43,9 @@ from ganeti import objects > from ganeti import uidpool > from ganeti import compat > from ganeti import netutils > +from ganeti import ssconf > from ganeti import pathutils > +from ganeti import qlang > > > ON_OPT = cli_option("--on", default=False, > @@ -64,6 +66,12 @@ FORCE_DISTRIBUTION = cli_option("--yes-do-it", > dest="yes_do_it", > " is drained", > default=False, action="store_true") > > +TO_OPT = cli_option("--to", default=None, type="string", > + help="The Ganeti version to upgrade to") > + > +RESUME_OPT = cli_option("--resume", default=False, action="store_true", > + help="Resume any pending Ganeti upgrades") > + > _EPO_PING_INTERVAL = 30 # 30 seconds between pings > _EPO_PING_TIMEOUT = 1 # 1 second > _EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes > @@ -1617,6 +1625,278 @@ def ShowCreateCommand(opts, args): > ToStdout(_GetCreateCommand(result)) > > > +def _RunCommandAndReport(cmd): > + """Run a command and report its output, iff it failed. > + > + @param cmd: the command to execute > + @type cmd: list > + @rtype: bool > + @return: False, if the execution failed. > + > + """ > + result = utils.RunCmd(cmd) > + if result.failed: > + ToStderr("Command %s failed: %s; Output %s" % > + (cmd, result.fail_reason, result.output)) > + return False > + return True > + > + > +def _VerifyCommand(cmd): > + """Verify that a given command succeeds on all online nodes. > + > + As this function is intended to run during upgrades, it > + is implemented in such a way that it still works, if all Ganeti > + daemons are down. > + > + @param cmd: the command to execute > + @type cmd: list > + @rtype: list > + @return: the list of node names that are online where > + the command failed. > + > + """ > + command = utils.text.ShellQuoteArgs([str(val) for val in cmd]) > + > + nodes = ssconf.SimpleStore().GetOnlineNodeList() > + master_node = ssconf.SimpleStore().GetMasterNode() > + cluster_name = ssconf.SimpleStore().GetClusterName() > + > + # If master node is in 'nodes', make sure master node is at list end > + if master_node in nodes: > + nodes.remove(master_node) > + nodes.append(master_node) > + > + failed = [] > + > + srun = ssh.SshRunner(cluster_name=cluster_name) > + for name in nodes: > + result = srun.Run(name, constants.SSH_LOGIN_USER, command) > + if result.exit_code != 0: > + failed.append(name) > + > + return failed > + > + > +def _VerifyVersionInstalled(versionstring): > + """Verify that the given version of ganeti is installed on all online > nodes. > + > + Do nothing, if this is the case, otherwise print an appropriate > + message to stderr. > + > + @param versionstring: the version to check for > + @type versionstring: string > + @rtype: bool > + @return: True, if the version is installed on all online nodes > + > + """ > + badnodes = _VerifyCommand(["test", "-d", > + os.path.join(pathutils.PKGLIBDIR, > versionstring)]) > + if badnodes: > + ToStderr("Ganeti version %s not installed on nodes %s" > + % (versionstring, ", ".join(badnodes))) > + return False > + > + return True > + > + > +def _GetRunning(): > + """Determine the list of running jobs. > + > + @rtype: list > + @return: the number of jobs still running > + > + """ > + cl = GetClient() > + qfilter = qlang.MakeSimpleFilter("status", > + frozenset([constants.JOB_STATUS_RUNNING])) > + return len(cl.Query(constants.QR_JOB, [], qfilter).data) > + > + > +def _SetGanetiVersion(versionstring): > + """Set the active version of ganeti to the given versionstring > + > + @type versionstring: string > + @rtype: list > + @return: the list of nodes where the version change failed > + > + """ > + failed = [] > + failed.extend(_VerifyCommand( > + ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")])) > + failed.extend(_VerifyCommand( > + ["ln", "-s", "-f", os.path.join(pathutils.PKGLIBDIR, versionstring), > + os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")])) > + failed.extend(_VerifyCommand( > + ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/share")])) > + failed.extend(_VerifyCommand( > + ["ln", "-s", "-f", os.path.join(pathutils.SHAREDIR, versionstring), > + os.path.join(pathutils.SYSCONFDIR, "ganeti/share")])) > + return list(set(failed)) > + > + > +def _ExecuteCommands(fns): > + """Execute a list of functions, in reverse order. > + > + @type fns: list of functions. > + @param fns: the functions to be executed. > + > + """ > + for fn in reversed(fns): > + fn() > + > + > +# pylint: disable=R0911 > +def UpgradeGanetiCommand(opts, args): > + """Upgrade a cluster to a new ganeti version. > + > + @param opts: the command line options selected by the user > + @type args: list > + @param args: should be an empty list > + @rtype: int > + @return: the desired exit code > + > + """ > + if ((not opts.resume and opts.to is None) > + or (opts.resume and opts.to is not None)): > + ToStderr("Precisely one of the options --to and --resume" > + " has to be given") > + return 1 > + > + if opts.resume: > + # TODO: implement > + ToStderr("The --resume mode is not yet implemented") > + return 1 > + > + rollback = [] > + > + versionstring = opts.to > + version = utils.version.ParseVersion(versionstring) > + if version is None: > + ToStderr("Could not parse version string %s" % versionstring) > + return 1 > + > + msg = utils.version.UpgradeRange(version) > + if msg is not None: > + ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg)) > + return 1 > + > + downgrade = utils.version.ShouldCfgdowngrade(version) > + > + if not _VerifyVersionInstalled(versionstring): > + return 1 > + > + # TODO: write intent-to-upgrade file > + > + ToStdout("Draining queue") > + client = GetClient() > + client.SetQueueDrainFlag(True) > + > + rollback.append(lambda: GetClient().SetQueueDrainFlag(False)) > + > + if utils.SimpleRetry(0, _GetRunning, > + constants.UPGRADE_QUEUE_POLL_INTERVAL, > + constants.UPGRADE_QUEUE_DRAIN_TIMEOUT): > + ToStderr("Failed to completely empty the queue.") > + _ExecuteCommands(rollback) > + return 1 > + > + ToStdout("Stopping daemons on master node.") > + if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]): > + _ExecuteCommands(rollback) > + return 1 > + > + if not _VerifyVersionInstalled(versionstring): > + utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"]) > + _ExecuteCommands(rollback) > + return 1 > + > + ToStdout("Stopping daemons everywhere.") > + rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, > "start-all"])) > + badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"]) > + if badnodes: > + ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),)) > + _ExecuteCommands(rollback) > + return 1 > + > + backuptar = os.path.join(pathutils.LOCALSTATEDIR, > + "lib/ganeti%d.tar" % time.time()) > + ToStdout("Backing up configuration as %s" % backuptar) > + if not _RunCommandAndReport(["tar", "cf", backuptar, > + pathutils.DATA_DIR]): > + _ExecuteCommands(rollback) > + return 1 > + > + if downgrade: > + ToStdout("Downgrading configuration") > + if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]): > + _ExecuteCommands(rollback) > + return 1 > + > + # Configuration change is the point of no return. From then onwards, it is > + # safer to push through the up/dowgrade than to try to roll it back. > + > + returnvalue = 0 > + > + ToStdout("Switching to version %s on all nodes" % versionstring) > + rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION)) > + badnodes = _SetGanetiVersion(versionstring) > + if badnodes: > + ToStderr("Failed to switch to Ganeti version %s on nodes %s" > + % (versionstring, ", ".join(badnodes))) > + if not downgrade: > + _ExecuteCommands(rollback) > + return 1 > + > + # Now that we have changed to the new version of Ganeti we should > + # not communicate over luxi any more, as luxi might have changed in > + # incompatible ways. Therefore, manually call the corresponding ganeti > + # commands using their canonical (version independent) path. > + > + if not downgrade: > + ToStdout("Upgrading configuration") > + if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]): > + _ExecuteCommands(rollback) > + return 1 > + > + ToStdout("Starting daemons everywhere.") > + badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]) > + if badnodes: > + ToStderr("Warning: failed to start daemons on %s." % (", > ".join(badnodes),)) > + returnvalue = 1 > + > + ToStdout("Ensuring directories everywhere.") > + badnodes = _VerifyCommand([pathutils.ENSURE_DIRS]) > + if badnodes: > + ToStderr("Warning: failed to ensure directories on %s." % > + (", ".join(badnodes))) > + returnvalue = 1 > + > + ToStdout("Redistributing the configuration.") > + if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]): > + returnvalue = 1 > + > + ToStdout("Restarting daemons everywhere.") > + badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"]) > + badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])) > + if badnodes: > + ToStderr("Warning: failed to start daemons on %s." % > + (", ".join(list(set(badnodes))),)) > + returnvalue = 1 > + > + ToStdout("Undraining the queue.") > + if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]): > + returnvalue = 1 > + > + # TODO: write intent-to-upgrade file > + > + ToStdout("Verifying cluster.") > + if not _RunCommandAndReport(["gnt-cluster", "verify"]): > + returnvalue = 1 > + > + return returnvalue > + > + > commands = { > "init": ( > InitCluster, [ArgHost(min=1, max=1)], > @@ -1735,6 +2015,9 @@ commands = { > "show-ispecs-cmd": ( > ShowCreateCommand, ARGS_NONE, [], "", > "Show the command line to re-create the cluster"), > + "upgrade": ( > + UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "", > + "Upgrade (or downgrade) to a new Ganeti version"), > } > > > -- > 1.8.4 > -- Jose Antonio Lopes Ganeti Engineering Google Germany GmbH Dienerstr. 12, 80331, München Registergericht und -nummer: Hamburg, HRB 86891 Sitz der Gesellschaft: Hamburg Geschäftsführer: Graham Law, Christine Elizabeth Flores Steuernummer: 48/725/00206 Umsatzsteueridentifikationsnummer: DE813741370
