On Thu, Nov 7, 2013 at 1:20 PM, Klaus Aehlig <[email protected]> wrote:
> As the responsibility for writing the job queue changed
> to luxid, make masterd call to luxid when it is necessary
> to write a job.
>
> Signed-off-by: Klaus Aehlig <[email protected]>
> ---
>  lib/jqueue.py | 158 
> ++++------------------------------------------------------
>  1 file changed, 11 insertions(+), 147 deletions(-)
>
> diff --git a/lib/jqueue.py b/lib/jqueue.py
> index 2c9f974..3428b20 100644
> --- a/lib/jqueue.py
> +++ b/lib/jqueue.py
> @@ -48,6 +48,7 @@ from ganeti import constants
>  from ganeti import serializer
>  from ganeti import workerpool
>  from ganeti import locking
> +from ganeti import luxi
>  from ganeti import opcodes
>  from ganeti import opcodes_base
>  from ganeti import errors
> @@ -1937,36 +1938,6 @@ class JobQueue(object):
>      result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
>      self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
>
> -  def _NewSerialsUnlocked(self, count):
> -    """Generates a new job identifier.
> -
> -    Job identifiers are unique during the lifetime of a cluster.
> -
> -    @type count: integer
> -    @param count: how many serials to return
> -    @rtype: list of int
> -    @return: a list of job identifiers.
> -
> -    """
> -    assert ht.TNonNegativeInt(count)
> -
> -    # New number
> -    serial = self._last_serial + count
> -
> -    # Write to file
> -    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
> -                             "%s\n" % serial, True)
> -
> -    result = [jstore.FormatJobID(v)
> -              for v in range(self._last_serial + 1, serial + 1)]
> -
> -    # Keep it only if we were able to write the file
> -    self._last_serial = serial
> -
> -    assert len(result) == count
> -
> -    return result
> -
>    @staticmethod
>    def _GetJobPath(job_id):
>      """Returns the job file for a given job id.
> @@ -2174,96 +2145,29 @@ class JobQueue(object):
>
>      return True
>
> -  @_RequireOpenQueue
> -  def _SubmitJobUnlocked(self, job_id, ops):
> -    """Create and store a new job.
> -
> -    This enters the job into our job queue and also puts it on the new
> -    queue, in order for it to be picked up by the queue processors.
> -
> -    @type job_id: job ID
> -    @param job_id: the job ID for the new job
> -    @type ops: list
> -    @param ops: The list of OpCodes that will become the new job.
> -    @rtype: L{_QueuedJob}
> -    @return: the job object to be queued
> -    @raise errors.JobQueueFull: if the job queue has too many jobs in it
> -    @raise errors.GenericError: If an opcode is not valid
> -
> -    """
> -    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
> -      raise errors.JobQueueFull()
> -
> -    job = _QueuedJob(self, job_id, ops, True)
> -
> -    for idx, op in enumerate(job.ops):
> -      # Check priority
> -      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
> -        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
> -        raise errors.GenericError("Opcode %s has invalid priority %s, 
> allowed"
> -                                  " are %s" % (idx, op.priority, allowed))
> -
> -      # Check job dependencies
> -      dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None)
> -      if not opcodes_base.TNoRelativeJobDependencies(dependencies):
> -        raise errors.GenericError("Opcode %s has invalid dependencies, must"
> -                                  " match %s: %s" %
> -                                  (idx, 
> opcodes_base.TNoRelativeJobDependencies,
> -                                   dependencies))
> -
> -    # Write to disk
> -    self.UpdateJobUnlocked(job)
> -
> -    self._queue_size += 1
> -
> -    logging.debug("Adding new job %s to the cache", job_id)
> -    self._memcache[job_id] = job
> -
> -    return job
> -
> -  @locking.ssynchronized(_LOCK)
> -  @_RequireOpenQueue
> -  @_RequireNonDrainedQueue
> -  def SubmitJob(self, ops):
> +  @classmethod
> +  def SubmitJob(cls, ops):
>      """Create and store a new job.
>
> -    @see: L{_SubmitJobUnlocked}
> -
>      """
> -    (job_id, ) = self._NewSerialsUnlocked(1)
> -    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
> -    return job_id
> +    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops)
>
> -  @locking.ssynchronized(_LOCK)
> -  @_RequireOpenQueue
> -  def SubmitJobToDrainedQueue(self, ops):
> +  @classmethod
> +  def SubmitJobToDrainedQueue(cls, ops):
>      """Forcefully create and store a new job.
>
>      Do so, even if the job queue is drained.
> -    @see: L{_SubmitJobUnlocked}
>
>      """
> -    (job_id, ) = self._NewSerialsUnlocked(1)
> -    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
> -    return job_id
> +    return luxi.Client(address=pathutils.QUERY_SOCKET)\
> +        .SubmitJobToDrainedQueue(ops)
>
> -  @locking.ssynchronized(_LOCK)
> -  @_RequireOpenQueue
> -  @_RequireNonDrainedQueue
> -  def SubmitManyJobs(self, jobs):
> +  @classmethod
> +  def SubmitManyJobs(cls, jobs):
>      """Create and store multiple jobs.
>
> -    @see: L{_SubmitJobUnlocked}
> -
>      """
> -    all_job_ids = self._NewSerialsUnlocked(len(jobs))
> -
> -    (results, added_jobs) = \
> -      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
> -
> -    self._EnqueueJobsUnlocked(added_jobs)
> -
> -    return results
> +    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
>
>    @staticmethod
>    def _FormatSubmitError(msg, ops):
> @@ -2304,46 +2208,6 @@ class JobQueue(object):
>
>      return (True, result)
>
> -  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
> -    """Create and store multiple jobs.
> -
> -    @see: L{_SubmitJobUnlocked}
> -
> -    """
> -    results = []
> -    added_jobs = []
> -
> -    def resolve_fn(job_idx, reljobid):
> -      assert reljobid < 0
> -      return (previous_job_ids + job_ids[:job_idx])[reljobid]
> -
> -    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
> -      for op in ops:
> -        if getattr(op, opcodes_base.DEPEND_ATTR, None):
> -          (status, data) = \
> -            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
> -                                         op.depends)
> -          if not status:
> -            # Abort resolving dependencies
> -            assert ht.TNonEmptyString(data), "No error message"
> -            break
> -          # Use resolved dependencies
> -          op.depends = data
> -      else:
> -        try:
> -          job = self._SubmitJobUnlocked(job_id, ops)
> -        except errors.GenericError, err:
> -          status = False
> -          data = self._FormatSubmitError(str(err), ops)
> -        else:
> -          status = True
> -          data = job_id
> -          added_jobs.append(job)
> -
> -      results.append((status, data))
> -
> -    return (results, added_jobs)
> -
>    @locking.ssynchronized(_LOCK)
>    def _EnqueueJobs(self, jobs):
>      """Helper function to add jobs to worker pool's queue.
> --
> 1.8.4.1
>

LGTM, thanks.

Side note (not preventing the submission of this patch series at all):
I think we should rename pathutils.QUERY_SOCKET to
pathutils.LUXID_SOCKET, given that the daemon has changed its name.

Thanks,
Michele

-- 
Google Germany GmbH
Dienerstr. 12
80331 München

Registergericht und -nummer: Hamburg, HRB 86891
Sitz der Gesellschaft: Hamburg
Geschäftsführer: Graham Law, Christine Elizabeth Flores

Reply via email to