With masterd no longer in use, quite a few of its classes and helper functions are not needed any more; remove them, together with the imports that become unused as a result.
Signed-off-by: Klaus Aehlig <[email protected]> --- lib/server/masterd.py | 170 -------------------------------------------------- 1 file changed, 170 deletions(-) diff --git a/lib/server/masterd.py b/lib/server/masterd.py index cff55b7..bc39353 100644 --- a/lib/server/masterd.py +++ b/lib/server/masterd.py @@ -38,18 +38,13 @@ inheritance from parent classes requires it. # pylint: disable=C0103 # C0103: Invalid name ganeti-masterd -import time import logging from ganeti import config from ganeti import constants -from ganeti import daemon from ganeti import jqueue -from ganeti import luxi from ganeti import utils -from ganeti import errors import ganeti.rpc.node as rpc -from ganeti import ht CLIENT_REQUEST_WORKERS = 16 @@ -58,137 +53,6 @@ EXIT_NOTMASTER = constants.EXIT_NOTMASTER EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR -def _LogNewJob(status, info, ops): - """Log information about a recently submitted job. - - """ - op_summary = utils.CommaJoin(op.Summary() for op in ops) - - if status: - logging.info("New job with id %s, summary: %s", info, op_summary) - else: - logging.info("Failed to submit job, reason: '%s', summary: %s", - info, op_summary) - - -class MasterClientHandler(daemon.AsyncTerminatedMessageStream): - """Handler for master peers. - - """ - _MAX_UNHANDLED = 1 - - def __init__(self, server, connected_socket, client_address, family): - daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket, - client_address, - constants.LUXI_EOM, - family, self._MAX_UNHANDLED) - self.server = server - - def handle_message(self, message, _): - self.server.request_workers.AddTask((self.server, message, self)) - - -class _MasterShutdownCheck(object): - """Logic for master daemon shutdown. - - """ - #: How long to wait between checks - _CHECK_INTERVAL = 5.0 - - #: How long to wait after all jobs are done (e.g. to give clients time to - #: retrieve the job status) - _SHUTDOWN_LINGER = 5.0 - - def __init__(self): - """Initializes this class. 
- - """ - self._had_active_jobs = None - self._linger_timeout = None - - def __call__(self, jq_prepare_result): - """Determines if master daemon is ready for shutdown. - - @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown} - @rtype: None or number - @return: None if master daemon is ready, timeout if the check must be - repeated - - """ - if jq_prepare_result: - # Check again shortly - logging.info("Job queue has been notified for shutdown but is still" - " busy; next check in %s seconds", self._CHECK_INTERVAL) - self._had_active_jobs = True - return self._CHECK_INTERVAL - - if not self._had_active_jobs: - # Can shut down as there were no active jobs on the first check - return None - - # No jobs are running anymore, but maybe some clients want to collect some - # information. Give them a short amount of time. - if self._linger_timeout is None: - self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True) - - remaining = self._linger_timeout.Remaining() - - logging.info("Job queue no longer busy; shutting down master daemon" - " in %s seconds", remaining) - - # TODO: Should the master daemon socket be closed at this point? Doing so - # wouldn't affect existing connections. 
- - if remaining < 0: - return None - else: - return remaining - - -class ClientOps(object): - """Class holding high-level client operations.""" - def __init__(self, server): - self.server = server - - @staticmethod - def _PickupJob(args, queue): - logging.info("Picking up new job from queue") - (job_id, ) = args - queue.PickupJob(job_id) - return job_id - - @staticmethod - def _ChangeJobPriority(args, queue): - (job_id, priority) = args - logging.info("Received request to change priority for job %s to %s", - job_id, priority) - return queue.ChangeJobPriority(job_id, priority) - - def handle_request(self, method, args): # pylint: disable=R0911 - context = self.server.context - queue = context.jobqueue - - # TODO: Parameter validation - if not isinstance(args, (tuple, list)): - logging.info("Received invalid arguments of type '%s'", type(args)) - raise ValueError("Invalid arguments type '%s'" % type(args)) - - if method not in luxi.REQ_ALL: - logging.info("Received invalid request '%s'", method) - raise ValueError("Invalid operation '%s'" % method) - - job_id = None - if method == luxi.REQ_PICKUP_JOB: - job_id = self._PickupJob(args, queue) - elif method == luxi.REQ_CHANGE_JOB_PRIORITY: - job_id = self._ChangeJobPriority(args, queue) - else: - logging.info("Request '%s' not supported by masterd", method) - raise ValueError("Unsupported operation '%s'" % method) - - return job_id - - class GanetiContext(object): """Context common to all ganeti threads. @@ -266,37 +130,3 @@ class GanetiContext(object): # Notify job queue self.jobqueue.RemoveNode(node.name) - - -def _SetWatcherPause(context, ec_id, until): - """Creates or removes the watcher pause file. 
- - @type context: L{GanetiContext} - @param context: Global Ganeti context - @type until: None or int - @param until: Unix timestamp saying until when the watcher shouldn't run - - """ - node_names = context.GetConfig(ec_id).GetNodeList() - - if until is None: - logging.info("Received request to no longer pause watcher") - else: - if not ht.TNumber(until): - raise TypeError("Duration must be numeric") - - if until < time.time(): - raise errors.GenericError("Unable to set pause end time in the past") - - logging.info("Received request to pause watcher until %s", until) - - result = context.rpc.call_set_watcher_pause(node_names, until) - - errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg) - for (node_name, nres) in result.items() - if nres.fail_msg and not nres.offline) - if errmsg: - raise errors.OpExecError("Watcher pause was set where possible, but failed" - " on the following node(s): %s" % errmsg) - - return until -- 2.2.0.rc0.207.ga3a616c
