All requests to watch a job for changes are now handled by luxid, so this code is unused in the Python job queue.
Signed-off-by: Klaus Aehlig <[email protected]> --- lib/jqueue/__init__.py | 255 -------------------------------------- test/py/ganeti.jqueue_unittest.py | 190 ---------------------------- 2 files changed, 445 deletions(-) diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py index 81d7cfc..370e957 100644 --- a/lib/jqueue/__init__.py +++ b/lib/jqueue/__init__.py @@ -101,25 +101,6 @@ def _CallJqUpdate(runner, names, file_name, content): return runner.call_jobqueue_update(names, virt_file_name, content) -class _SimpleJobQuery: - """Wrapper for job queries. - - Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}. - - """ - def __init__(self, fields): - """Initializes this class. - - """ - self._query = query.Query(query.JOB_FIELDS, fields) - - def __call__(self, job): - """Executes a job query using cached field list. - - """ - return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0] - - class _QueuedOpCode(object): """Encapsulates an opcode object. @@ -666,210 +647,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): return self._queue.SubmitManyJobs(jobs) -class _JobChangesChecker(object): - def __init__(self, fields, prev_job_info, prev_log_serial): - """Initializes this class. - - @type fields: list of strings - @param fields: Fields requested by LUXI client - @type prev_job_info: string - @param prev_job_info: previous job info, as passed by the LUXI client - @type prev_log_serial: string - @param prev_log_serial: previous job serial, as passed by the LUXI client - - """ - self._squery = _SimpleJobQuery(fields) - self._prev_job_info = prev_job_info - self._prev_log_serial = prev_log_serial - - def __call__(self, job): - """Checks whether job has changed. 
- - @type job: L{_QueuedJob} - @param job: Job object - - """ - assert not job.writable, "Expected read-only job" - - status = job.CalcStatus() - job_info = self._squery(job) - log_entries = job.GetLogEntries(self._prev_log_serial) - - # Serializing and deserializing data can cause type changes (e.g. from - # tuple to list) or precision loss. We're doing it here so that we get - # the same modifications as the data received from the client. Without - # this, the comparison afterwards might fail without the data being - # significantly different. - # TODO: we just deserialized from disk, investigate how to make sure that - # the job info and log entries are compatible to avoid this further step. - # TODO: Doing something like in testutils.py:UnifyValueType might be more - # efficient, though floats will be tricky - job_info = serializer.LoadJson(serializer.DumpJson(job_info)) - log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) - - # Don't even try to wait if the job is no longer running, there will be - # no changes. - if (status not in (constants.JOB_STATUS_QUEUED, - constants.JOB_STATUS_RUNNING, - constants.JOB_STATUS_WAITING) or - job_info != self._prev_job_info or - (log_entries and self._prev_log_serial != log_entries[0][0])): - logging.debug("Job %s changed", job.id) - return (job_info, log_entries) - - return None - - -class _JobFileChangesWaiter(object): - def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager): - """Initializes this class. 
- - @type filename: string - @param filename: Path to job file - @raises errors.InotifyError: if the notifier cannot be setup - - """ - self._wm = _inotify_wm_cls() - self._inotify_handler = \ - asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename) - self._notifier = \ - pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler) - try: - self._inotify_handler.enable() - except Exception: - # pyinotify doesn't close file descriptors automatically - self._notifier.stop() - raise - - def _OnInotify(self, notifier_enabled): - """Callback for inotify. - - """ - if not notifier_enabled: - self._inotify_handler.enable() - - def Wait(self, timeout): - """Waits for the job file to change. - - @type timeout: float - @param timeout: Timeout in seconds - @return: Whether there have been events - - """ - assert timeout >= 0 - have_events = self._notifier.check_events(timeout * 1000) - if have_events: - self._notifier.read_events() - self._notifier.process_events() - return have_events - - def Close(self): - """Closes underlying notifier and its file descriptor. - - """ - self._notifier.stop() - - -class _JobChangesWaiter(object): - def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter): - """Initializes this class. - - @type filename: string - @param filename: Path to job file - - """ - self._filewaiter = None - self._filename = filename - self._waiter_cls = _waiter_cls - - def Wait(self, timeout): - """Waits for a job to change. - - @type timeout: float - @param timeout: Timeout in seconds - @return: Whether there have been events - - """ - if self._filewaiter: - return self._filewaiter.Wait(timeout) - - # Lazy setup: Avoid inotify setup cost when job file has already changed. - # If this point is reached, return immediately and let caller check the job - # file again in case there were changes since the last check. This avoids a - # race condition. 
- self._filewaiter = self._waiter_cls(self._filename) - - return True - - def Close(self): - """Closes underlying waiter. - - """ - if self._filewaiter: - self._filewaiter.Close() - - -class _WaitForJobChangesHelper(object): - """Helper class using inotify to wait for changes in a job file. - - This class takes a previous job status and serial, and alerts the client when - the current job status has changed. - - """ - @staticmethod - def _CheckForChanges(counter, job_load_fn, check_fn): - if counter.next() > 0: - # If this isn't the first check the job is given some more time to change - # again. This gives better performance for jobs generating many - # changes/messages. - time.sleep(0.1) - - job = job_load_fn() - if not job: - raise errors.JobLost() - - result = check_fn(job) - if result is None: - raise utils.RetryAgain() - - return result - - def __call__(self, filename, job_load_fn, - fields, prev_job_info, prev_log_serial, timeout, - _waiter_cls=_JobChangesWaiter): - """Waits for changes on a job. 
- - @type filename: string - @param filename: File on which to wait for changes - @type job_load_fn: callable - @param job_load_fn: Function to load job - @type fields: list of strings - @param fields: Which fields to check for changes - @type prev_job_info: list or None - @param prev_job_info: Last job information returned - @type prev_log_serial: int - @param prev_log_serial: Last job message serial number - @type timeout: float - @param timeout: maximum time to wait in seconds - - """ - counter = itertools.count() - try: - check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial) - waiter = _waiter_cls(filename) - try: - return utils.Retry(compat.partial(self._CheckForChanges, - counter, job_load_fn, check_fn), - utils.RETRY_REMAINING_TIME, timeout, - wait_fn=waiter.Wait) - finally: - waiter.Close() - except errors.JobLost: - return None - except utils.RetryTimeout: - return constants.JOB_NOTCHANGED - - def _EncodeOpError(err): """Encodes an error which occurred while processing an opcode. @@ -2167,38 +1944,6 @@ class JobQueue(object): logging.debug("Writing job %s to %s", job.id, filename) self._UpdateJobQueueFile(filename, data, replicate) - def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial, - timeout): - """Waits for changes in a job. 
- - @type job_id: int - @param job_id: Job identifier - @type fields: list of strings - @param fields: Which fields to check for changes - @type prev_job_info: list or None - @param prev_job_info: Last job information returned - @type prev_log_serial: int - @param prev_log_serial: Last job message serial number - @type timeout: float - @param timeout: maximum time to wait in seconds - @rtype: tuple (job info, log entries) - @return: a tuple of the job information as required via - the fields parameter, and the log entries as a list - - if the job has not changed and the timeout has expired, - we instead return a special value, - L{constants.JOB_NOTCHANGED}, which should be interpreted - as such by the clients - - """ - load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True, - writable=False) - - helper = _WaitForJobChangesHelper() - - return helper(self._GetJobPath(job_id), load_fn, - fields, prev_job_info, prev_log_serial, timeout) - def HasJobBeenFinalized(self, job_id): """Checks if a job has been finalized. 
diff --git a/test/py/ganeti.jqueue_unittest.py b/test/py/ganeti.jqueue_unittest.py index 78ade29..80e60f9 100755 --- a/test/py/ganeti.jqueue_unittest.py +++ b/test/py/ganeti.jqueue_unittest.py @@ -75,196 +75,6 @@ class _FakeJob: return self._log[newer_than:] -class TestJobChangesChecker(unittest.TestCase): - def testStatus(self): - job = _FakeJob(9094, constants.JOB_STATUS_QUEUED) - checker = jqueue._JobChangesChecker(["status"], None, None) - self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], [])) - - job.SetStatus(constants.JOB_STATUS_RUNNING) - self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], [])) - - job.SetStatus(constants.JOB_STATUS_SUCCESS) - self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], [])) - - # job.id is used by checker - self.assertEqual(job.id, 9094) - - def testStatusWithPrev(self): - job = _FakeJob(12807, constants.JOB_STATUS_QUEUED) - checker = jqueue._JobChangesChecker(["status"], - [constants.JOB_STATUS_QUEUED], None) - self.assert_(checker(job) is None) - - job.SetStatus(constants.JOB_STATUS_RUNNING) - self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], [])) - - def testFinalStatus(self): - for status in constants.JOBS_FINALIZED: - job = _FakeJob(2178711, status) - checker = jqueue._JobChangesChecker(["status"], [status], None) - # There won't be any changes in this status, hence it should signal - # a change immediately - self.assertEqual(checker(job), ([status], [])) - - def testLog(self): - job = _FakeJob(9094, constants.JOB_STATUS_RUNNING) - checker = jqueue._JobChangesChecker(["status"], None, None) - self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], [])) - - job.AddLogEntry("Hello World") - (job_info, log_entries) = checker(job) - self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING]) - self.assertEqual(log_entries, [[0, "Hello World"]]) - - checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries)) - self.assert_(checker2(job) is None) - - 
job.AddLogEntry("Foo Bar") - job.SetStatus(constants.JOB_STATUS_ERROR) - - (job_info, log_entries) = checker2(job) - self.assertEqual(job_info, [constants.JOB_STATUS_ERROR]) - self.assertEqual(log_entries, [[1, "Foo Bar"]]) - - checker3 = jqueue._JobChangesChecker(["status"], None, None) - (job_info, log_entries) = checker3(job) - self.assertEqual(job_info, [constants.JOB_STATUS_ERROR]) - self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]]) - - -class TestJobChangesWaiter(unittest.TestCase): - def setUp(self): - self.tmpdir = tempfile.mkdtemp() - self.filename = utils.PathJoin(self.tmpdir, "job-1") - utils.WriteFile(self.filename, data="") - - def tearDown(self): - shutil.rmtree(self.tmpdir) - - def _EnsureNotifierClosed(self, notifier): - try: - os.fstat(notifier._fd) - except EnvironmentError, err: - self.assertEqual(err.errno, errno.EBADF) - else: - self.fail("File descriptor wasn't closed") - - def testClose(self): - for wait in [False, True]: - waiter = jqueue._JobFileChangesWaiter(self.filename) - try: - if wait: - waiter.Wait(0.001) - finally: - waiter.Close() - - # Ensure file descriptor was closed - self._EnsureNotifierClosed(waiter._notifier) - - def testChangingFile(self): - waiter = jqueue._JobFileChangesWaiter(self.filename) - try: - self.assertFalse(waiter.Wait(0.1)) - utils.WriteFile(self.filename, data="changed") - self.assert_(waiter.Wait(60)) - finally: - waiter.Close() - - self._EnsureNotifierClosed(waiter._notifier) - - def testChangingFile2(self): - waiter = jqueue._JobChangesWaiter(self.filename) - try: - self.assertFalse(waiter._filewaiter) - self.assert_(waiter.Wait(0.1)) - self.assert_(waiter._filewaiter) - - # File waiter is now used, but there have been no changes - self.assertFalse(waiter.Wait(0.1)) - utils.WriteFile(self.filename, data="changed") - self.assert_(waiter.Wait(60)) - finally: - waiter.Close() - - self._EnsureNotifierClosed(waiter._filewaiter._notifier) - - -class _FailingWatchManager(pyinotify.WatchManager): 
- """Subclass of L{pyinotify.WatchManager} which always fails to register. - - """ - def add_watch(self, filename, mask): - assert mask == (pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] | - pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"]) - - return { - filename: -1, - } - - -class TestWaitForJobChangesHelper(unittest.TestCase): - def setUp(self): - self.tmpdir = tempfile.mkdtemp() - self.filename = utils.PathJoin(self.tmpdir, "job-2614226563") - utils.WriteFile(self.filename, data="") - - def tearDown(self): - shutil.rmtree(self.tmpdir) - - def _LoadWaitingJob(self): - return _FakeJob(2614226563, constants.JOB_STATUS_WAITING) - - def _LoadLostJob(self): - return None - - def testNoChanges(self): - wfjc = jqueue._WaitForJobChangesHelper() - - # No change - self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"], - [constants.JOB_STATUS_WAITING], None, 0.1), - constants.JOB_NOTCHANGED) - - # No previous information - self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, - ["status"], None, None, 1.0), - ([constants.JOB_STATUS_WAITING], [])) - - def testLostJob(self): - wfjc = jqueue._WaitForJobChangesHelper() - self.assert_(wfjc(self.filename, self._LoadLostJob, - ["status"], None, None, 1.0) is None) - - def testNonExistentFile(self): - wfjc = jqueue._WaitForJobChangesHelper() - - filename = utils.PathJoin(self.tmpdir, "does-not-exist") - self.assertFalse(os.path.exists(filename)) - - result = wfjc(filename, self._LoadLostJob, ["status"], None, None, 1.0, - _waiter_cls=compat.partial(jqueue._JobChangesWaiter, - _waiter_cls=NotImplemented)) - self.assertTrue(result is None) - - def testInotifyError(self): - jobfile_waiter_cls = \ - compat.partial(jqueue._JobFileChangesWaiter, - _inotify_wm_cls=_FailingWatchManager) - - jobchange_waiter_cls = \ - compat.partial(jqueue._JobChangesWaiter, _waiter_cls=jobfile_waiter_cls) - - wfjc = jqueue._WaitForJobChangesHelper() - - # Test if failing to watch a job file (e.g. 
due to - # fs.inotify.max_user_watches being too low) raises errors.InotifyError - self.assertRaises(errors.InotifyError, wfjc, - self.filename, self._LoadWaitingJob, - ["status"], [constants.JOB_STATUS_WAITING], None, 1.0, - _waiter_cls=jobchange_waiter_cls) - - class TestEncodeOpError(unittest.TestCase): def test(self): encerr = jqueue._EncodeOpError(errors.LockError("Test 1")) -- 2.0.0.526.g5318336
