LGTM, thanks

+1 for patches that are "Remove ..."
+1 removing 897 lines while adding just 3


On Tue, Jun 24, 2014 at 1:55 PM, 'Klaus Aehlig' via ganeti-devel <
[email protected]> wrote:

> As masterd does not exist any more, there is no need
> for the program executing the jobs as processes to support
> all the luxi operations. So remove support for all that are
> not potentially needed, i.e., all but picking up a job and
> changing its priority.
>
> Signed-off-by: Klaus Aehlig <[email protected]>
> ---
>  lib/jqueue/__init__.py |  17 -----
>  lib/server/masterd.py  | 182
> +------------------------------------------------
>  2 files changed, 3 insertions(+), 196 deletions(-)
>
> diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
> index 370e957..b381f52 100644
> --- a/lib/jqueue/__init__.py
> +++ b/lib/jqueue/__init__.py
> @@ -1816,23 +1816,6 @@ class JobQueue(object):
>      self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
>
>    @classmethod
> -  def SubmitJob(cls, ops):
> -    """Create and store a new job.
> -
> -    """
> -    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops)
> -
> -  @classmethod
> -  def SubmitJobToDrainedQueue(cls, ops):
> -    """Forcefully create and store a new job.
> -
> -    Do so, even if the job queue is drained.
> -
> -    """
> -    return luxi.Client(address=pathutils.QUERY_SOCKET)\
> -        .SubmitJobToDrainedQueue(ops)
> -
> -  @classmethod
>    def SubmitManyJobs(cls, jobs):
>      """Create and store multiple jobs.
>
> diff --git a/lib/server/masterd.py b/lib/server/masterd.py
> index ee1ca77..b056584 100644
> --- a/lib/server/masterd.py
> +++ b/lib/server/masterd.py
> @@ -40,8 +40,6 @@ import logging
>  from ganeti import config
>  from ganeti import constants
>  from ganeti import daemon
> -from ganeti import mcpu
> -from ganeti import opcodes
>  from ganeti import jqueue
>  from ganeti import luxi
>  import ganeti.rpc.errors as rpcerr
> @@ -50,9 +48,6 @@ from ganeti import errors
>  from ganeti import workerpool
>  import ganeti.rpc.node as rpc
>  import ganeti.rpc.client as rpccl
> -from ganeti import objects
> -from ganeti import query
> -from ganeti import runtime
>  from ganeti import ht
>
>
> @@ -266,15 +261,6 @@ class ClientOps(object):
>      self.server = server
>
>    @staticmethod
> -  def _SubmitJob(args, queue):
> -    logging.info("Receiving new job")
> -    (job_def, ) = args
> -    ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
> -    job_id = queue.SubmitJob(ops)
> -    _LogNewJob(True, job_id, ops)
> -    return job_id
> -
> -  @staticmethod
>    def _PickupJob(args, queue):
>      logging.info("Picking up new job from queue")
>      (job_id, ) = args
> @@ -282,132 +268,12 @@ class ClientOps(object):
>      return job_id
>
>    @staticmethod
> -  def _SubmitJobToDrainedQueue(args, queue):
> -    logging.info("Forcefully receiving new job")
> -    (job_def, ) = args
> -    ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
> -    job_id = queue.SubmitJobToDrainedQueue(ops)
> -    _LogNewJob(True, job_id, ops)
> -    return job_id
> -
> -  @staticmethod
> -  def _SubmitManyJobs(args, queue):
> -    logging.info("Receiving multiple jobs")
> -    (job_defs, ) = args
> -    jobs = []
> -    for ops in job_defs:
> -      jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
> -    job_ids = queue.SubmitManyJobs(jobs)
> -    for ((status, job_id), ops) in zip(job_ids, jobs):
> -      _LogNewJob(status, job_id, ops)
> -    return job_ids
> -
> -  @staticmethod
> -  def _CancelJob(args, queue):
> -    (job_id, ) = args
> -    logging.info("Received job cancel request for %s", job_id)
> -    return queue.CancelJob(job_id)
> -
> -  @staticmethod
>    def _ChangeJobPriority(args, queue):
>      (job_id, priority) = args
>      logging.info("Received request to change priority for job %s to %s",
>                   job_id, priority)
>      return queue.ChangeJobPriority(job_id, priority)
>
> -  @staticmethod
> -  def _ArchiveJob(args, queue):
> -    (job_id, ) = args
> -    logging.info("Received job archive request for %s", job_id)
> -    return queue.ArchiveJob(job_id)
> -
> -  @staticmethod
> -  def _AutoArchiveJobs(args, queue):
> -    (age, timeout) = args
> -    logging.info("Received job autoarchive request for age %s, timeout
> %s",
> -                 age, timeout)
> -    return queue.AutoArchiveJobs(age, timeout)
> -
> -  @staticmethod
> -  def _WaitForJobChange(args, queue):
> -    (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
> -    logging.info("Received job poll request for %s", job_id)
> -    return queue.WaitForJobChanges(job_id, fields, prev_job_info,
> -                                   prev_log_serial, timeout)
> -
> -  def _PerformQuery(self, args, queue):
> -    (what, fields, qfilter) = args
> -
> -    if what in constants.QR_VIA_OP:
> -      result = self._Query(opcodes.OpQuery(what=what, fields=fields,
> -                                           qfilter=qfilter))
> -    elif what == constants.QR_LOCK:
> -      raise errors.OpPrereqError("Lock queries cannot be asked to jobs")
> -    elif what == constants.QR_JOB:
> -      result = queue.QueryJobs(fields, qfilter)
> -    elif what in constants.QR_VIA_LUXI:
> -      luxi_client = runtime.GetClient()
> -      result = luxi_client.Query(what, fields, qfilter).ToDict()
> -    else:
> -      raise errors.OpPrereqError("Resource type '%s' unknown" % what,
> -                                 errors.ECODE_INVAL)
> -
> -    return result
> -
> -  @staticmethod
> -  def _QueryFields(args):
> -    (what, fields) = args
> -    req = objects.QueryFieldsRequest(what=what, fields=fields)
> -
> -    try:
> -      fielddefs = query.ALL_FIELDS[req.what]
> -    except KeyError:
> -      raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
> -                                 errors.ECODE_INVAL)
> -
> -    return query.QueryFields(fielddefs, req.fields)
> -
> -  @staticmethod
> -  def _QueryJobs(args, queue):
> -    (job_ids, fields) = args
> -    if isinstance(job_ids, (tuple, list)) and job_ids:
> -      msg = utils.CommaJoin(job_ids)
> -    else:
> -      msg = str(job_ids)
> -    logging.info("Received job query request for %s", msg)
> -    return queue.OldStyleQueryJobs(job_ids, fields)
> -
> -  def _QueryConfigValues(self, args):
> -    (fields, ) = args
> -    logging.info("Received config values query request for %s", fields)
> -    op = opcodes.OpClusterConfigQuery(output_fields=fields)
> -    return self._Query(op)
> -
> -  def _QueryClusterInfo(self):
> -    logging.info("Received cluster info query request")
> -    op = opcodes.OpClusterQuery()
> -    return self._Query(op)
> -
> -  def _QueryTags(self, args):
> -    (kind, name) = args
> -    logging.info("Received tags query request")
> -    op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
> -    return self._Query(op)
> -
> -  @staticmethod
> -  def _SetDrainFlag(args, queue):
> -    (drain_flag, ) = args
> -    logging.info("Received queue drain flag change request to %s",
> -                 drain_flag)
> -    return queue.SetDrainFlag(drain_flag)
> -
> -  @staticmethod
> -  def _SetWatcherPause(args, context):
> -    (until, ) = args
> -    # FIXME!
> -    ec_id = None
> -    return _SetWatcherPause(context, ec_id, until)
> -
>    def handle_request(self, method, args): # pylint: disable=R0911
>      context = self.server.context
>      queue = context.jobqueue
> @@ -422,58 +288,16 @@ class ClientOps(object):
>        raise ValueError("Invalid operation '%s'" % method)
>
>      job_id = None
> -    if method == luxi.REQ_SUBMIT_JOB:
> -      job_id = self._SubmitJob(args, queue)
> -    elif method == luxi.REQ_PICKUP_JOB:
> +    if method == luxi.REQ_PICKUP_JOB:
>        job_id = self._PickupJob(args, queue)
> -    elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
> -      job_id = self._SubmitJobToDrainedQueue(args, queue)
> -    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
> -      job_id = self._SubmitManyJobs(args, queue)
> -    elif method == luxi.REQ_CANCEL_JOB:
> -      job_id = self._CancelJob(args, queue)
>      elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
>        job_id = self._ChangeJobPriority(args, queue)
> -    elif method == luxi.REQ_ARCHIVE_JOB:
> -      job_id = self._ArchiveJob(args, queue)
> -    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
> -      job_id = self._AutoArchiveJobs(args, queue)
> -    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
> -      job_id = self._WaitForJobChange(args, queue)
> -    elif method == luxi.REQ_QUERY:
> -      job_id = self._PerformQuery(args, queue)
> -    elif method == luxi.REQ_QUERY_FIELDS:
> -      job_id = self._QueryFields(args)
> -    elif method == luxi.REQ_QUERY_JOBS:
> -      job_id = self._QueryJobs(args, queue)
> -    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
> -      job_id = self._QueryConfigValues(args)
> -    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
> -      job_id = self._QueryClusterInfo()
> -    elif method == luxi.REQ_QUERY_TAGS:
> -      job_id = self._QueryTags(args)
> -    elif method == luxi.REQ_SET_DRAIN_FLAG:
> -      job_id = self._SetDrainFlag(args, queue)
> -    elif method == luxi.REQ_SET_WATCHER_PAUSE:
> -      job_id = self._SetWatcherPause(args, context)
>      else:
> -      logging.critical("Request '%s' in luxi.REQ_ALL, but not known",
> method)
> -      raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
> -                                   " but not implemented" % method)
> +      logging.info("Request '%s' not supported by masterd", method)
> +      raise ValueError("Unsupported operation '%s'" % method)
>
>      return job_id
>
> -  def _Query(self, op):
> -    """Runs the specified opcode and returns the result.
> -
> -    """
> -    # Queries don't have a job id
> -    proc = mcpu.Processor(self.server.context, None, enable_locks=False)
> -
> -    # TODO: Executing an opcode using locks will acquire them in blocking
> mode.
> -    # Consider using a timeout for retries.
> -    return proc.ExecOpCode(op, None)
> -
>
>  class GanetiContext(object):
>    """Context common to all ganeti threads.
> --
> 2.0.0.526.g5318336
>
>

Reply via email to