... and thereby resolving a TODO as well.

Signed-off-by: Helga Velroyen <[email protected]>
---
 lib/server/masterd.py | 284 +++++++++++++++++++++++++++++---------------------
 1 file changed, 166 insertions(+), 118 deletions(-)

diff --git a/lib/server/masterd.py b/lib/server/masterd.py
index adfb914..ffd7deb 100644
--- a/lib/server/masterd.py
+++ b/lib/server/masterd.py
@@ -266,6 +266,152 @@ class ClientOps(object):
   def __init__(self, server):
     self.server = server
 
+  @staticmethod
+  def _SubmitJob(args, queue):  # serves luxi.REQ_SUBMIT_JOB
+    logging.info("Receiving new job")
+    (job_def, ) = args  # exactly one argument: iterable of opcode states
+    ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
+    job_id = queue.SubmitJob(ops)
+    _LogNewJob(True, job_id, ops)
+    return job_id
+
+  @staticmethod
+  def _PickupJob(args, queue):  # serves luxi.REQ_PICKUP_JOB
+    logging.info("Picking up new job from queue")
+    (job_id, ) = args  # exactly one argument expected
+    queue.PickupJob(job_id)
+    return job_id  # NOTE(review): the replaced inline branch had no return
+
+  @staticmethod
+  def _SubmitJobToDrainedQueue(args, queue):  # REQ_SUBMIT_JOB_TO_DRAINED_QUEUE
+    logging.info("Forcefully receiving new job")
+    (job_def, ) = args  # exactly one argument: iterable of opcode states
+    ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
+    job_id = queue.SubmitJobToDrainedQueue(ops)
+    _LogNewJob(True, job_id, ops)
+    return job_id
+
+  @staticmethod
+  def _SubmitManyJobs(args, queue):
+    """Submits several jobs at once and reports each result to _LogNewJob."""
+    logging.info("Receiving multiple jobs")
+    (job_defs, ) = args
+    jobs = [[opcodes.OpCode.LoadOpCode(state) for state in ops]
+            for ops in job_defs]
+    job_ids = queue.SubmitManyJobs(jobs)
+    for ((status, job_id), ops) in zip(job_ids, jobs):
+      _LogNewJob(status, job_id, ops)
+    return job_ids
+
+  @staticmethod
+  def _CancelJob(args, queue):  # serves luxi.REQ_CANCEL_JOB
+    (job_id, ) = args  # exactly one argument expected
+    logging.info("Received job cancel request for %s", job_id)
+    return queue.CancelJob(job_id)  # queue's answer is returned unchanged
+
+  @staticmethod
+  def _ChangeJobPriority(args, queue):  # serves luxi.REQ_CHANGE_JOB_PRIORITY
+    (job_id, priority) = args  # exactly two arguments expected
+    logging.info("Received request to change priority for job %s to %s",
+                 job_id, priority)
+    return queue.ChangeJobPriority(job_id, priority)
+
+  @staticmethod
+  def _ArchiveJob(args, queue):  # serves luxi.REQ_ARCHIVE_JOB
+    (job_id, ) = args  # exactly one argument expected
+    logging.info("Received job archive request for %s", job_id)
+    return queue.ArchiveJob(job_id)  # queue's answer is returned unchanged
+
+  @staticmethod
+  def _AutoArchiveJobs(args, queue):  # serves luxi.REQ_AUTO_ARCHIVE_JOBS
+    (age, timeout) = args  # exactly two arguments expected
+    logging.info("Received job autoarchive request for age %s, timeout %s",
+                 age, timeout)
+    return queue.AutoArchiveJobs(age, timeout)
+
+  @staticmethod
+  def _WaitForJobChange(args, queue):  # serves luxi.REQ_WAIT_FOR_JOB_CHANGE
+    (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
+    logging.info("Received job poll request for %s", job_id)
+    return queue.WaitForJobChanges(job_id, fields, prev_job_info,
+                                   prev_log_serial, timeout)
+
+  def _PerformQuery(self, args, queue):
+    """Answers luxi.REQ_QUERY by routing to the backend for C{what}."""
+    (what, fields, qfilter) = args
+    if what in constants.QR_VIA_OP:
+      result = self._Query(opcodes.OpQuery(what=what, fields=fields,
+                                           qfilter=qfilter))
+    elif what == constants.QR_LOCK:
+      if qfilter is not None:
+        raise errors.OpPrereqError("Lock queries can't be filtered",
+                                   errors.ECODE_INVAL)
+      result = self.server.context.glm.QueryLocks(fields)
+    elif what == constants.QR_JOB:
+      result = queue.QueryJobs(fields, qfilter)
+    elif what in constants.QR_VIA_LUXI:
+      luxi_client = runtime.GetClient()
+      result = luxi_client.Query(what, fields, qfilter).ToDict()
+    else:
+      raise errors.OpPrereqError("Resource type '%s' unknown" % what,
+                                 errors.ECODE_INVAL)
+
+    return result
+
+  @staticmethod
+  def _QueryFields(args):  # serves luxi.REQ_QUERY_FIELDS
+    (what, fields) = args  # exactly two arguments expected
+    req = objects.QueryFieldsRequest(what=what, fields=fields)
+
+    try:
+      fielddefs = query.ALL_FIELDS[req.what]
+    except KeyError:  # req.what is not a key of query.ALL_FIELDS
+      raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
+                                 errors.ECODE_INVAL)
+
+    return query.QueryFields(fielddefs, req.fields)
+
+  @staticmethod
+  def _QueryJobs(args, queue):  # serves luxi.REQ_QUERY_JOBS
+    (job_ids, fields) = args
+    if isinstance(job_ids, (tuple, list)) and job_ids:
+      msg = utils.CommaJoin(job_ids)
+    else:  # empty sequence or any other value: log its plain str() form
+      msg = str(job_ids)
+    logging.info("Received job query request for %s", msg)
+    return queue.OldStyleQueryJobs(job_ids, fields)
+
+  def _QueryConfigValues(self, args):  # serves luxi.REQ_QUERY_CONFIG_VALUES
+    (fields, ) = args  # exactly one argument expected
+    logging.info("Received config values query request for %s", fields)
+    op = opcodes.OpClusterConfigQuery(output_fields=fields)
+    return self._Query(op)  # result of running the opcode via self._Query
+
+  def _QueryClusterInfo(self):  # serves luxi.REQ_QUERY_CLUSTER_INFO; no args
+    logging.info("Received cluster info query request")
+    op = opcodes.OpClusterQuery()
+    return self._Query(op)  # result of running the opcode via self._Query
+
+  def _QueryTags(self, args):  # serves luxi.REQ_QUERY_TAGS
+    (kind, name) = args  # exactly two arguments expected
+    logging.info("Received tags query request")
+    op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
+    return self._Query(op)  # result of running the opcode via self._Query
+
+  @staticmethod
+  def _SetDrainFlag(args, queue):  # serves luxi.REQ_SET_DRAIN_FLAG
+    (drain_flag, ) = args  # exactly one argument expected
+    logging.info("Received queue drain flag change request to %s",
+                 drain_flag)
+    return queue.SetDrainFlag(drain_flag)
+
+  @staticmethod
+  def _SetWatcherPause(args, context):  # serves luxi.REQ_SET_WATCHER_PAUSE
+    (until, ) = args  # exactly one argument expected
+    # FIXME!
+    ec_id = None
+    return _SetWatcherPause(context, ec_id, until)  # module-level function
+
   def handle_request(self, method, args): # pylint: disable=R0911
     context = self.server.context
     queue = context.jobqueue
@@ -279,146 +425,48 @@ class ClientOps(object):
       logging.info("Received invalid request '%s'", method)
       raise ValueError("Invalid operation '%s'" % method)
 
-    # TODO: Rewrite to not exit in each 'if/elif' branch
-
+    job_id = None
     if method == luxi.REQ_SUBMIT_JOB:
-      logging.info("Receiving new job")
-      (job_def, ) = args
-      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
-      job_id = queue.SubmitJob(ops)
-      _LogNewJob(True, job_id, ops)
-      return job_id
-
+      job_id = self._SubmitJob(args, queue)
     elif method == luxi.REQ_PICKUP_JOB:
-      logging.info("Picking up new job from queue")
-      (job_id, ) = args
-      queue.PickupJob(job_id)
-
+      job_id = self._PickupJob(args, queue)
     elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
-      logging.info("Forcefully receiving new job")
-      (job_def, ) = args
-      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
-      job_id = queue.SubmitJobToDrainedQueue(ops)
-      _LogNewJob(True, job_id, ops)
-      return job_id
-
+      job_id = self._SubmitJobToDrainedQueue(args, queue)
     elif method == luxi.REQ_SUBMIT_MANY_JOBS:
-      logging.info("Receiving multiple jobs")
-      (job_defs, ) = args
-      jobs = []
-      for ops in job_defs:
-        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
-      job_ids = queue.SubmitManyJobs(jobs)
-      for ((status, job_id), ops) in zip(job_ids, jobs):
-        _LogNewJob(status, job_id, ops)
-      return job_ids
-
+      job_id = self._SubmitManyJobs(args, queue)
     elif method == luxi.REQ_CANCEL_JOB:
-      (job_id, ) = args
-      logging.info("Received job cancel request for %s", job_id)
-      return queue.CancelJob(job_id)
-
+      job_id = self._CancelJob(args, queue)
     elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
-      (job_id, priority) = args
-      logging.info("Received request to change priority for job %s to %s",
-                   job_id, priority)
-      return queue.ChangeJobPriority(job_id, priority)
-
+      job_id = self._ChangeJobPriority(args, queue)
     elif method == luxi.REQ_ARCHIVE_JOB:
-      (job_id, ) = args
-      logging.info("Received job archive request for %s", job_id)
-      return queue.ArchiveJob(job_id)
-
+      job_id = self._ArchiveJob(args, queue)
     elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
-      (age, timeout) = args
-      logging.info("Received job autoarchive request for age %s, timeout %s",
-                   age, timeout)
-      return queue.AutoArchiveJobs(age, timeout)
-
+      job_id = self._AutoArchiveJobs(args, queue)
     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
-      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
-      logging.info("Received job poll request for %s", job_id)
-      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
-                                     prev_log_serial, timeout)
-
+      job_id = self._WaitForJobChange(args, queue)
     elif method == luxi.REQ_QUERY:
-      (what, fields, qfilter) = args
-
-      if what in constants.QR_VIA_OP:
-        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
-                                             qfilter=qfilter))
-      elif what == constants.QR_LOCK:
-        if qfilter is not None:
-          raise errors.OpPrereqError("Lock queries can't be filtered",
-                                     errors.ECODE_INVAL)
-        return context.glm.QueryLocks(fields)
-      elif what == constants.QR_JOB:
-        return queue.QueryJobs(fields, qfilter)
-      elif what in constants.QR_VIA_LUXI:
-        luxi_client = runtime.GetClient()
-        result = luxi_client.Query(what, fields, qfilter).ToDict()
-      else:
-        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
-                                   errors.ECODE_INVAL)
-
-      return result
-
+      job_id = self._PerformQuery(args, queue)
     elif method == luxi.REQ_QUERY_FIELDS:
-      (what, fields) = args
-      req = objects.QueryFieldsRequest(what=what, fields=fields)
-
-      try:
-        fielddefs = query.ALL_FIELDS[req.what]
-      except KeyError:
-        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
-                                   errors.ECODE_INVAL)
-
-      return query.QueryFields(fielddefs, req.fields)
-
+      job_id = self._QueryFields(args)
     elif method == luxi.REQ_QUERY_JOBS:
-      (job_ids, fields) = args
-      if isinstance(job_ids, (tuple, list)) and job_ids:
-        msg = utils.CommaJoin(job_ids)
-      else:
-        msg = str(job_ids)
-      logging.info("Received job query request for %s", msg)
-      return queue.OldStyleQueryJobs(job_ids, fields)
-
+      job_id = self._QueryJobs(args, queue)
     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
-      (fields, ) = args
-      logging.info("Received config values query request for %s", fields)
-      op = opcodes.OpClusterConfigQuery(output_fields=fields)
-      return self._Query(op)
-
+      job_id = self._QueryConfigValues(args)
     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
-      logging.info("Received cluster info query request")
-      op = opcodes.OpClusterQuery()
-      return self._Query(op)
-
+      job_id = self._QueryClusterInfo()
     elif method == luxi.REQ_QUERY_TAGS:
-      (kind, name) = args
-      logging.info("Received tags query request")
-      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
-      return self._Query(op)
-
+      job_id = self._QueryTags(args)
     elif method == luxi.REQ_SET_DRAIN_FLAG:
-      (drain_flag, ) = args
-      logging.info("Received queue drain flag change request to %s",
-                   drain_flag)
-      return queue.SetDrainFlag(drain_flag)
-
+      job_id = self._SetDrainFlag(args, queue)
     elif method == luxi.REQ_SET_WATCHER_PAUSE:
-      (until, ) = args
-
-      # FIXME!
-      ec_id = None
-      return _SetWatcherPause(context, ec_id, until)
-
+      job_id = self._SetWatcherPause(args, context)
     else:
       logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
       raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
                                    " but not implemented" % method)
 
+    return job_id
+
   def _Query(self, op):
     """Runs the specified opcode and returns the result.
 
-- 
1.9.1.423.g4596e3a

Reply via email to