Nir Soffer has uploaded a new change for review.

Change subject: tests: Improve executor tests
......................................................................

tests: Improve executor tests

The previous tests were not good enough, and missed a bug calling the
scheduler with the wrong order of paramaters.

Changes:

- Use now the real scheduler so we can observe the behavior of the real
  executor. If the scheduler will fail, it will cause these tests to
  fail, but the scheduler have good unit tests.
- Use Task object so we can test now handling of multiple tasks.
- Use threading.Event to allow waiting until tasks finish, instead of
  sleeping
- Give time for scheduler and executor threads to start, as we want to
  test them fully started.
- Test more flows that were not tested before
- Mark slow tests so users can skip them if needed

Change-Id: Ia096df2ca80e9a6d98eea507573e4e9a9191a370
Signed-off-by: Nir Soffer <[email protected]>
---
M tests/executorTests.py
1 file changed, 103 insertions(+), 40 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/45/30745/1

diff --git a/tests/executorTests.py b/tests/executorTests.py
index 37a5cb5..516ff07 100644
--- a/tests/executorTests.py
+++ b/tests/executorTests.py
@@ -21,61 +21,124 @@
 import threading
 import time
 
-from vdsm.utils import running
 from vdsm import executor
+from vdsm import schedule
 
+from testValidation import slowtest
 from testlib import VdsmTestCase as TestCaseBase
 
 
-class TestsExecutor(TestCaseBase):
+class ExecutorTests(TestCaseBase):
+
     def setUp(self):
-        self._ecu = executor.Executor(workers_count=10, max_tasks=1000,
-                                      scheduler=_FakeScheduler())
-        self._task_called = False
+        self.scheduler = schedule.Scheduler()
+        self.scheduler.start()
+        self.executor = executor.Executor(workers_count=10, max_tasks=20,
+                                          scheduler=self.scheduler)
+        self.executor.start()
+        time.sleep(0.1)  # Give time to start all threads
 
-    def _task(self):
-        self._task_called = True
+    def tearDown(self):
+        self.executor.stop()
+        self.scheduler.stop()
 
-    def _waiting_task(self):
-        time.sleep(0.25)
-        # s0.25s is till bearable, but it would be nice to get rid of this
-        self._task()
-
-    def _faulty_task(self, *args):
-        raise RuntimeError('Fake fault!')
-
-    def test_dispatch_not_started(self):
+    def test_dispatch_not_running(self):
+        self.executor.stop()
         self.assertRaises(executor.NotRunning,
-                          self._ecu.dispatch,
-                          self._task)
+                          self.executor.dispatch,
+                          Task())
 
     def test_start_twice(self):
-        with running(self._ecu):
-            self.assertRaises(executor.AlreadyStarted,
-                              self._ecu.start)
+        self.assertRaises(executor.AlreadyStarted,
+                          self.executor.start)
 
     def test_dispatch(self):
-        with running(self._ecu):
-            self._ecu.dispatch(self._task)
-        self.assertTrue(self._task_called)
+        task = Task()
+        self.executor.dispatch(task)
+        task.executed.wait(0.1)
+        self.assertTrue(task.executed.is_set())
 
-    def test_run_after_fault(self):
-        with running(self._ecu):
-            self._ecu.dispatch(self._faulty_task)
-            self._ecu.dispatch(self._task)
-        self.assertTrue(self._task_called)
+    def test_dispatch_after_fault(self):
+        faulty_task = Task(error=RuntimeError("fake error"))
+        self.executor.dispatch(faulty_task)
+        faulty_task.executed.wait(0.1)
+        task = Task()
+        self.executor.dispatch(task)
+        task.executed.wait(0.1)
+        self.assertTrue(task.executed.is_set())
 
-    def test_discard_task(self):
-        with running(self._ecu):
-            self._ecu.dispatch(self._waiting_task, 0.1)
-            time.sleep(0.5)
-        self.assertTrue(self._task_called)  # task must have executed!
+    @slowtest
+    def test_dispatch_with_timeout(self):
+        task = Task(wait=0.2)
+        self.executor.dispatch(task, 0.1)
+        task.executed.wait(0.3)
+        self.assertTrue(task.executed.is_set())  # task must have executed!
+
+    def test_too_many_tasks(self):
+        tasks = [Task(wait=0.1) for n in xrange(31)]
+        with self.assertRaises(executor.TooManyTasks):
+            for task in tasks:
+                self.executor.dispatch(task)
+
+    @slowtest
+    def test_concurrency(self):
+        tasks = [Task(wait=0.1) for n in xrange(20)]
+        for task in tasks:
+            self.executor.dispatch(task, 1.0)
+        time.sleep(0.3)
+        for task in tasks:
+            self.assertTrue(task.executed.is_set())
+
+    @slowtest
+    def test_blocked_workers(self):
+        slow_tasks = [Task(wait=0.4) for n in xrange(5)]
+        for task in slow_tasks:
+            self.executor.dispatch(task, 1.0)
+        # Slow tasks block half of the workers
+        tasks = [Task(wait=0.1) for n in xrange(20)]
+        for task in tasks:
+            self.executor.dispatch(task, 1.0)
+        time.sleep(0.5)
+        for task in tasks:
+            self.assertTrue(task.executed.is_set())
+        for task in slow_tasks:
+            self.assertTrue(task.executed.is_set())
+
+    @slowtest
+    def test_discarded_workers(self):
+        slow_tasks = [Task(wait=0.4) for n in xrange(10)]
+        for task in slow_tasks:
+            self.executor.dispatch(task, 0.1)
+        # All workers are blocked on slow tasks
+        time.sleep(0.1)
+        # Blocked workers should be replaced with new workers
+        tasks = [Task(wait=0.1) for n in xrange(20)]
+        for task in tasks:
+            self.executor.dispatch(task, 1.0)
+        time.sleep(0.3)
+        for task in tasks:
+            self.assertTrue(task.executed.is_set())
+        for task in slow_tasks:
+            self.assertTrue(task.executed.is_set())
+        # Discarded workers should exit, executor should operate normally
+        tasks = [Task(wait=0.1) for n in xrange(20)]
+        for task in tasks:
+            self.executor.dispatch(task, 1.0)
+        time.sleep(0.3)
+        for task in tasks:
+            self.assertTrue(task.executed.is_set())
 
 
-class _FakeScheduler(object):
-    def __init__(self):
-        self._timer = None
+class Task(object):
 
-    def schedule(self, timeout, callable):
-        self._timer = threading.Timer(timeout, callable)
-        self._timer.start()
+    def __init__(self, wait=None, error=None):
+        self.wait = wait
+        self.error = error
+        self.executed = threading.Event()
+
+    def __call__(self):
+        if self.wait:
+            time.sleep(self.wait)
+        self.executed.set()
+        if self.error:
+            raise self.error


-- 
To view, visit http://gerrit.ovirt.org/30745
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia096df2ca80e9a6d98eea507573e4e9a9191a370
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to