LGTM
On Tue, Jun 24, 2014 at 1:55 PM, 'Klaus Aehlig' via ganeti-devel < [email protected]> wrote: > All requests to watch a job for changes are now > handled by luxid. So this code is unused in the > Python job queue. > > Signed-off-by: Klaus Aehlig <[email protected]> > --- > lib/jqueue/__init__.py | 255 > -------------------------------------- > test/py/ganeti.jqueue_unittest.py | 190 ---------------------------- > 2 files changed, 445 deletions(-) > > diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py > index 81d7cfc..370e957 100644 > --- a/lib/jqueue/__init__.py > +++ b/lib/jqueue/__init__.py > @@ -101,25 +101,6 @@ def _CallJqUpdate(runner, names, file_name, content): > return runner.call_jobqueue_update(names, virt_file_name, content) > > > -class _SimpleJobQuery: > - """Wrapper for job queries. > - > - Instance keeps list of fields cached, useful e.g. in > L{_JobChangesChecker}. > - > - """ > - def __init__(self, fields): > - """Initializes this class. > - > - """ > - self._query = query.Query(query.JOB_FIELDS, fields) > - > - def __call__(self, job): > - """Executes a job query using cached field list. > - > - """ > - return self._query.OldStyleQuery([(job.id, job)], > sort_by_name=False)[0] > - > - > class _QueuedOpCode(object): > """Encapsulates an opcode object. > > @@ -666,210 +647,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): > return self._queue.SubmitManyJobs(jobs) > > > -class _JobChangesChecker(object): > - def __init__(self, fields, prev_job_info, prev_log_serial): > - """Initializes this class. > - > - @type fields: list of strings > - @param fields: Fields requested by LUXI client > - @type prev_job_info: string > - @param prev_job_info: previous job info, as passed by the LUXI client > - @type prev_log_serial: string > - @param prev_log_serial: previous job serial, as passed by the LUXI > client > - > - """ > - self._squery = _SimpleJobQuery(fields) > - self._prev_job_info = prev_job_info > - self._prev_log_serial = prev_log_serial > - > - def __call__(self, job): > - """Checks whether job has changed. > - > - @type job: L{_QueuedJob} > - @param job: Job object > - > - """ > - assert not job.writable, "Expected read-only job" > - > - status = job.CalcStatus() > - job_info = self._squery(job) > - log_entries = job.GetLogEntries(self._prev_log_serial) > - > - # Serializing and deserializing data can cause type changes (e.g. from > - # tuple to list) or precision loss. We're doing it here so that we get > - # the same modifications as the data received from the client. Without > - # this, the comparison afterwards might fail without the data being > - # significantly different. > - # TODO: we just deserialized from disk, investigate how to make sure > that > - # the job info and log entries are compatible to avoid this further > step. > - # TODO: Doing something like in testutils.py:UnifyValueType might be > more > - # efficient, though floats will be tricky > - job_info = serializer.LoadJson(serializer.DumpJson(job_info)) > - log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) > - > - # Don't even try to wait if the job is no longer running, there will > be > - # no changes. > - if (status not in (constants.JOB_STATUS_QUEUED, > - constants.JOB_STATUS_RUNNING, > - constants.JOB_STATUS_WAITING) or > - job_info != self._prev_job_info or > - (log_entries and self._prev_log_serial != log_entries[0][0])): > - logging.debug("Job %s changed", job.id) > - return (job_info, log_entries) > - > - return None > - > - > -class _JobFileChangesWaiter(object): > - def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager): > - """Initializes this class. > - > - @type filename: string > - @param filename: Path to job file > - @raises errors.InotifyError: if the notifier cannot be setup > - > - """ > - self._wm = _inotify_wm_cls() > - self._inotify_handler = \ > - asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, > filename) > - self._notifier = \ > - pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler) > - try: > - self._inotify_handler.enable() > - except Exception: > - # pyinotify doesn't close file descriptors automatically > - self._notifier.stop() > - raise > - > - def _OnInotify(self, notifier_enabled): > - """Callback for inotify. > - > - """ > - if not notifier_enabled: > - self._inotify_handler.enable() > - > - def Wait(self, timeout): > - """Waits for the job file to change. > - > - @type timeout: float > - @param timeout: Timeout in seconds > - @return: Whether there have been events > - > - """ > - assert timeout >= 0 > - have_events = self._notifier.check_events(timeout * 1000) > - if have_events: > - self._notifier.read_events() > - self._notifier.process_events() > - return have_events > - > - def Close(self): > - """Closes underlying notifier and its file descriptor. > - > - """ > - self._notifier.stop() > - > - > -class _JobChangesWaiter(object): > - def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter): > - """Initializes this class. > - > - @type filename: string > - @param filename: Path to job file > - > - """ > - self._filewaiter = None > - self._filename = filename > - self._waiter_cls = _waiter_cls > - > - def Wait(self, timeout): > - """Waits for a job to change. > - > - @type timeout: float > - @param timeout: Timeout in seconds > - @return: Whether there have been events > - > - """ > - if self._filewaiter: > - return self._filewaiter.Wait(timeout) > - > - # Lazy setup: Avoid inotify setup cost when job file has already > changed. > - # If this point is reached, return immediately and let caller check > the job > - # file again in case there were changes since the last check. This > avoids a > - # race condition. > - self._filewaiter = self._waiter_cls(self._filename) > - > - return True > - > - def Close(self): > - """Closes underlying waiter. > - > - """ > - if self._filewaiter: > - self._filewaiter.Close() > - > - > -class _WaitForJobChangesHelper(object): > - """Helper class using inotify to wait for changes in a job file. > - > - This class takes a previous job status and serial, and alerts the > client when > - the current job status has changed. > - > - """ > - @staticmethod > - def _CheckForChanges(counter, job_load_fn, check_fn): > - if counter.next() > 0: > - # If this isn't the first check the job is given some more time to > change > - # again. This gives better performance for jobs generating many > - # changes/messages. > - time.sleep(0.1) > - > - job = job_load_fn() > - if not job: > - raise errors.JobLost() > - > - result = check_fn(job) > - if result is None: > - raise utils.RetryAgain() > - > - return result > - > - def __call__(self, filename, job_load_fn, > - fields, prev_job_info, prev_log_serial, timeout, > - _waiter_cls=_JobChangesWaiter): > - """Waits for changes on a job. > - > - @type filename: string > - @param filename: File on which to wait for changes > - @type job_load_fn: callable > - @param job_load_fn: Function to load job > - @type fields: list of strings > - @param fields: Which fields to check for changes > - @type prev_job_info: list or None > - @param prev_job_info: Last job information returned > - @type prev_log_serial: int > - @param prev_log_serial: Last job message serial number > - @type timeout: float > - @param timeout: maximum time to wait in seconds > - > - """ > - counter = itertools.count() > - try: > - check_fn = _JobChangesChecker(fields, prev_job_info, > prev_log_serial) > - waiter = _waiter_cls(filename) > - try: > - return utils.Retry(compat.partial(self._CheckForChanges, > - counter, job_load_fn, check_fn), > - utils.RETRY_REMAINING_TIME, timeout, > - wait_fn=waiter.Wait) > - finally: > - waiter.Close() > - except errors.JobLost: > - return None > - except utils.RetryTimeout: > - return constants.JOB_NOTCHANGED > - > - > def _EncodeOpError(err): > """Encodes an error which occurred while processing an opcode. > > @@ -2167,38 +1944,6 @@ class JobQueue(object): > logging.debug("Writing job %s to %s", job.id, filename) > self._UpdateJobQueueFile(filename, data, replicate) > > - def WaitForJobChanges(self, job_id, fields, prev_job_info, > prev_log_serial, > - timeout): > - """Waits for changes in a job. > - > - @type job_id: int > - @param job_id: Job identifier > - @type fields: list of strings > - @param fields: Which fields to check for changes > - @type prev_job_info: list or None > - @param prev_job_info: Last job information returned > - @type prev_log_serial: int > - @param prev_log_serial: Last job message serial number > - @type timeout: float > - @param timeout: maximum time to wait in seconds > - @rtype: tuple (job info, log entries) > - @return: a tuple of the job information as required via > - the fields parameter, and the log entries as a list > - > - if the job has not changed and the timeout has expired, > - we instead return a special value, > - L{constants.JOB_NOTCHANGED}, which should be interpreted > - as such by the clients > - > - """ > - load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True, > - writable=False) > - > - helper = _WaitForJobChangesHelper() > - > - return helper(self._GetJobPath(job_id), load_fn, > - fields, prev_job_info, prev_log_serial, timeout) > - > def HasJobBeenFinalized(self, job_id): > """Checks if a job has been finalized. > > diff --git a/test/py/ganeti.jqueue_unittest.py b/test/py/ > ganeti.jqueue_unittest.py > index 78ade29..80e60f9 100755 > --- a/test/py/ganeti.jqueue_unittest.py > +++ b/test/py/ganeti.jqueue_unittest.py > @@ -75,196 +75,6 @@ class _FakeJob: > return self._log[newer_than:] > > > -class TestJobChangesChecker(unittest.TestCase): > - def testStatus(self): > - job = _FakeJob(9094, constants.JOB_STATUS_QUEUED) > - checker = jqueue._JobChangesChecker(["status"], None, None) > - self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], [])) > - > - job.SetStatus(constants.JOB_STATUS_RUNNING) > - self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], [])) > - > - job.SetStatus(constants.JOB_STATUS_SUCCESS) > - self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], [])) > - > - # job.id is used by checker > - self.assertEqual(job.id, 9094) > - > - def testStatusWithPrev(self): > - job = _FakeJob(12807, constants.JOB_STATUS_QUEUED) > - checker = jqueue._JobChangesChecker(["status"], > - [constants.JOB_STATUS_QUEUED], > None) > - self.assert_(checker(job) is None) > - > - job.SetStatus(constants.JOB_STATUS_RUNNING) > - self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], [])) > - > - def testFinalStatus(self): > - for status in constants.JOBS_FINALIZED: > - job = _FakeJob(2178711, status) > - checker = jqueue._JobChangesChecker(["status"], [status], None) > - # There won't be any changes in this status, hence it should signal > - # a change immediately > - self.assertEqual(checker(job), ([status], [])) > - > - def testLog(self): > - job = _FakeJob(9094, constants.JOB_STATUS_RUNNING) > - checker = jqueue._JobChangesChecker(["status"], None, None) > - self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], [])) > - > - job.AddLogEntry("Hello World") > - (job_info, log_entries) = checker(job) > - self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING]) > - self.assertEqual(log_entries, [[0, "Hello World"]]) > - > - checker2 = jqueue._JobChangesChecker(["status"], job_info, > len(log_entries)) > - self.assert_(checker2(job) is None) > - > - job.AddLogEntry("Foo Bar") > - job.SetStatus(constants.JOB_STATUS_ERROR) > - > - (job_info, log_entries) = checker2(job) > - self.assertEqual(job_info, [constants.JOB_STATUS_ERROR]) > - self.assertEqual(log_entries, [[1, "Foo Bar"]]) > - > - checker3 = jqueue._JobChangesChecker(["status"], None, None) > - (job_info, log_entries) = checker3(job) > - self.assertEqual(job_info, [constants.JOB_STATUS_ERROR]) > - self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]]) > - > - > -class TestJobChangesWaiter(unittest.TestCase): > - def setUp(self): > - self.tmpdir = tempfile.mkdtemp() > - self.filename = utils.PathJoin(self.tmpdir, "job-1") > - utils.WriteFile(self.filename, data="") > - > - def tearDown(self): > - shutil.rmtree(self.tmpdir) > - > - def _EnsureNotifierClosed(self, notifier): > - try: > - os.fstat(notifier._fd) > - except EnvironmentError, err: > - self.assertEqual(err.errno, errno.EBADF) > - else: > - self.fail("File descriptor wasn't closed") > - > - def testClose(self): > - for wait in [False, True]: > - waiter = jqueue._JobFileChangesWaiter(self.filename) > - try: > - if wait: > - waiter.Wait(0.001) > - finally: > - waiter.Close() > - > - # Ensure file descriptor was closed > - self._EnsureNotifierClosed(waiter._notifier) > - > - def testChangingFile(self): > - waiter = jqueue._JobFileChangesWaiter(self.filename) > - try: > - self.assertFalse(waiter.Wait(0.1)) > - utils.WriteFile(self.filename, data="changed") > - self.assert_(waiter.Wait(60)) > - finally: > - waiter.Close() > - > - self._EnsureNotifierClosed(waiter._notifier) > - > - def testChangingFile2(self): > - waiter = jqueue._JobChangesWaiter(self.filename) > - try: > - self.assertFalse(waiter._filewaiter) > - self.assert_(waiter.Wait(0.1)) > - self.assert_(waiter._filewaiter) > - > - # File waiter is now used, but there have been no changes > - self.assertFalse(waiter.Wait(0.1)) > - utils.WriteFile(self.filename, data="changed") > - self.assert_(waiter.Wait(60)) > - finally: > - waiter.Close() > - > - self._EnsureNotifierClosed(waiter._filewaiter._notifier) > - > - > -class _FailingWatchManager(pyinotify.WatchManager): > - """Subclass of L{pyinotify.WatchManager} which always fails to register. > - > - """ > - def add_watch(self, filename, mask): > - assert mask == (pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] | > - pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"]) > - > - return { > - filename: -1, > - } > - > - > -class TestWaitForJobChangesHelper(unittest.TestCase): > - def setUp(self): > - self.tmpdir = tempfile.mkdtemp() > - self.filename = utils.PathJoin(self.tmpdir, "job-2614226563") > - utils.WriteFile(self.filename, data="") > - > - def tearDown(self): > - shutil.rmtree(self.tmpdir) > - > - def _LoadWaitingJob(self): > - return _FakeJob(2614226563, constants.JOB_STATUS_WAITING) > - > - def _LoadLostJob(self): > - return None > - > - def testNoChanges(self): > - wfjc = jqueue._WaitForJobChangesHelper() > - > - # No change > - self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"], > - [constants.JOB_STATUS_WAITING], None, 0.1), > - constants.JOB_NOTCHANGED) > - > - # No previous information > - self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, > - ["status"], None, None, 1.0), > - ([constants.JOB_STATUS_WAITING], [])) > - > - def testLostJob(self): > - wfjc = jqueue._WaitForJobChangesHelper() > - self.assert_(wfjc(self.filename, self._LoadLostJob, > - ["status"], None, None, 1.0) is None) > - > - def testNonExistentFile(self): > - wfjc = jqueue._WaitForJobChangesHelper() > - > - filename = utils.PathJoin(self.tmpdir, "does-not-exist") > - self.assertFalse(os.path.exists(filename)) > - > - result = wfjc(filename, self._LoadLostJob, ["status"], None, None, > 1.0, > - _waiter_cls=compat.partial(jqueue._JobChangesWaiter, > - _waiter_cls=NotImplemented)) > - self.assertTrue(result is None) > - > - def testInotifyError(self): > - jobfile_waiter_cls = \ > - compat.partial(jqueue._JobFileChangesWaiter, > - _inotify_wm_cls=_FailingWatchManager) > - > - jobchange_waiter_cls = \ > - compat.partial(jqueue._JobChangesWaiter, > _waiter_cls=jobfile_waiter_cls) > - > - wfjc = jqueue._WaitForJobChangesHelper() > - > - # Test if failing to watch a job file (e.g. due to > - # fs.inotify.max_user_watches being too low) raises > errors.InotifyError > - self.assertRaises(errors.InotifyError, wfjc, > - self.filename, self._LoadWaitingJob, > - ["status"], [constants.JOB_STATUS_WAITING], None, > 1.0, > - _waiter_cls=jobchange_waiter_cls) > - > - > class TestEncodeOpError(unittest.TestCase): > def test(self): > encerr = jqueue._EncodeOpError(errors.LockError("Test 1")) > -- > 2.0.0.526.g5318336 > >
