All requests to watch a job for changes are now handled by luxid,
so this code is unused in the Python job queue. Remove it, together
with its unit tests.

Signed-off-by: Klaus Aehlig <[email protected]>
---
 lib/jqueue/__init__.py            | 255 --------------------------------------
 test/py/ganeti.jqueue_unittest.py | 190 ----------------------------
 2 files changed, 445 deletions(-)

diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
index 81d7cfc..370e957 100644
--- a/lib/jqueue/__init__.py
+++ b/lib/jqueue/__init__.py
@@ -101,25 +101,6 @@ def _CallJqUpdate(runner, names, file_name, content):
   return runner.call_jobqueue_update(names, virt_file_name, content)
 
 
-class _SimpleJobQuery:
-  """Wrapper for job queries.
-
-  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
-
-  """
-  def __init__(self, fields):
-    """Initializes this class.
-
-    """
-    self._query = query.Query(query.JOB_FIELDS, fields)
-
-  def __call__(self, job):
-    """Executes a job query using cached field list.
-
-    """
-    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
-
-
 class _QueuedOpCode(object):
   """Encapsulates an opcode object.
 
@@ -666,210 +647,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     return self._queue.SubmitManyJobs(jobs)
 
 
-class _JobChangesChecker(object):
-  def __init__(self, fields, prev_job_info, prev_log_serial):
-    """Initializes this class.
-
-    @type fields: list of strings
-    @param fields: Fields requested by LUXI client
-    @type prev_job_info: string
-    @param prev_job_info: previous job info, as passed by the LUXI client
-    @type prev_log_serial: string
-    @param prev_log_serial: previous job serial, as passed by the LUXI client
-
-    """
-    self._squery = _SimpleJobQuery(fields)
-    self._prev_job_info = prev_job_info
-    self._prev_log_serial = prev_log_serial
-
-  def __call__(self, job):
-    """Checks whether job has changed.
-
-    @type job: L{_QueuedJob}
-    @param job: Job object
-
-    """
-    assert not job.writable, "Expected read-only job"
-
-    status = job.CalcStatus()
-    job_info = self._squery(job)
-    log_entries = job.GetLogEntries(self._prev_log_serial)
-
-    # Serializing and deserializing data can cause type changes (e.g. from
-    # tuple to list) or precision loss. We're doing it here so that we get
-    # the same modifications as the data received from the client. Without
-    # this, the comparison afterwards might fail without the data being
-    # significantly different.
-    # TODO: we just deserialized from disk, investigate how to make sure that
-    # the job info and log entries are compatible to avoid this further step.
-    # TODO: Doing something like in testutils.py:UnifyValueType might be more
-    # efficient, though floats will be tricky
-    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
-    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
-
-    # Don't even try to wait if the job is no longer running, there will be
-    # no changes.
-    if (status not in (constants.JOB_STATUS_QUEUED,
-                       constants.JOB_STATUS_RUNNING,
-                       constants.JOB_STATUS_WAITING) or
-        job_info != self._prev_job_info or
-        (log_entries and self._prev_log_serial != log_entries[0][0])):
-      logging.debug("Job %s changed", job.id)
-      return (job_info, log_entries)
-
-    return None
-
-
-class _JobFileChangesWaiter(object):
-  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
-    """Initializes this class.
-
-    @type filename: string
-    @param filename: Path to job file
-    @raises errors.InotifyError: if the notifier cannot be setup
-
-    """
-    self._wm = _inotify_wm_cls()
-    self._inotify_handler = \
-      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
-    self._notifier = \
-      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
-    try:
-      self._inotify_handler.enable()
-    except Exception:
-      # pyinotify doesn't close file descriptors automatically
-      self._notifier.stop()
-      raise
-
-  def _OnInotify(self, notifier_enabled):
-    """Callback for inotify.
-
-    """
-    if not notifier_enabled:
-      self._inotify_handler.enable()
-
-  def Wait(self, timeout):
-    """Waits for the job file to change.
-
-    @type timeout: float
-    @param timeout: Timeout in seconds
-    @return: Whether there have been events
-
-    """
-    assert timeout >= 0
-    have_events = self._notifier.check_events(timeout * 1000)
-    if have_events:
-      self._notifier.read_events()
-    self._notifier.process_events()
-    return have_events
-
-  def Close(self):
-    """Closes underlying notifier and its file descriptor.
-
-    """
-    self._notifier.stop()
-
-
-class _JobChangesWaiter(object):
-  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
-    """Initializes this class.
-
-    @type filename: string
-    @param filename: Path to job file
-
-    """
-    self._filewaiter = None
-    self._filename = filename
-    self._waiter_cls = _waiter_cls
-
-  def Wait(self, timeout):
-    """Waits for a job to change.
-
-    @type timeout: float
-    @param timeout: Timeout in seconds
-    @return: Whether there have been events
-
-    """
-    if self._filewaiter:
-      return self._filewaiter.Wait(timeout)
-
-    # Lazy setup: Avoid inotify setup cost when job file has already changed.
-    # If this point is reached, return immediately and let caller check the job
-    # file again in case there were changes since the last check. This avoids a
-    # race condition.
-    self._filewaiter = self._waiter_cls(self._filename)
-
-    return True
-
-  def Close(self):
-    """Closes underlying waiter.
-
-    """
-    if self._filewaiter:
-      self._filewaiter.Close()
-
-
-class _WaitForJobChangesHelper(object):
-  """Helper class using inotify to wait for changes in a job file.
-
-  This class takes a previous job status and serial, and alerts the client when
-  the current job status has changed.
-
-  """
-  @staticmethod
-  def _CheckForChanges(counter, job_load_fn, check_fn):
-    if counter.next() > 0:
-      # If this isn't the first check the job is given some more time to change
-      # again. This gives better performance for jobs generating many
-      # changes/messages.
-      time.sleep(0.1)
-
-    job = job_load_fn()
-    if not job:
-      raise errors.JobLost()
-
-    result = check_fn(job)
-    if result is None:
-      raise utils.RetryAgain()
-
-    return result
-
-  def __call__(self, filename, job_load_fn,
-               fields, prev_job_info, prev_log_serial, timeout,
-               _waiter_cls=_JobChangesWaiter):
-    """Waits for changes on a job.
-
-    @type filename: string
-    @param filename: File on which to wait for changes
-    @type job_load_fn: callable
-    @param job_load_fn: Function to load job
-    @type fields: list of strings
-    @param fields: Which fields to check for changes
-    @type prev_job_info: list or None
-    @param prev_job_info: Last job information returned
-    @type prev_log_serial: int
-    @param prev_log_serial: Last job message serial number
-    @type timeout: float
-    @param timeout: maximum time to wait in seconds
-
-    """
-    counter = itertools.count()
-    try:
-      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
-      waiter = _waiter_cls(filename)
-      try:
-        return utils.Retry(compat.partial(self._CheckForChanges,
-                                          counter, job_load_fn, check_fn),
-                           utils.RETRY_REMAINING_TIME, timeout,
-                           wait_fn=waiter.Wait)
-      finally:
-        waiter.Close()
-    except errors.JobLost:
-      return None
-    except utils.RetryTimeout:
-      return constants.JOB_NOTCHANGED
-
-
 def _EncodeOpError(err):
   """Encodes an error which occurred while processing an opcode.
 
@@ -2167,38 +1944,6 @@ class JobQueue(object):
     logging.debug("Writing job %s to %s", job.id, filename)
     self._UpdateJobQueueFile(filename, data, replicate)
 
-  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
-                        timeout):
-    """Waits for changes in a job.
-
-    @type job_id: int
-    @param job_id: Job identifier
-    @type fields: list of strings
-    @param fields: Which fields to check for changes
-    @type prev_job_info: list or None
-    @param prev_job_info: Last job information returned
-    @type prev_log_serial: int
-    @param prev_log_serial: Last job message serial number
-    @type timeout: float
-    @param timeout: maximum time to wait in seconds
-    @rtype: tuple (job info, log entries)
-    @return: a tuple of the job information as required via
-        the fields parameter, and the log entries as a list
-
-        if the job has not changed and the timeout has expired,
-        we instead return a special value,
-        L{constants.JOB_NOTCHANGED}, which should be interpreted
-        as such by the clients
-
-    """
-    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
-                             writable=False)
-
-    helper = _WaitForJobChangesHelper()
-
-    return helper(self._GetJobPath(job_id), load_fn,
-                  fields, prev_job_info, prev_log_serial, timeout)
-
   def HasJobBeenFinalized(self, job_id):
     """Checks if a job has been finalized.
 
diff --git a/test/py/ganeti.jqueue_unittest.py b/test/py/ganeti.jqueue_unittest.py
index 78ade29..80e60f9 100755
--- a/test/py/ganeti.jqueue_unittest.py
+++ b/test/py/ganeti.jqueue_unittest.py
@@ -75,196 +75,6 @@ class _FakeJob:
     return self._log[newer_than:]
 
 
-class TestJobChangesChecker(unittest.TestCase):
-  def testStatus(self):
-    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
-    checker = jqueue._JobChangesChecker(["status"], None, None)
-    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
-
-    job.SetStatus(constants.JOB_STATUS_RUNNING)
-    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
-
-    job.SetStatus(constants.JOB_STATUS_SUCCESS)
-    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
-
-    # job.id is used by checker
-    self.assertEqual(job.id, 9094)
-
-  def testStatusWithPrev(self):
-    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
-    checker = jqueue._JobChangesChecker(["status"],
-                                        [constants.JOB_STATUS_QUEUED], None)
-    self.assert_(checker(job) is None)
-
-    job.SetStatus(constants.JOB_STATUS_RUNNING)
-    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
-
-  def testFinalStatus(self):
-    for status in constants.JOBS_FINALIZED:
-      job = _FakeJob(2178711, status)
-      checker = jqueue._JobChangesChecker(["status"], [status], None)
-      # There won't be any changes in this status, hence it should signal
-      # a change immediately
-      self.assertEqual(checker(job), ([status], []))
-
-  def testLog(self):
-    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
-    checker = jqueue._JobChangesChecker(["status"], None, None)
-    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
-
-    job.AddLogEntry("Hello World")
-    (job_info, log_entries) = checker(job)
-    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
-    self.assertEqual(log_entries, [[0, "Hello World"]])
-
-    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
-    self.assert_(checker2(job) is None)
-
-    job.AddLogEntry("Foo Bar")
-    job.SetStatus(constants.JOB_STATUS_ERROR)
-
-    (job_info, log_entries) = checker2(job)
-    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
-    self.assertEqual(log_entries, [[1, "Foo Bar"]])
-
-    checker3 = jqueue._JobChangesChecker(["status"], None, None)
-    (job_info, log_entries) = checker3(job)
-    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
-    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
-
-
-class TestJobChangesWaiter(unittest.TestCase):
-  def setUp(self):
-    self.tmpdir = tempfile.mkdtemp()
-    self.filename = utils.PathJoin(self.tmpdir, "job-1")
-    utils.WriteFile(self.filename, data="")
-
-  def tearDown(self):
-    shutil.rmtree(self.tmpdir)
-
-  def _EnsureNotifierClosed(self, notifier):
-    try:
-      os.fstat(notifier._fd)
-    except EnvironmentError, err:
-      self.assertEqual(err.errno, errno.EBADF)
-    else:
-      self.fail("File descriptor wasn't closed")
-
-  def testClose(self):
-    for wait in [False, True]:
-      waiter = jqueue._JobFileChangesWaiter(self.filename)
-      try:
-        if wait:
-          waiter.Wait(0.001)
-      finally:
-        waiter.Close()
-
-      # Ensure file descriptor was closed
-      self._EnsureNotifierClosed(waiter._notifier)
-
-  def testChangingFile(self):
-    waiter = jqueue._JobFileChangesWaiter(self.filename)
-    try:
-      self.assertFalse(waiter.Wait(0.1))
-      utils.WriteFile(self.filename, data="changed")
-      self.assert_(waiter.Wait(60))
-    finally:
-      waiter.Close()
-
-    self._EnsureNotifierClosed(waiter._notifier)
-
-  def testChangingFile2(self):
-    waiter = jqueue._JobChangesWaiter(self.filename)
-    try:
-      self.assertFalse(waiter._filewaiter)
-      self.assert_(waiter.Wait(0.1))
-      self.assert_(waiter._filewaiter)
-
-      # File waiter is now used, but there have been no changes
-      self.assertFalse(waiter.Wait(0.1))
-      utils.WriteFile(self.filename, data="changed")
-      self.assert_(waiter.Wait(60))
-    finally:
-      waiter.Close()
-
-    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
-
-
-class _FailingWatchManager(pyinotify.WatchManager):
-  """Subclass of L{pyinotify.WatchManager} which always fails to register.
-
-  """
-  def add_watch(self, filename, mask):
-    assert mask == (pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] |
-                    pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"])
-
-    return {
-      filename: -1,
-      }
-
-
-class TestWaitForJobChangesHelper(unittest.TestCase):
-  def setUp(self):
-    self.tmpdir = tempfile.mkdtemp()
-    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
-    utils.WriteFile(self.filename, data="")
-
-  def tearDown(self):
-    shutil.rmtree(self.tmpdir)
-
-  def _LoadWaitingJob(self):
-    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
-
-  def _LoadLostJob(self):
-    return None
-
-  def testNoChanges(self):
-    wfjc = jqueue._WaitForJobChangesHelper()
-
-    # No change
-    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
-                          [constants.JOB_STATUS_WAITING], None, 0.1),
-                     constants.JOB_NOTCHANGED)
-
-    # No previous information
-    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
-                          ["status"], None, None, 1.0),
-                     ([constants.JOB_STATUS_WAITING], []))
-
-  def testLostJob(self):
-    wfjc = jqueue._WaitForJobChangesHelper()
-    self.assert_(wfjc(self.filename, self._LoadLostJob,
-                      ["status"], None, None, 1.0) is None)
-
-  def testNonExistentFile(self):
-    wfjc = jqueue._WaitForJobChangesHelper()
-
-    filename = utils.PathJoin(self.tmpdir, "does-not-exist")
-    self.assertFalse(os.path.exists(filename))
-
-    result = wfjc(filename, self._LoadLostJob, ["status"], None, None, 1.0,
-                  _waiter_cls=compat.partial(jqueue._JobChangesWaiter,
-                                             _waiter_cls=NotImplemented))
-    self.assertTrue(result is None)
-
-  def testInotifyError(self):
-    jobfile_waiter_cls = \
-      compat.partial(jqueue._JobFileChangesWaiter,
-                     _inotify_wm_cls=_FailingWatchManager)
-
-    jobchange_waiter_cls = \
-      compat.partial(jqueue._JobChangesWaiter, _waiter_cls=jobfile_waiter_cls)
-
-    wfjc = jqueue._WaitForJobChangesHelper()
-
-    # Test if failing to watch a job file (e.g. due to
-    # fs.inotify.max_user_watches being too low) raises errors.InotifyError
-    self.assertRaises(errors.InotifyError, wfjc,
-                      self.filename, self._LoadWaitingJob,
-                      ["status"], [constants.JOB_STATUS_WAITING], None, 1.0,
-                      _waiter_cls=jobchange_waiter_cls)
-
-
 class TestEncodeOpError(unittest.TestCase):
   def test(self):
     encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
-- 
2.0.0.526.g5318336

Reply via email to