LGTM, thanks. +1 for patches that are "Remove ..."; +1 for removing 897 lines while adding just 3.
On Tue, Jun 24, 2014 at 1:55 PM, 'Klaus Aehlig' via ganeti-devel < [email protected]> wrote: > As masterd does not exist any more, there is no need > the jobs as processes to support all the luxi operations. > So remove support for all that are not potentially needed, > i.e., all but picking up a job and changing its priority. > > Signed-off-by: Klaus Aehlig <[email protected]> > --- > lib/jqueue/__init__.py | 17 ----- > lib/server/masterd.py | 182 > +------------------------------------------------ > 2 files changed, 3 insertions(+), 196 deletions(-) > > diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py > index 370e957..b381f52 100644 > --- a/lib/jqueue/__init__.py > +++ b/lib/jqueue/__init__.py > @@ -1816,23 +1816,6 @@ class JobQueue(object): > self._queue_size = len(self._GetJobIDsUnlocked(sort=False)) > > @classmethod > - def SubmitJob(cls, ops): > - """Create and store a new job. > - > - """ > - return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops) > - > - @classmethod > - def SubmitJobToDrainedQueue(cls, ops): > - """Forcefully create and store a new job. > - > - Do so, even if the job queue is drained. > - > - """ > - return luxi.Client(address=pathutils.QUERY_SOCKET)\ > - .SubmitJobToDrainedQueue(ops) > - > - @classmethod > def SubmitManyJobs(cls, jobs): > """Create and store multiple jobs. 
> > diff --git a/lib/server/masterd.py b/lib/server/masterd.py > index ee1ca77..b056584 100644 > --- a/lib/server/masterd.py > +++ b/lib/server/masterd.py > @@ -40,8 +40,6 @@ import logging > from ganeti import config > from ganeti import constants > from ganeti import daemon > -from ganeti import mcpu > -from ganeti import opcodes > from ganeti import jqueue > from ganeti import luxi > import ganeti.rpc.errors as rpcerr > @@ -50,9 +48,6 @@ from ganeti import errors > from ganeti import workerpool > import ganeti.rpc.node as rpc > import ganeti.rpc.client as rpccl > -from ganeti import objects > -from ganeti import query > -from ganeti import runtime > from ganeti import ht > > > @@ -266,15 +261,6 @@ class ClientOps(object): > self.server = server > > @staticmethod > - def _SubmitJob(args, queue): > - logging.info("Receiving new job") > - (job_def, ) = args > - ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def] > - job_id = queue.SubmitJob(ops) > - _LogNewJob(True, job_id, ops) > - return job_id > - > - @staticmethod > def _PickupJob(args, queue): > logging.info("Picking up new job from queue") > (job_id, ) = args > @@ -282,132 +268,12 @@ class ClientOps(object): > return job_id > > @staticmethod > - def _SubmitJobToDrainedQueue(args, queue): > - logging.info("Forcefully receiving new job") > - (job_def, ) = args > - ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def] > - job_id = queue.SubmitJobToDrainedQueue(ops) > - _LogNewJob(True, job_id, ops) > - return job_id > - > - @staticmethod > - def _SubmitManyJobs(args, queue): > - logging.info("Receiving multiple jobs") > - (job_defs, ) = args > - jobs = [] > - for ops in job_defs: > - jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops]) > - job_ids = queue.SubmitManyJobs(jobs) > - for ((status, job_id), ops) in zip(job_ids, jobs): > - _LogNewJob(status, job_id, ops) > - return job_ids > - > - @staticmethod > - def _CancelJob(args, queue): > - (job_id, ) = args > - 
logging.info("Received job cancel request for %s", job_id) > - return queue.CancelJob(job_id) > - > - @staticmethod > def _ChangeJobPriority(args, queue): > (job_id, priority) = args > logging.info("Received request to change priority for job %s to %s", > job_id, priority) > return queue.ChangeJobPriority(job_id, priority) > > - @staticmethod > - def _ArchiveJob(args, queue): > - (job_id, ) = args > - logging.info("Received job archive request for %s", job_id) > - return queue.ArchiveJob(job_id) > - > - @staticmethod > - def _AutoArchiveJobs(args, queue): > - (age, timeout) = args > - logging.info("Received job autoarchive request for age %s, timeout > %s", > - age, timeout) > - return queue.AutoArchiveJobs(age, timeout) > - > - @staticmethod > - def _WaitForJobChange(args, queue): > - (job_id, fields, prev_job_info, prev_log_serial, timeout) = args > - logging.info("Received job poll request for %s", job_id) > - return queue.WaitForJobChanges(job_id, fields, prev_job_info, > - prev_log_serial, timeout) > - > - def _PerformQuery(self, args, queue): > - (what, fields, qfilter) = args > - > - if what in constants.QR_VIA_OP: > - result = self._Query(opcodes.OpQuery(what=what, fields=fields, > - qfilter=qfilter)) > - elif what == constants.QR_LOCK: > - raise errors.OpPrereqError("Lock queries cannot be asked to jobs") > - elif what == constants.QR_JOB: > - result = queue.QueryJobs(fields, qfilter) > - elif what in constants.QR_VIA_LUXI: > - luxi_client = runtime.GetClient() > - result = luxi_client.Query(what, fields, qfilter).ToDict() > - else: > - raise errors.OpPrereqError("Resource type '%s' unknown" % what, > - errors.ECODE_INVAL) > - > - return result > - > - @staticmethod > - def _QueryFields(args): > - (what, fields) = args > - req = objects.QueryFieldsRequest(what=what, fields=fields) > - > - try: > - fielddefs = query.ALL_FIELDS[req.what] > - except KeyError: > - raise errors.OpPrereqError("Resource type '%s' unknown" % req.what, > - errors.ECODE_INVAL) > - > 
- return query.QueryFields(fielddefs, req.fields) > - > - @staticmethod > - def _QueryJobs(args, queue): > - (job_ids, fields) = args > - if isinstance(job_ids, (tuple, list)) and job_ids: > - msg = utils.CommaJoin(job_ids) > - else: > - msg = str(job_ids) > - logging.info("Received job query request for %s", msg) > - return queue.OldStyleQueryJobs(job_ids, fields) > - > - def _QueryConfigValues(self, args): > - (fields, ) = args > - logging.info("Received config values query request for %s", fields) > - op = opcodes.OpClusterConfigQuery(output_fields=fields) > - return self._Query(op) > - > - def _QueryClusterInfo(self): > - logging.info("Received cluster info query request") > - op = opcodes.OpClusterQuery() > - return self._Query(op) > - > - def _QueryTags(self, args): > - (kind, name) = args > - logging.info("Received tags query request") > - op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False) > - return self._Query(op) > - > - @staticmethod > - def _SetDrainFlag(args, queue): > - (drain_flag, ) = args > - logging.info("Received queue drain flag change request to %s", > - drain_flag) > - return queue.SetDrainFlag(drain_flag) > - > - @staticmethod > - def _SetWatcherPause(args, context): > - (until, ) = args > - # FIXME! 
> - ec_id = None > - return _SetWatcherPause(context, ec_id, until) > - > def handle_request(self, method, args): # pylint: disable=R0911 > context = self.server.context > queue = context.jobqueue > @@ -422,58 +288,16 @@ class ClientOps(object): > raise ValueError("Invalid operation '%s'" % method) > > job_id = None > - if method == luxi.REQ_SUBMIT_JOB: > - job_id = self._SubmitJob(args, queue) > - elif method == luxi.REQ_PICKUP_JOB: > + if method == luxi.REQ_PICKUP_JOB: > job_id = self._PickupJob(args, queue) > - elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE: > - job_id = self._SubmitJobToDrainedQueue(args, queue) > - elif method == luxi.REQ_SUBMIT_MANY_JOBS: > - job_id = self._SubmitManyJobs(args, queue) > - elif method == luxi.REQ_CANCEL_JOB: > - job_id = self._CancelJob(args, queue) > elif method == luxi.REQ_CHANGE_JOB_PRIORITY: > job_id = self._ChangeJobPriority(args, queue) > - elif method == luxi.REQ_ARCHIVE_JOB: > - job_id = self._ArchiveJob(args, queue) > - elif method == luxi.REQ_AUTO_ARCHIVE_JOBS: > - job_id = self._AutoArchiveJobs(args, queue) > - elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE: > - job_id = self._WaitForJobChange(args, queue) > - elif method == luxi.REQ_QUERY: > - job_id = self._PerformQuery(args, queue) > - elif method == luxi.REQ_QUERY_FIELDS: > - job_id = self._QueryFields(args) > - elif method == luxi.REQ_QUERY_JOBS: > - job_id = self._QueryJobs(args, queue) > - elif method == luxi.REQ_QUERY_CONFIG_VALUES: > - job_id = self._QueryConfigValues(args) > - elif method == luxi.REQ_QUERY_CLUSTER_INFO: > - job_id = self._QueryClusterInfo() > - elif method == luxi.REQ_QUERY_TAGS: > - job_id = self._QueryTags(args) > - elif method == luxi.REQ_SET_DRAIN_FLAG: > - job_id = self._SetDrainFlag(args, queue) > - elif method == luxi.REQ_SET_WATCHER_PAUSE: > - job_id = self._SetWatcherPause(args, context) > else: > - logging.critical("Request '%s' in luxi.REQ_ALL, but not known", > method) > - raise errors.ProgrammerError("Operation '%s' in 
luxi.REQ_ALL," > - " but not implemented" % method) > + logging.info("Request '%s' not supported by masterd", method) > + raise ValueError("Unsupported operation '%s'" % method) > > return job_id > > - def _Query(self, op): > - """Runs the specified opcode and returns the result. > - > - """ > - # Queries don't have a job id > - proc = mcpu.Processor(self.server.context, None, enable_locks=False) > - > - # TODO: Executing an opcode using locks will acquire them in blocking > mode. > - # Consider using a timeout for retries. > - return proc.ExecOpCode(op, None) > - > > class GanetiContext(object): > """Context common to all ganeti threads. > -- > 2.0.0.526.g5318336 > >
