On Thu, Nov 7, 2013 at 1:20 PM, Klaus Aehlig <[email protected]> wrote: > As the responsibility for writing the job queue changed > to luxid, make masterd call to luxid when it is necessary > to write a job. > > Signed-off-by: Klaus Aehlig <[email protected]> > --- > lib/jqueue.py | 158 > ++++------------------------------------------------------ > 1 file changed, 11 insertions(+), 147 deletions(-) > > diff --git a/lib/jqueue.py b/lib/jqueue.py > index 2c9f974..3428b20 100644 > --- a/lib/jqueue.py > +++ b/lib/jqueue.py > @@ -48,6 +48,7 @@ from ganeti import constants > from ganeti import serializer > from ganeti import workerpool > from ganeti import locking > +from ganeti import luxi > from ganeti import opcodes > from ganeti import opcodes_base > from ganeti import errors > @@ -1937,36 +1938,6 @@ class JobQueue(object): > result = self._GetRpc(addrs).call_jobqueue_rename(names, rename) > self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename) > > - def _NewSerialsUnlocked(self, count): > - """Generates a new job identifier. > - > - Job identifiers are unique during the lifetime of a cluster. > - > - @type count: integer > - @param count: how many serials to return > - @rtype: list of int > - @return: a list of job identifiers. > - > - """ > - assert ht.TNonNegativeInt(count) > - > - # New number > - serial = self._last_serial + count > - > - # Write to file > - self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE, > - "%s\n" % serial, True) > - > - result = [jstore.FormatJobID(v) > - for v in range(self._last_serial + 1, serial + 1)] > - > - # Keep it only if we were able to write the file > - self._last_serial = serial > - > - assert len(result) == count > - > - return result > - > @staticmethod > def _GetJobPath(job_id): > """Returns the job file for a given job id. > @@ -2174,96 +2145,29 @@ class JobQueue(object): > > return True > > - @_RequireOpenQueue > - def _SubmitJobUnlocked(self, job_id, ops): > - """Create and store a new job. 
> - > - This enters the job into our job queue and also puts it on the new > - queue, in order for it to be picked up by the queue processors. > - > - @type job_id: job ID > - @param job_id: the job ID for the new job > - @type ops: list > - @param ops: The list of OpCodes that will become the new job. > - @rtype: L{_QueuedJob} > - @return: the job object to be queued > - @raise errors.JobQueueFull: if the job queue has too many jobs in it > - @raise errors.GenericError: If an opcode is not valid > - > - """ > - if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: > - raise errors.JobQueueFull() > - > - job = _QueuedJob(self, job_id, ops, True) > - > - for idx, op in enumerate(job.ops): > - # Check priority > - if op.priority not in constants.OP_PRIO_SUBMIT_VALID: > - allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) > - raise errors.GenericError("Opcode %s has invalid priority %s, > allowed" > - " are %s" % (idx, op.priority, allowed)) > - > - # Check job dependencies > - dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None) > - if not opcodes_base.TNoRelativeJobDependencies(dependencies): > - raise errors.GenericError("Opcode %s has invalid dependencies, must" > - " match %s: %s" % > - (idx, > opcodes_base.TNoRelativeJobDependencies, > - dependencies)) > - > - # Write to disk > - self.UpdateJobUnlocked(job) > - > - self._queue_size += 1 > - > - logging.debug("Adding new job %s to the cache", job_id) > - self._memcache[job_id] = job > - > - return job > - > - @locking.ssynchronized(_LOCK) > - @_RequireOpenQueue > - @_RequireNonDrainedQueue > - def SubmitJob(self, ops): > + @classmethod > + def SubmitJob(cls, ops): > """Create and store a new job. 
> > - @see: L{_SubmitJobUnlocked} > - > """ > - (job_id, ) = self._NewSerialsUnlocked(1) > - self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)]) > - return job_id > + return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops) > > - @locking.ssynchronized(_LOCK) > - @_RequireOpenQueue > - def SubmitJobToDrainedQueue(self, ops): > + @classmethod > + def SubmitJobToDrainedQueue(cls, ops): > """Forcefully create and store a new job. > > Do so, even if the job queue is drained. > - @see: L{_SubmitJobUnlocked} > > """ > - (job_id, ) = self._NewSerialsUnlocked(1) > - self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)]) > - return job_id > + return luxi.Client(address=pathutils.QUERY_SOCKET)\ > + .SubmitJobToDrainedQueue(ops) > > - @locking.ssynchronized(_LOCK) > - @_RequireOpenQueue > - @_RequireNonDrainedQueue > - def SubmitManyJobs(self, jobs): > + @classmethod > + def SubmitManyJobs(cls, jobs): > """Create and store multiple jobs. > > - @see: L{_SubmitJobUnlocked} > - > """ > - all_job_ids = self._NewSerialsUnlocked(len(jobs)) > - > - (results, added_jobs) = \ > - self._SubmitManyJobsUnlocked(jobs, all_job_ids, []) > - > - self._EnqueueJobsUnlocked(added_jobs) > - > - return results > + return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs) > > @staticmethod > def _FormatSubmitError(msg, ops): > @@ -2304,46 +2208,6 @@ class JobQueue(object): > > return (True, result) > > - def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids): > - """Create and store multiple jobs. 
> - > - @see: L{_SubmitJobUnlocked} > - > - """ > - results = [] > - added_jobs = [] > - > - def resolve_fn(job_idx, reljobid): > - assert reljobid < 0 > - return (previous_job_ids + job_ids[:job_idx])[reljobid] > - > - for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)): > - for op in ops: > - if getattr(op, opcodes_base.DEPEND_ATTR, None): > - (status, data) = \ > - self._ResolveJobDependencies(compat.partial(resolve_fn, idx), > - op.depends) > - if not status: > - # Abort resolving dependencies > - assert ht.TNonEmptyString(data), "No error message" > - break > - # Use resolved dependencies > - op.depends = data > - else: > - try: > - job = self._SubmitJobUnlocked(job_id, ops) > - except errors.GenericError, err: > - status = False > - data = self._FormatSubmitError(str(err), ops) > - else: > - status = True > - data = job_id > - added_jobs.append(job) > - > - results.append((status, data)) > - > - return (results, added_jobs) > - > @locking.ssynchronized(_LOCK) > def _EnqueueJobs(self, jobs): > """Helper function to add jobs to worker pool's queue. > -- > 1.8.4.1 >
LGTM, thanks. Side note (this does not block the submission of this patch series in any way): I think we should rename pathutils.QUERY_SOCKET to pathutils.LUXID_SOCKET, given that the daemon has changed its name. Thanks, Michele -- Google Germany GmbH Dienerstr. 12 80331 München Registergericht und -nummer: Hamburg, HRB 86891 Sitz der Gesellschaft: Hamburg Geschäftsführer: Graham Law, Christine Elizabeth Flores
