On May 23 15:21, 'Helga Velroyen' via ganeti-devel wrote:
> ... and by that fixing a TODO as well.
> 
> Signed-off-by: Helga Velroyen <[email protected]>
> ---
>  lib/server/masterd.py | 284 
> +++++++++++++++++++++++++++++---------------------
>  1 file changed, 166 insertions(+), 118 deletions(-)
> 
> diff --git a/lib/server/masterd.py b/lib/server/masterd.py
> index adfb914..ffd7deb 100644
> --- a/lib/server/masterd.py
> +++ b/lib/server/masterd.py
> @@ -266,6 +266,152 @@ class ClientOps(object):
>    def __init__(self, server):
>      self.server = server
>  
> +  @staticmethod
> +  def _SubmitJob(args, queue):
> +    logging.info("Receiving new job")
> +    (job_def, ) = args
> +    ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
> +    job_id = queue.SubmitJob(ops)
> +    _LogNewJob(True, job_id, ops)
> +    return job_id
> +
> +  @staticmethod
> +  def _PickupJob(args, queue):
> +    logging.info("Picking up new job from queue")
> +    (job_id, ) = args
> +    queue.PickupJob(job_id)
> +    return job_id
> +
> +  @staticmethod
> +  def _SubmitJobToDrainedQueue(args, queue):
> +    logging.info("Forcefully receiving new job")
> +    (job_def, ) = args
> +    ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
> +    job_id = queue.SubmitJobToDrainedQueue(ops)
> +    _LogNewJob(True, job_id, ops)
> +    return job_id
> +
> +  @staticmethod
> +  def _SubmitManyJobs(args, queue):
> +    logging.info("Receiving multiple jobs")
> +    (job_defs, ) = args
> +    jobs = []
> +    for ops in job_defs:
> +      jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
> +    job_ids = queue.SubmitManyJobs(jobs)
> +    for ((status, job_id), ops) in zip(job_ids, jobs):
> +      _LogNewJob(status, job_id, ops)
> +    return job_ids
> +
> +  @staticmethod
> +  def _CancelJob(args, queue):
> +    (job_id, ) = args
> +    logging.info("Received job cancel request for %s", job_id)
> +    return queue.CancelJob(job_id)
> +
> +  @staticmethod
> +  def _ChangeJobPriority(args, queue):
> +    (job_id, priority) = args
> +    logging.info("Received request to change priority for job %s to %s",
> +                 job_id, priority)
> +    return queue.ChangeJobPriority(job_id, priority)
> +
> +  @staticmethod
> +  def _ArchiveJob(args, queue):
> +    (job_id, ) = args
> +    logging.info("Received job archive request for %s", job_id)
> +    return queue.ArchiveJob(job_id)
> +
> +  @staticmethod
> +  def _AutoArchiveJobs(args, queue):
> +    (age, timeout) = args
> +    logging.info("Received job autoarchive request for age %s, timeout %s",
> +                 age, timeout)
> +    return queue.AutoArchiveJobs(age, timeout)
> +
> +  @staticmethod
> +  def _WaitForJobChange(args, queue):
> +    (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
> +    logging.info("Received job poll request for %s", job_id)
> +    return queue.WaitForJobChanges(job_id, fields, prev_job_info,
> +                                   prev_log_serial, timeout)
> +
> +  def _PerformQuery(self, args, queue):
> +    (what, fields, qfilter) = args
> +
> +    if what in constants.QR_VIA_OP:
> +      result = self._Query(opcodes.OpQuery(what=what, fields=fields,
> +                                           qfilter=qfilter))
> +    elif what == constants.QR_LOCK:
> +      if qfilter is not None:
> +        raise errors.OpPrereqError("Lock queries can't be filtered",
> +                                   errors.ECODE_INVAL)
> +      return self.server.context.glm.QueryLocks(fields)

Instead of return:

  result = ...

> +    elif what == constants.QR_JOB:
> +      return queue.QueryJobs(fields, qfilter)

Instead of return:

  result = ...

Rest LGTM. No need to resend.

Thanks,
Jose

> +    elif what in constants.QR_VIA_LUXI:
> +      luxi_client = runtime.GetClient()
> +      result = luxi_client.Query(what, fields, qfilter).ToDict()
> +    else:
> +      raise errors.OpPrereqError("Resource type '%s' unknown" % what,
> +                                 errors.ECODE_INVAL)
> +
> +    return result
> +
> +  @staticmethod
> +  def _QueryFields(args):
> +    (what, fields) = args
> +    req = objects.QueryFieldsRequest(what=what, fields=fields)
> +
> +    try:
> +      fielddefs = query.ALL_FIELDS[req.what]
> +    except KeyError:
> +      raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
> +                                 errors.ECODE_INVAL)
> +
> +    return query.QueryFields(fielddefs, req.fields)
> +
> +  @staticmethod
> +  def _QueryJobs(args, queue):
> +    (job_ids, fields) = args
> +    if isinstance(job_ids, (tuple, list)) and job_ids:
> +      msg = utils.CommaJoin(job_ids)
> +    else:
> +      msg = str(job_ids)
> +    logging.info("Received job query request for %s", msg)
> +    return queue.OldStyleQueryJobs(job_ids, fields)
> +
> +  def _QueryConfigValues(self, args):
> +    (fields, ) = args
> +    logging.info("Received config values query request for %s", fields)
> +    op = opcodes.OpClusterConfigQuery(output_fields=fields)
> +    return self._Query(op)
> +
> +  def _QueryClusterInfo(self):
> +    logging.info("Received cluster info query request")
> +    op = opcodes.OpClusterQuery()
> +    return self._Query(op)
> +
> +  def _QueryTags(self, args):
> +    (kind, name) = args
> +    logging.info("Received tags query request")
> +    op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
> +    return self._Query(op)
> +
> +  @staticmethod
> +  def _SetDrainFlag(args, queue):
> +    (drain_flag, ) = args
> +    logging.info("Received queue drain flag change request to %s",
> +                 drain_flag)
> +    return queue.SetDrainFlag(drain_flag)
> +
> +  @staticmethod
> +  def _SetWatcherPause(args, context):
> +    (until, ) = args
> +    # FIXME!
> +    ec_id = None
> +    return _SetWatcherPause(context, ec_id, until)
> +
>    def handle_request(self, method, args): # pylint: disable=R0911
>      context = self.server.context
>      queue = context.jobqueue
> @@ -279,146 +425,48 @@ class ClientOps(object):
>        logging.info("Received invalid request '%s'", method)
>        raise ValueError("Invalid operation '%s'" % method)
>  
> -    # TODO: Rewrite to not exit in each 'if/elif' branch
> -
> +    job_id = None
>      if method == luxi.REQ_SUBMIT_JOB:
> -      logging.info("Receiving new job")
> -      (job_def, ) = args
> -      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
> -      job_id = queue.SubmitJob(ops)
> -      _LogNewJob(True, job_id, ops)
> -      return job_id
> -
> +      job_id = self._SubmitJob(args, queue)
>      elif method == luxi.REQ_PICKUP_JOB:
> -      logging.info("Picking up new job from queue")
> -      (job_id, ) = args
> -      queue.PickupJob(job_id)
> -
> +      job_id = self._PickupJob(args, queue)
>      elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
> -      logging.info("Forcefully receiving new job")
> -      (job_def, ) = args
> -      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
> -      job_id = queue.SubmitJobToDrainedQueue(ops)
> -      _LogNewJob(True, job_id, ops)
> -      return job_id
> -
> +      job_id = self._SubmitJobToDrainedQueue(args, queue)
>      elif method == luxi.REQ_SUBMIT_MANY_JOBS:
> -      logging.info("Receiving multiple jobs")
> -      (job_defs, ) = args
> -      jobs = []
> -      for ops in job_defs:
> -        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
> -      job_ids = queue.SubmitManyJobs(jobs)
> -      for ((status, job_id), ops) in zip(job_ids, jobs):
> -        _LogNewJob(status, job_id, ops)
> -      return job_ids
> -
> +      job_id = self._SubmitManyJobs(args, queue)
>      elif method == luxi.REQ_CANCEL_JOB:
> -      (job_id, ) = args
> -      logging.info("Received job cancel request for %s", job_id)
> -      return queue.CancelJob(job_id)
> -
> +      job_id = self._CancelJob(args, queue)
>      elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
> -      (job_id, priority) = args
> -      logging.info("Received request to change priority for job %s to %s",
> -                   job_id, priority)
> -      return queue.ChangeJobPriority(job_id, priority)
> -
> +      job_id = self._ChangeJobPriority(args, queue)
>      elif method == luxi.REQ_ARCHIVE_JOB:
> -      (job_id, ) = args
> -      logging.info("Received job archive request for %s", job_id)
> -      return queue.ArchiveJob(job_id)
> -
> +      job_id = self._ArchiveJob(args, queue)
>      elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
> -      (age, timeout) = args
> -      logging.info("Received job autoarchive request for age %s, timeout %s",
> -                   age, timeout)
> -      return queue.AutoArchiveJobs(age, timeout)
> -
> +      job_id = self._AutoArchiveJobs(args, queue)
>      elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
> -      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
> -      logging.info("Received job poll request for %s", job_id)
> -      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
> -                                     prev_log_serial, timeout)
> -
> +      job_id = self._WaitForJobChange(args, queue)
>      elif method == luxi.REQ_QUERY:
> -      (what, fields, qfilter) = args
> -
> -      if what in constants.QR_VIA_OP:
> -        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
> -                                             qfilter=qfilter))
> -      elif what == constants.QR_LOCK:
> -        if qfilter is not None:
> -          raise errors.OpPrereqError("Lock queries can't be filtered",
> -                                     errors.ECODE_INVAL)
> -        return context.glm.QueryLocks(fields)
> -      elif what == constants.QR_JOB:
> -        return queue.QueryJobs(fields, qfilter)
> -      elif what in constants.QR_VIA_LUXI:
> -        luxi_client = runtime.GetClient()
> -        result = luxi_client.Query(what, fields, qfilter).ToDict()
> -      else:
> -        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
> -                                   errors.ECODE_INVAL)
> -
> -      return result
> -
> +      job_id = self._PerformQuery(args, queue)
>      elif method == luxi.REQ_QUERY_FIELDS:
> -      (what, fields) = args
> -      req = objects.QueryFieldsRequest(what=what, fields=fields)
> -
> -      try:
> -        fielddefs = query.ALL_FIELDS[req.what]
> -      except KeyError:
> -        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
> -                                   errors.ECODE_INVAL)
> -
> -      return query.QueryFields(fielddefs, req.fields)
> -
> +      job_id = self._QueryFields(args)
>      elif method == luxi.REQ_QUERY_JOBS:
> -      (job_ids, fields) = args
> -      if isinstance(job_ids, (tuple, list)) and job_ids:
> -        msg = utils.CommaJoin(job_ids)
> -      else:
> -        msg = str(job_ids)
> -      logging.info("Received job query request for %s", msg)
> -      return queue.OldStyleQueryJobs(job_ids, fields)
> -
> +      job_id = self._QueryJobs(args, queue)
>      elif method == luxi.REQ_QUERY_CONFIG_VALUES:
> -      (fields, ) = args
> -      logging.info("Received config values query request for %s", fields)
> -      op = opcodes.OpClusterConfigQuery(output_fields=fields)
> -      return self._Query(op)
> -
> +      job_id = self._QueryConfigValues(args)
>      elif method == luxi.REQ_QUERY_CLUSTER_INFO:
> -      logging.info("Received cluster info query request")
> -      op = opcodes.OpClusterQuery()
> -      return self._Query(op)
> -
> +      job_id = self._QueryClusterInfo()
>      elif method == luxi.REQ_QUERY_TAGS:
> -      (kind, name) = args
> -      logging.info("Received tags query request")
> -      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
> -      return self._Query(op)
> -
> +      job_id = self._QueryTags(args)
>      elif method == luxi.REQ_SET_DRAIN_FLAG:
> -      (drain_flag, ) = args
> -      logging.info("Received queue drain flag change request to %s",
> -                   drain_flag)
> -      return queue.SetDrainFlag(drain_flag)
> -
> +      job_id = self._SetDrainFlag(args, queue)
>      elif method == luxi.REQ_SET_WATCHER_PAUSE:
> -      (until, ) = args
> -
> -      # FIXME!
> -      ec_id = None
> -      return _SetWatcherPause(context, ec_id, until)
> -
> +      job_id = self._SetWatcherPause(args, context)
>      else:
>        logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
>        raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
>                                     " but not implemented" % method)
>  
> +    return job_id
> +
>    def _Query(self, op):
>      """Runs the specified opcode and returns the result.
>  
> -- 
> 1.9.1.423.g4596e3a
> 

-- 
Jose Antonio Lopes
Ganeti Engineering
Google Germany GmbH
Dienerstr. 12, 80331, München

Registergericht und -nummer: Hamburg, HRB 86891
Sitz der Gesellschaft: Hamburg
Geschäftsführer: Graham Law, Christine Elizabeth Flores
Steuernummer: 48/725/00206
Umsatzsteueridentifikationsnummer: DE813741370

Reply via email to