LGTM

On Tue, Jun 24, 2014 at 1:55 PM, 'Klaus Aehlig' via ganeti-devel <
[email protected]> wrote:

> All requests to watch a job for changes are now
> handled by luxid. So this code is unused in the
> Python job queue.
>
> Signed-off-by: Klaus Aehlig <[email protected]>
> ---
>  lib/jqueue/__init__.py            | 255
> --------------------------------------
>  test/py/ganeti.jqueue_unittest.py | 190 ----------------------------
>  2 files changed, 445 deletions(-)
>
> diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
> index 81d7cfc..370e957 100644
> --- a/lib/jqueue/__init__.py
> +++ b/lib/jqueue/__init__.py
> @@ -101,25 +101,6 @@ def _CallJqUpdate(runner, names, file_name, content):
>    return runner.call_jobqueue_update(names, virt_file_name, content)
>
>
> -class _SimpleJobQuery:
> -  """Wrapper for job queries.
> -
> -  Instance keeps list of fields cached, useful e.g. in
> L{_JobChangesChecker}.
> -
> -  """
> -  def __init__(self, fields):
> -    """Initializes this class.
> -
> -    """
> -    self._query = query.Query(query.JOB_FIELDS, fields)
> -
> -  def __call__(self, job):
> -    """Executes a job query using cached field list.
> -
> -    """
> -    return self._query.OldStyleQuery([(job.id, job)],
> sort_by_name=False)[0]
> -
> -
>  class _QueuedOpCode(object):
>    """Encapsulates an opcode object.
>
> @@ -666,210 +647,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
>      return self._queue.SubmitManyJobs(jobs)
>
>
> -class _JobChangesChecker(object):
> -  def __init__(self, fields, prev_job_info, prev_log_serial):
> -    """Initializes this class.
> -
> -    @type fields: list of strings
> -    @param fields: Fields requested by LUXI client
> -    @type prev_job_info: string
> -    @param prev_job_info: previous job info, as passed by the LUXI client
> -    @type prev_log_serial: string
> -    @param prev_log_serial: previous job serial, as passed by the LUXI
> client
> -
> -    """
> -    self._squery = _SimpleJobQuery(fields)
> -    self._prev_job_info = prev_job_info
> -    self._prev_log_serial = prev_log_serial
> -
> -  def __call__(self, job):
> -    """Checks whether job has changed.
> -
> -    @type job: L{_QueuedJob}
> -    @param job: Job object
> -
> -    """
> -    assert not job.writable, "Expected read-only job"
> -
> -    status = job.CalcStatus()
> -    job_info = self._squery(job)
> -    log_entries = job.GetLogEntries(self._prev_log_serial)
> -
> -    # Serializing and deserializing data can cause type changes (e.g. from
> -    # tuple to list) or precision loss. We're doing it here so that we get
> -    # the same modifications as the data received from the client. Without
> -    # this, the comparison afterwards might fail without the data being
> -    # significantly different.
> -    # TODO: we just deserialized from disk, investigate how to make sure
> that
> -    # the job info and log entries are compatible to avoid this further
> step.
> -    # TODO: Doing something like in testutils.py:UnifyValueType might be
> more
> -    # efficient, though floats will be tricky
> -    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
> -    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
> -
> -    # Don't even try to wait if the job is no longer running, there will
> be
> -    # no changes.
> -    if (status not in (constants.JOB_STATUS_QUEUED,
> -                       constants.JOB_STATUS_RUNNING,
> -                       constants.JOB_STATUS_WAITING) or
> -        job_info != self._prev_job_info or
> -        (log_entries and self._prev_log_serial != log_entries[0][0])):
> -      logging.debug("Job %s changed", job.id)
> -      return (job_info, log_entries)
> -
> -    return None
> -
> -
> -class _JobFileChangesWaiter(object):
> -  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
> -    """Initializes this class.
> -
> -    @type filename: string
> -    @param filename: Path to job file
> -    @raises errors.InotifyError: if the notifier cannot be setup
> -
> -    """
> -    self._wm = _inotify_wm_cls()
> -    self._inotify_handler = \
> -      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify,
> filename)
> -    self._notifier = \
> -      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
> -    try:
> -      self._inotify_handler.enable()
> -    except Exception:
> -      # pyinotify doesn't close file descriptors automatically
> -      self._notifier.stop()
> -      raise
> -
> -  def _OnInotify(self, notifier_enabled):
> -    """Callback for inotify.
> -
> -    """
> -    if not notifier_enabled:
> -      self._inotify_handler.enable()
> -
> -  def Wait(self, timeout):
> -    """Waits for the job file to change.
> -
> -    @type timeout: float
> -    @param timeout: Timeout in seconds
> -    @return: Whether there have been events
> -
> -    """
> -    assert timeout >= 0
> -    have_events = self._notifier.check_events(timeout * 1000)
> -    if have_events:
> -      self._notifier.read_events()
> -    self._notifier.process_events()
> -    return have_events
> -
> -  def Close(self):
> -    """Closes underlying notifier and its file descriptor.
> -
> -    """
> -    self._notifier.stop()
> -
> -
> -class _JobChangesWaiter(object):
> -  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
> -    """Initializes this class.
> -
> -    @type filename: string
> -    @param filename: Path to job file
> -
> -    """
> -    self._filewaiter = None
> -    self._filename = filename
> -    self._waiter_cls = _waiter_cls
> -
> -  def Wait(self, timeout):
> -    """Waits for a job to change.
> -
> -    @type timeout: float
> -    @param timeout: Timeout in seconds
> -    @return: Whether there have been events
> -
> -    """
> -    if self._filewaiter:
> -      return self._filewaiter.Wait(timeout)
> -
> -    # Lazy setup: Avoid inotify setup cost when job file has already
> changed.
> -    # If this point is reached, return immediately and let caller check
> the job
> -    # file again in case there were changes since the last check. This
> avoids a
> -    # race condition.
> -    self._filewaiter = self._waiter_cls(self._filename)
> -
> -    return True
> -
> -  def Close(self):
> -    """Closes underlying waiter.
> -
> -    """
> -    if self._filewaiter:
> -      self._filewaiter.Close()
> -
> -
> -class _WaitForJobChangesHelper(object):
> -  """Helper class using inotify to wait for changes in a job file.
> -
> -  This class takes a previous job status and serial, and alerts the
> client when
> -  the current job status has changed.
> -
> -  """
> -  @staticmethod
> -  def _CheckForChanges(counter, job_load_fn, check_fn):
> -    if counter.next() > 0:
> -      # If this isn't the first check the job is given some more time to
> change
> -      # again. This gives better performance for jobs generating many
> -      # changes/messages.
> -      time.sleep(0.1)
> -
> -    job = job_load_fn()
> -    if not job:
> -      raise errors.JobLost()
> -
> -    result = check_fn(job)
> -    if result is None:
> -      raise utils.RetryAgain()
> -
> -    return result
> -
> -  def __call__(self, filename, job_load_fn,
> -               fields, prev_job_info, prev_log_serial, timeout,
> -               _waiter_cls=_JobChangesWaiter):
> -    """Waits for changes on a job.
> -
> -    @type filename: string
> -    @param filename: File on which to wait for changes
> -    @type job_load_fn: callable
> -    @param job_load_fn: Function to load job
> -    @type fields: list of strings
> -    @param fields: Which fields to check for changes
> -    @type prev_job_info: list or None
> -    @param prev_job_info: Last job information returned
> -    @type prev_log_serial: int
> -    @param prev_log_serial: Last job message serial number
> -    @type timeout: float
> -    @param timeout: maximum time to wait in seconds
> -
> -    """
> -    counter = itertools.count()
> -    try:
> -      check_fn = _JobChangesChecker(fields, prev_job_info,
> prev_log_serial)
> -      waiter = _waiter_cls(filename)
> -      try:
> -        return utils.Retry(compat.partial(self._CheckForChanges,
> -                                          counter, job_load_fn, check_fn),
> -                           utils.RETRY_REMAINING_TIME, timeout,
> -                           wait_fn=waiter.Wait)
> -      finally:
> -        waiter.Close()
> -    except errors.JobLost:
> -      return None
> -    except utils.RetryTimeout:
> -      return constants.JOB_NOTCHANGED
> -
> -
>  def _EncodeOpError(err):
>    """Encodes an error which occurred while processing an opcode.
>
> @@ -2167,38 +1944,6 @@ class JobQueue(object):
>      logging.debug("Writing job %s to %s", job.id, filename)
>      self._UpdateJobQueueFile(filename, data, replicate)
>
> -  def WaitForJobChanges(self, job_id, fields, prev_job_info,
> prev_log_serial,
> -                        timeout):
> -    """Waits for changes in a job.
> -
> -    @type job_id: int
> -    @param job_id: Job identifier
> -    @type fields: list of strings
> -    @param fields: Which fields to check for changes
> -    @type prev_job_info: list or None
> -    @param prev_job_info: Last job information returned
> -    @type prev_log_serial: int
> -    @param prev_log_serial: Last job message serial number
> -    @type timeout: float
> -    @param timeout: maximum time to wait in seconds
> -    @rtype: tuple (job info, log entries)
> -    @return: a tuple of the job information as required via
> -        the fields parameter, and the log entries as a list
> -
> -        if the job has not changed and the timeout has expired,
> -        we instead return a special value,
> -        L{constants.JOB_NOTCHANGED}, which should be interpreted
> -        as such by the clients
> -
> -    """
> -    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
> -                             writable=False)
> -
> -    helper = _WaitForJobChangesHelper()
> -
> -    return helper(self._GetJobPath(job_id), load_fn,
> -                  fields, prev_job_info, prev_log_serial, timeout)
> -
>    def HasJobBeenFinalized(self, job_id):
>      """Checks if a job has been finalized.
>
> diff --git a/test/py/ganeti.jqueue_unittest.py b/test/py/
> ganeti.jqueue_unittest.py
> index 78ade29..80e60f9 100755
> --- a/test/py/ganeti.jqueue_unittest.py
> +++ b/test/py/ganeti.jqueue_unittest.py
> @@ -75,196 +75,6 @@ class _FakeJob:
>      return self._log[newer_than:]
>
>
> -class TestJobChangesChecker(unittest.TestCase):
> -  def testStatus(self):
> -    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
> -    checker = jqueue._JobChangesChecker(["status"], None, None)
> -    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
> -
> -    job.SetStatus(constants.JOB_STATUS_RUNNING)
> -    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
> -
> -    job.SetStatus(constants.JOB_STATUS_SUCCESS)
> -    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
> -
> -    # job.id is used by checker
> -    self.assertEqual(job.id, 9094)
> -
> -  def testStatusWithPrev(self):
> -    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
> -    checker = jqueue._JobChangesChecker(["status"],
> -                                        [constants.JOB_STATUS_QUEUED],
> None)
> -    self.assert_(checker(job) is None)
> -
> -    job.SetStatus(constants.JOB_STATUS_RUNNING)
> -    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
> -
> -  def testFinalStatus(self):
> -    for status in constants.JOBS_FINALIZED:
> -      job = _FakeJob(2178711, status)
> -      checker = jqueue._JobChangesChecker(["status"], [status], None)
> -      # There won't be any changes in this status, hence it should signal
> -      # a change immediately
> -      self.assertEqual(checker(job), ([status], []))
> -
> -  def testLog(self):
> -    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
> -    checker = jqueue._JobChangesChecker(["status"], None, None)
> -    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
> -
> -    job.AddLogEntry("Hello World")
> -    (job_info, log_entries) = checker(job)
> -    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
> -    self.assertEqual(log_entries, [[0, "Hello World"]])
> -
> -    checker2 = jqueue._JobChangesChecker(["status"], job_info,
> len(log_entries))
> -    self.assert_(checker2(job) is None)
> -
> -    job.AddLogEntry("Foo Bar")
> -    job.SetStatus(constants.JOB_STATUS_ERROR)
> -
> -    (job_info, log_entries) = checker2(job)
> -    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
> -    self.assertEqual(log_entries, [[1, "Foo Bar"]])
> -
> -    checker3 = jqueue._JobChangesChecker(["status"], None, None)
> -    (job_info, log_entries) = checker3(job)
> -    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
> -    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
> -
> -
> -class TestJobChangesWaiter(unittest.TestCase):
> -  def setUp(self):
> -    self.tmpdir = tempfile.mkdtemp()
> -    self.filename = utils.PathJoin(self.tmpdir, "job-1")
> -    utils.WriteFile(self.filename, data="")
> -
> -  def tearDown(self):
> -    shutil.rmtree(self.tmpdir)
> -
> -  def _EnsureNotifierClosed(self, notifier):
> -    try:
> -      os.fstat(notifier._fd)
> -    except EnvironmentError, err:
> -      self.assertEqual(err.errno, errno.EBADF)
> -    else:
> -      self.fail("File descriptor wasn't closed")
> -
> -  def testClose(self):
> -    for wait in [False, True]:
> -      waiter = jqueue._JobFileChangesWaiter(self.filename)
> -      try:
> -        if wait:
> -          waiter.Wait(0.001)
> -      finally:
> -        waiter.Close()
> -
> -      # Ensure file descriptor was closed
> -      self._EnsureNotifierClosed(waiter._notifier)
> -
> -  def testChangingFile(self):
> -    waiter = jqueue._JobFileChangesWaiter(self.filename)
> -    try:
> -      self.assertFalse(waiter.Wait(0.1))
> -      utils.WriteFile(self.filename, data="changed")
> -      self.assert_(waiter.Wait(60))
> -    finally:
> -      waiter.Close()
> -
> -    self._EnsureNotifierClosed(waiter._notifier)
> -
> -  def testChangingFile2(self):
> -    waiter = jqueue._JobChangesWaiter(self.filename)
> -    try:
> -      self.assertFalse(waiter._filewaiter)
> -      self.assert_(waiter.Wait(0.1))
> -      self.assert_(waiter._filewaiter)
> -
> -      # File waiter is now used, but there have been no changes
> -      self.assertFalse(waiter.Wait(0.1))
> -      utils.WriteFile(self.filename, data="changed")
> -      self.assert_(waiter.Wait(60))
> -    finally:
> -      waiter.Close()
> -
> -    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
> -
> -
> -class _FailingWatchManager(pyinotify.WatchManager):
> -  """Subclass of L{pyinotify.WatchManager} which always fails to register.
> -
> -  """
> -  def add_watch(self, filename, mask):
> -    assert mask == (pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] |
> -                    pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"])
> -
> -    return {
> -      filename: -1,
> -      }
> -
> -
> -class TestWaitForJobChangesHelper(unittest.TestCase):
> -  def setUp(self):
> -    self.tmpdir = tempfile.mkdtemp()
> -    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
> -    utils.WriteFile(self.filename, data="")
> -
> -  def tearDown(self):
> -    shutil.rmtree(self.tmpdir)
> -
> -  def _LoadWaitingJob(self):
> -    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
> -
> -  def _LoadLostJob(self):
> -    return None
> -
> -  def testNoChanges(self):
> -    wfjc = jqueue._WaitForJobChangesHelper()
> -
> -    # No change
> -    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
> -                          [constants.JOB_STATUS_WAITING], None, 0.1),
> -                     constants.JOB_NOTCHANGED)
> -
> -    # No previous information
> -    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
> -                          ["status"], None, None, 1.0),
> -                     ([constants.JOB_STATUS_WAITING], []))
> -
> -  def testLostJob(self):
> -    wfjc = jqueue._WaitForJobChangesHelper()
> -    self.assert_(wfjc(self.filename, self._LoadLostJob,
> -                      ["status"], None, None, 1.0) is None)
> -
> -  def testNonExistentFile(self):
> -    wfjc = jqueue._WaitForJobChangesHelper()
> -
> -    filename = utils.PathJoin(self.tmpdir, "does-not-exist")
> -    self.assertFalse(os.path.exists(filename))
> -
> -    result = wfjc(filename, self._LoadLostJob, ["status"], None, None,
> 1.0,
> -                  _waiter_cls=compat.partial(jqueue._JobChangesWaiter,
> -                                             _waiter_cls=NotImplemented))
> -    self.assertTrue(result is None)
> -
> -  def testInotifyError(self):
> -    jobfile_waiter_cls = \
> -      compat.partial(jqueue._JobFileChangesWaiter,
> -                     _inotify_wm_cls=_FailingWatchManager)
> -
> -    jobchange_waiter_cls = \
> -      compat.partial(jqueue._JobChangesWaiter,
> _waiter_cls=jobfile_waiter_cls)
> -
> -    wfjc = jqueue._WaitForJobChangesHelper()
> -
> -    # Test if failing to watch a job file (e.g. due to
> -    # fs.inotify.max_user_watches being too low) raises
> errors.InotifyError
> -    self.assertRaises(errors.InotifyError, wfjc,
> -                      self.filename, self._LoadWaitingJob,
> -                      ["status"], [constants.JOB_STATUS_WAITING], None,
> 1.0,
> -                      _waiter_cls=jobchange_waiter_cls)
> -
> -
>  class TestEncodeOpError(unittest.TestCase):
>    def test(self):
>      encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
> --
> 2.0.0.526.g5318336
>
>

Reply via email to