Nir Soffer has uploaded a new change for review.

Change subject: schedule: Introduce scheduling library
......................................................................

schedule: Introduce scheduling library

This module provides a Scheduler class for scheduling execution of
callables on a background thread.

This should be part of the new scalable VM sampling implementation, and
can also be used whenever you would like to perform a short task on a
background thread without waiting for the task to complete.

See the module docstring and tests for usage examples.

Change-Id: Ie3764806d93bd37c3b5924080eb5ae4d29e4f4e0
Signed-off-by: Nir Soffer <[email protected]>
---
M debian/vdsm-python.install
M lib/vdsm/Makefile.am
A lib/vdsm/schedule.py
M tests/Makefile.am
A tests/scheduleTests.py
M vdsm.spec.in
6 files changed, 346 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/07/29607/1

diff --git a/debian/vdsm-python.install b/debian/vdsm-python.install
index 2d4bba6..1775241 100644
--- a/debian/vdsm-python.install
+++ b/debian/vdsm-python.install
@@ -16,6 +16,7 @@
 ./usr/lib/python2.7/dist-packages/vdsm/netlink/link.py
 ./usr/lib/python2.7/dist-packages/vdsm/profile.py
 ./usr/lib/python2.7/dist-packages/vdsm/qemuimg.py
+./usr/lib/python2.7/dist-packages/vdsm/schedule.py
 ./usr/lib/python2.7/dist-packages/vdsm/sslutils.py
 ./usr/lib/python2.7/dist-packages/vdsm/tool/__init__.py
 ./usr/lib/python2.7/dist-packages/vdsm/tool/dummybr.py
diff --git a/lib/vdsm/Makefile.am b/lib/vdsm/Makefile.am
index c074bb3..89a5573 100644
--- a/lib/vdsm/Makefile.am
+++ b/lib/vdsm/Makefile.am
@@ -32,6 +32,7 @@
        netinfo.py \
        profile.py \
        qemuimg.py \
+       schedule.py \
        SecureXMLRPCServer.py \
        sslutils.py \
        utils.py \
diff --git a/lib/vdsm/schedule.py b/lib/vdsm/schedule.py
new file mode 100644
index 0000000..bd75924
--- /dev/null
+++ b/lib/vdsm/schedule.py
@@ -0,0 +1,213 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+"""
+This module provides a Scheduler class scheduling execution of
+a callable on a background thread.
+
+To use a scheduler, create an instance:
+
+    scheduler = schedule.Scheduler()
+
+When you want to schedule some callable:
+
+    def task():
+        print '30 seconds passed'
+
+    scheduler.schedule(30.0, task)
+
+task will be called after 30.0 seconds on the scheduler background thread.
+
+If you need to cancel a scheduled call, keep the ScheduledCall object returned
+from Scheduler.schedule(), and cancel the task:
+
+    scheduled_call = scheduler.schedule(30.0, task)
+    ...
+    scheduled_call.cancel()
+
+Finally, when the scheduler is not needed any more:
+
+    scheduler.cancel()
+
+This will cancel any pending calls and terminate the scheduler thread.
+"""
+
+import heapq
+import logging
+import threading
+import time
+
+from . import utils
+
+
+class Scheduler(object):
+    """
+    Schedule calls for future execution in a background thread.
+
+    This class is thread safe; multiple threads can schedule calls or cancel
+    the scheduler.
+    """
+
+    DEFAULT_DELAY = 30.0  # Used if no timeouts are scheduled
+
+    _log = logging.getLogger("vds.Scheduler")
+
+    def __init__(self):
+        self._log.debug("Starting scheduler")
+        self._cond = threading.Condition(threading.Lock())
+        self._running = True
+        self._timeouts = []
+        t = threading.Thread(target=self._run)
+        t.daemon = True
+        t.start()
+
+    def schedule(self, delay, callee):
+        """
+        Schedule callee to be called after delay seconds on the scheduler
+        thread.
+
+        Callee must not block or take excessive time to complete. If it does
+        not finish quickly, it may delay other scheduled calls on the scheduler
+        thread.
+
+        Returns a ScheduledCall that may be canceled if callee was not called
+        yet.
+        """
+        deadline = time.time() + delay
+        timeout = _Timeout(deadline, callee)
+        self._log.debug("Schedulng %s", timeout)
+        with self._cond:
+            if self._running:
+                heapq.heappush(self._timeouts, timeout)
+                self._cond.notify()
+            else:
+                timeout.cancel()
+        return ScheduledCall(timeout)
+
+    def cancel(self):
+        """
+        Cancel all scheduled calls and invalidate the scheduler. Calls
+        scheduled after the scheduler was canceled will never be called.
+        """
+        self._log.debug("Canceling scheduler")
+        with self._cond:
+            self._running = False
+            self._cond.notify()
+
+    @utils.traceback(on=_log.name)
+    def _run(self):
+        try:
+            self._log.debug("started")
+            self._loop()
+            self._log.debug("canceled")
+        finally:
+            self._cleanup()
+
+    def _loop(self):
+        while True:
+            with self._cond:
+                if not self._running:
+                    return
+                delay = self._time_until_deadline()
+                if delay > 0.0:
+                    self._cond.wait(delay)
+                    if not self._running:
+                        return
+                expired = self._pop_expired_timeouts()
+            for timeout in expired:
+                timeout.fire()
+
+    def _time_until_deadline(self):
+        if self._timeouts:
+            return self._timeouts[0].deadline - time.time()
+        return self.DEFAULT_DELAY
+
+    def _pop_expired_timeouts(self):
+        now = time.time()
+        expired = []
+        while self._timeouts:
+            timeout = self._timeouts[0]
+            if timeout.deadline > now:
+                break
+            heapq.heappop(self._timeouts)
+            expired.append(timeout)
+        return expired
+
+    def _cleanup(self):
+        # Help the garbage collector by breaking reference cycles
+        with self._cond:
+            for timeout in self._timeouts:
+                timeout.cancel()
+
+
+class ScheduledCall(object):
+    """
+    Returned when a callable is scheduled to be called after delay. The caller
+    may cancel the call if it was not called yet.
+
+    This class is thread safe; any thread can cancel a call.
+    """
+
+    _log = logging.getLogger("vds.Scheduler")
+
+    def __init__(self, timeout):
+        self._timeout = timeout
+
+    @property
+    def deadline(self):
+        return self._timeout.deadline
+
+    def cancel(self):
+        self._log.debug("Canceling %s", self)
+        self._timeout.cancel()
+
+
+# Sentinel for marking timeouts as invalid. Callable so we can invalidate a
+# timeout in a thread safe manner without locks.
+def _INVALID():
+    pass
+
+
+class _Timeout(object):
+    """
+    Created for each scheduled call.
+    """
+
+    _log = logging.getLogger('vds.Timeout')
+
+    def __init__(self, deadline, callee):
+        self.deadline = deadline
+        self.callee = callee
+
+    def fire(self):
+        if self.callee is _INVALID:
+            return
+        try:
+            self.callee()
+        except Exception:
+            self._log.exception("Unhandled exception in scheduled call")
+        finally:
+            self.callee = _INVALID
+
+    def cancel(self):
+        self.callee = _INVALID
+
+    def __cmp__(self, other):
+        return cmp(self.deadline, other.deadline)
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 6507165..786dea4 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -65,6 +65,7 @@
        remoteFileHandlerTests.py \
        resourceManagerTests.py \
        samplingTests.py \
+       scheduleTests.py \
        schemaTests.py \
        securableTests.py \
        sslTests.py \
diff --git a/tests/scheduleTests.py b/tests/scheduleTests.py
new file mode 100644
index 0000000..0c370d9
--- /dev/null
+++ b/tests/scheduleTests.py
@@ -0,0 +1,129 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import threading
+import time
+
+from vdsm import schedule
+from testrunner import VdsmTestCase
+
+
+class SchedulerTests(VdsmTestCase):
+
+    # Time to wait for completion, so tests will not fail on overloaded
+    # machines. If tests fail on CI, increase this value.
+    GRACETIME = 0.1
+
+    MAX_TASKS = 1000
+
+    def setUp(self):
+        self.scheduler = schedule.Scheduler()
+
+    def tearDown(self):
+        self.scheduler.cancel()
+
+    def test_schedule(self):
+        delay = 0.3
+        task1 = Task()
+        task2 = Task()
+        timeout1 = self.scheduler.schedule(delay, task1)
+        self.scheduler.schedule(10, task2)
+        task1.wait(delay + self.GRACETIME)
+        self.assertTrue(timeout1.deadline <= task1.call_time)
+        self.assertTrue(task1.call_time < timeout1.deadline + self.GRACETIME)
+        self.assertEquals(task2.call_time, None)
+
+    def test_schedule_many(self):
+        delay = 0.3
+        tasks = []
+        for i in range(self.MAX_TASKS):
+            task = Task()
+            timeout = self.scheduler.schedule(delay, task)
+            tasks.append((task, timeout))
+        last_task = tasks[-1][0]
+        last_task.wait(delay + self.GRACETIME)
+        for task, timeout in tasks:
+            self.assertTrue(timeout.deadline <= task.call_time)
+            self.assertTrue(task.call_time < timeout.deadline + self.GRACETIME)
+
+    def test_continue_after_failures(self):
+        self.scheduler.schedule(0.3, FailingTask())
+        task = Task()
+        self.scheduler.schedule(0.4, task)
+        task.wait(0.4 + self.GRACETIME)
+        self.assertTrue(task.call_time is not None)
+
+    def test_cancel_timeout(self):
+        delay = 0.3
+        task = Task()
+        timeout = self.scheduler.schedule(delay, task)
+        timeout.cancel()
+        task.wait(delay + self.GRACETIME)
+        self.assertEquals(task.call_time, None)
+
+    def test_cancel_many(self):
+        delay = 0.3
+        tasks = []
+        for i in range(self.MAX_TASKS):
+            task = Task()
+            timeout = self.scheduler.schedule(delay, task)
+            tasks.append((task, timeout))
+        for task, timeout in tasks:
+            timeout.cancel()
+        last_task = tasks[-1][0]
+        last_task.wait(delay + self.GRACETIME)
+        for task, timeout in tasks:
+            self.assertEquals(task.call_time, None)
+
+    def test_cancel(self):
+        delay = 0.3
+        tasks = []
+        for i in range(self.MAX_TASKS):
+            task = Task()
+            timeout = self.scheduler.schedule(delay, task)
+            tasks.append((task, timeout))
+        self.scheduler.cancel()
+        last_task = tasks[-1][0]
+        last_task.wait(delay + self.GRACETIME)
+        for task, timeout in tasks:
+            self.assertEquals(task.call_time, None)
+
+
+class Task(object):
+
+    def __init__(self):
+        self.cond = threading.Condition(threading.Lock())
+        self.call_time = None
+
+    def __call__(self):
+        with self.cond:
+            self.call_time = time.time()
+            self.cond.notify()
+
+    def wait(self, timeout):
+        with self.cond:
+            if self.call_time is None:
+                self.cond.wait(timeout)
+
+
+class FailingTask(object):
+
+    def __call__(self):
+        raise Exception("This task is broken")
diff --git a/vdsm.spec.in b/vdsm.spec.in
index dfca5bd..8ba0477 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -1163,6 +1163,7 @@
 %{python_sitearch}/%{vdsm_name}/qemuimg.py*
 %{python_sitearch}/%{vdsm_name}/SecureXMLRPCServer.py*
 %{python_sitearch}/%{vdsm_name}/netconfpersistence.py*
+%{python_sitearch}/%{vdsm_name}/schedule.py*
 %{python_sitearch}/%{vdsm_name}/sslutils.py*
 %{python_sitearch}/%{vdsm_name}/utils.py*
 %{python_sitearch}/%{vdsm_name}/vdscli.py*


-- 
To view, visit http://gerrit.ovirt.org/29607
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie3764806d93bd37c3b5924080eb5ae4d29e4f4e0
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to