As masterd does not exist any more, there is no need for the jobs running as processes to support all the luxi operations. So remove support for all operations that are not potentially needed, i.e., all but picking up a job and changing its priority.
Signed-off-by: Klaus Aehlig <[email protected]> --- lib/jqueue/__init__.py | 17 ----- lib/server/masterd.py | 182 +------------------------------------------------ 2 files changed, 3 insertions(+), 196 deletions(-) diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py index 370e957..b381f52 100644 --- a/lib/jqueue/__init__.py +++ b/lib/jqueue/__init__.py @@ -1816,23 +1816,6 @@ class JobQueue(object): self._queue_size = len(self._GetJobIDsUnlocked(sort=False)) @classmethod - def SubmitJob(cls, ops): - """Create and store a new job. - - """ - return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops) - - @classmethod - def SubmitJobToDrainedQueue(cls, ops): - """Forcefully create and store a new job. - - Do so, even if the job queue is drained. - - """ - return luxi.Client(address=pathutils.QUERY_SOCKET)\ - .SubmitJobToDrainedQueue(ops) - - @classmethod def SubmitManyJobs(cls, jobs): """Create and store multiple jobs. diff --git a/lib/server/masterd.py b/lib/server/masterd.py index ee1ca77..b056584 100644 --- a/lib/server/masterd.py +++ b/lib/server/masterd.py @@ -40,8 +40,6 @@ import logging from ganeti import config from ganeti import constants from ganeti import daemon -from ganeti import mcpu -from ganeti import opcodes from ganeti import jqueue from ganeti import luxi import ganeti.rpc.errors as rpcerr @@ -50,9 +48,6 @@ from ganeti import errors from ganeti import workerpool import ganeti.rpc.node as rpc import ganeti.rpc.client as rpccl -from ganeti import objects -from ganeti import query -from ganeti import runtime from ganeti import ht @@ -266,15 +261,6 @@ class ClientOps(object): self.server = server @staticmethod - def _SubmitJob(args, queue): - logging.info("Receiving new job") - (job_def, ) = args - ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def] - job_id = queue.SubmitJob(ops) - _LogNewJob(True, job_id, ops) - return job_id - - @staticmethod def _PickupJob(args, queue): logging.info("Picking up new job from queue") (job_id, ) 
= args @@ -282,132 +268,12 @@ class ClientOps(object): return job_id @staticmethod - def _SubmitJobToDrainedQueue(args, queue): - logging.info("Forcefully receiving new job") - (job_def, ) = args - ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def] - job_id = queue.SubmitJobToDrainedQueue(ops) - _LogNewJob(True, job_id, ops) - return job_id - - @staticmethod - def _SubmitManyJobs(args, queue): - logging.info("Receiving multiple jobs") - (job_defs, ) = args - jobs = [] - for ops in job_defs: - jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops]) - job_ids = queue.SubmitManyJobs(jobs) - for ((status, job_id), ops) in zip(job_ids, jobs): - _LogNewJob(status, job_id, ops) - return job_ids - - @staticmethod - def _CancelJob(args, queue): - (job_id, ) = args - logging.info("Received job cancel request for %s", job_id) - return queue.CancelJob(job_id) - - @staticmethod def _ChangeJobPriority(args, queue): (job_id, priority) = args logging.info("Received request to change priority for job %s to %s", job_id, priority) return queue.ChangeJobPriority(job_id, priority) - @staticmethod - def _ArchiveJob(args, queue): - (job_id, ) = args - logging.info("Received job archive request for %s", job_id) - return queue.ArchiveJob(job_id) - - @staticmethod - def _AutoArchiveJobs(args, queue): - (age, timeout) = args - logging.info("Received job autoarchive request for age %s, timeout %s", - age, timeout) - return queue.AutoArchiveJobs(age, timeout) - - @staticmethod - def _WaitForJobChange(args, queue): - (job_id, fields, prev_job_info, prev_log_serial, timeout) = args - logging.info("Received job poll request for %s", job_id) - return queue.WaitForJobChanges(job_id, fields, prev_job_info, - prev_log_serial, timeout) - - def _PerformQuery(self, args, queue): - (what, fields, qfilter) = args - - if what in constants.QR_VIA_OP: - result = self._Query(opcodes.OpQuery(what=what, fields=fields, - qfilter=qfilter)) - elif what == constants.QR_LOCK: - raise 
errors.OpPrereqError("Lock queries cannot be asked to jobs") - elif what == constants.QR_JOB: - result = queue.QueryJobs(fields, qfilter) - elif what in constants.QR_VIA_LUXI: - luxi_client = runtime.GetClient() - result = luxi_client.Query(what, fields, qfilter).ToDict() - else: - raise errors.OpPrereqError("Resource type '%s' unknown" % what, - errors.ECODE_INVAL) - - return result - - @staticmethod - def _QueryFields(args): - (what, fields) = args - req = objects.QueryFieldsRequest(what=what, fields=fields) - - try: - fielddefs = query.ALL_FIELDS[req.what] - except KeyError: - raise errors.OpPrereqError("Resource type '%s' unknown" % req.what, - errors.ECODE_INVAL) - - return query.QueryFields(fielddefs, req.fields) - - @staticmethod - def _QueryJobs(args, queue): - (job_ids, fields) = args - if isinstance(job_ids, (tuple, list)) and job_ids: - msg = utils.CommaJoin(job_ids) - else: - msg = str(job_ids) - logging.info("Received job query request for %s", msg) - return queue.OldStyleQueryJobs(job_ids, fields) - - def _QueryConfigValues(self, args): - (fields, ) = args - logging.info("Received config values query request for %s", fields) - op = opcodes.OpClusterConfigQuery(output_fields=fields) - return self._Query(op) - - def _QueryClusterInfo(self): - logging.info("Received cluster info query request") - op = opcodes.OpClusterQuery() - return self._Query(op) - - def _QueryTags(self, args): - (kind, name) = args - logging.info("Received tags query request") - op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False) - return self._Query(op) - - @staticmethod - def _SetDrainFlag(args, queue): - (drain_flag, ) = args - logging.info("Received queue drain flag change request to %s", - drain_flag) - return queue.SetDrainFlag(drain_flag) - - @staticmethod - def _SetWatcherPause(args, context): - (until, ) = args - # FIXME! 
- ec_id = None - return _SetWatcherPause(context, ec_id, until) - def handle_request(self, method, args): # pylint: disable=R0911 context = self.server.context queue = context.jobqueue @@ -422,58 +288,16 @@ class ClientOps(object): raise ValueError("Invalid operation '%s'" % method) job_id = None - if method == luxi.REQ_SUBMIT_JOB: - job_id = self._SubmitJob(args, queue) - elif method == luxi.REQ_PICKUP_JOB: + if method == luxi.REQ_PICKUP_JOB: job_id = self._PickupJob(args, queue) - elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE: - job_id = self._SubmitJobToDrainedQueue(args, queue) - elif method == luxi.REQ_SUBMIT_MANY_JOBS: - job_id = self._SubmitManyJobs(args, queue) - elif method == luxi.REQ_CANCEL_JOB: - job_id = self._CancelJob(args, queue) elif method == luxi.REQ_CHANGE_JOB_PRIORITY: job_id = self._ChangeJobPriority(args, queue) - elif method == luxi.REQ_ARCHIVE_JOB: - job_id = self._ArchiveJob(args, queue) - elif method == luxi.REQ_AUTO_ARCHIVE_JOBS: - job_id = self._AutoArchiveJobs(args, queue) - elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE: - job_id = self._WaitForJobChange(args, queue) - elif method == luxi.REQ_QUERY: - job_id = self._PerformQuery(args, queue) - elif method == luxi.REQ_QUERY_FIELDS: - job_id = self._QueryFields(args) - elif method == luxi.REQ_QUERY_JOBS: - job_id = self._QueryJobs(args, queue) - elif method == luxi.REQ_QUERY_CONFIG_VALUES: - job_id = self._QueryConfigValues(args) - elif method == luxi.REQ_QUERY_CLUSTER_INFO: - job_id = self._QueryClusterInfo() - elif method == luxi.REQ_QUERY_TAGS: - job_id = self._QueryTags(args) - elif method == luxi.REQ_SET_DRAIN_FLAG: - job_id = self._SetDrainFlag(args, queue) - elif method == luxi.REQ_SET_WATCHER_PAUSE: - job_id = self._SetWatcherPause(args, context) else: - logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method) - raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL," - " but not implemented" % method) + logging.info("Request '%s' not supported by 
masterd", method) + raise ValueError("Unsupported operation '%s'" % method) return job_id - def _Query(self, op): - """Runs the specified opcode and returns the result. - - """ - # Queries don't have a job id - proc = mcpu.Processor(self.server.context, None, enable_locks=False) - - # TODO: Executing an opcode using locks will acquire them in blocking mode. - # Consider using a timeout for retries. - return proc.ExecOpCode(op, None) - class GanetiContext(object): """Context common to all ganeti threads. -- 2.0.0.526.g5318336
