Nir Soffer has posted comments on this change. Change subject: virt: Limit the number of workers in executor ......................................................................
Patch Set 16: (13 comments) https://gerrit.ovirt.org/#/c/57754/16/tests/executorTests.py File tests/executorTests.py: Line 36 Line 37 Line 38 Line 39 Line 40 Please keep this for the new tests unless there is a reason to change. Line 183 Line 184 Line 185 Line 186 Line 187 Better change to wait on an event, so we can have block tasks, and release them when the test ends. t = Task(wait=100) executor.dispatch(t) try: assert not t.started.wait(1) finally: t.waiter.set() Line 35: Line 36: def setUp(self): Line 37: self.scheduler = schedule.Scheduler() Line 38: self.scheduler.start() Line 39: self.max_tasks = 20 Why did you change max_task to be lower then max_workers? This seems to cause trouble for your tests, having to wait to prevent too many tasks error. I don't like changing this for the old tests - please explain why this needed for the new tests. Line 40: self.max_workers = 30 Line 41: self.executor = executor.Executor('test', Line 42: workers_count=10, Line 43: max_tasks=self.max_tasks, Line 46: self.executor.start() Line 47: time.sleep(0.1) # Give time to start all threads Line 48: Line 49: def tearDown(self): Line 50: self.executor.stop(wait=False) This is not a good idea. We want to clean up properly after each test. Tests should not leave blocked threads in the executor, so we should be able to wait for the executor. Line 51: self.scheduler.stop() Line 52: Line 53: def test_dispatch_not_running(self): Line 54: self.executor.stop() Line 137: time.sleep(0.3) Line 138: for task in tasks: Line 139: self.assertTrue(task.executed.is_set()) Line 140: Line 141: def _wait_for_queue(self): We can remove this if we fix max_tasks. Line 142: timeout = 1 Line 143: queue = self.executor._tasks Line 144: start = utils.monotonic_time() Line 145: while len(queue._tasks) >= queue._max_tasks: Line 158: tasks = [Task(event=blocked_forever, start_barrier=start_barrier) Line 159: for n in range(limit - 1)] Line 160: tasks.append(Task(event=blocked, start_barrier=start_barrier)) Line 161: for t in tasks: Line 162: self._wait_for_queue() # prevent TooManyTasks error Using bigger max_tasks will make this unneeded. Line 163: self.executor.dispatch(t, 0) Line 164: # Wait until all tasks are started, i.e. the executor reaches its Line 165: # maximum number of workers Line 166: start_barrier.wait(timeout=3) Line 169: # workers is reached Line 170: n_extra_tasks = 2 Line 171: extra_tasks = [Task() for i in range(n_extra_tasks)] Line 172: for t in extra_tasks: Line 173: self._wait_for_queue() # prevent TooManyTasks error Same, not needed. Line 174: self.executor.dispatch(t, 0) Line 175: # Check that none of the new tasks got executed (the number of the Line 176: # executor workers is at the maximum limit, so nothing more may be run) Line 177: self.assertEqual([t for t in extra_tasks if t.executed.wait(1)], []) Line 170: n_extra_tasks = 2 Line 171: extra_tasks = [Task() for i in range(n_extra_tasks)] Line 172: for t in extra_tasks: Line 173: self._wait_for_queue() # prevent TooManyTasks error Line 174: self.executor.dispatch(t, 0) Add blank line to separate logical blocks. Line 175: # Check that none of the new tasks got executed (the number of the Line 176: # executor workers is at the maximum limit, so nothing more may be run) Line 177: self.assertEqual([t for t in extra_tasks if t.executed.wait(1)], []) Line 178: Line 178: Line 179: # Unblock one of the tasks and check the new tasks run Line 180: blocked.set() Line 181: # The last task, the only unblocked one, should be executed now Line 182: self.assertTrue(tasks[-1].executed.wait(1)) Add blank line to make it easier to follow. Line 183: # The other tasks shouldn't be unblocked and executed, let's check Line 184: # things go as expected before proceeding (however we don't want to Line 185: # stop and wait on each of the tasks, the first one is enough) Line 186: self.assertFalse(tasks[0].executed.wait(1)) Line 184: # things go as expected before proceeding (however we don't want to Line 185: # stop and wait on each of the tasks, the first one is enough) Line 186: self.assertFalse(tasks[0].executed.wait(1)) Line 187: self.assertEqual([t for t in tasks if t.executed.is_set()], Line 188: [tasks[-1]]) Add blank line. Line 189: # Extra tasks are not blocking, they were blocked just by the overflown Line 190: # executor, so they should be all executed now when there is one free Line 191: # worker Line 192: self.assertEqual([t for t in extra_tasks if not t.executed.wait(1)], Line 189: # Extra tasks are not blocking, they were blocked just by the overflown Line 190: # executor, so they should be all executed now when there is one free Line 191: # worker Line 192: self.assertEqual([t for t in extra_tasks if not t.executed.wait(1)], Line 193: []) Nice! Line 194: Line 195: @slowtest Line 196: def test_max_workers_many_tasks(self): Line 197: # Check we don't get TooManyTasks exception after reaching the limit on Line 197: # Check we don't get TooManyTasks exception after reaching the limit on Line 198: # the total number of workers if TaskQueue is not full. Line 199: number_of_tasks = self.max_workers + self.max_tasks Line 200: for i in range(number_of_tasks): Line 201: self._wait_for_queue() Lets wait instead until the task has started: t = Task(wait=100) self.exectuor.dispatch(t, 0) assert t.started.wait(1) Line 202: self.executor.dispatch(Task(wait=100), 0) Line 203: Line 204: Line 205: class TestWorkerSystemNames(TestCaseBase): Line 252: self.started = False Line 253: self.executed = threading.Event() Line 254: Line 255: def __call__(self): Line 256: self.started = True Lets use an event here, so we can wait until task is stated inside the executor: t = Task() executor.dispatch(t) t.started.wait(1) Line 257: if self.start_barrier is not None: Line 258: self.start_barrier.wait() Line 259: if self.wait: Line 260: time.sleep(self.wait) -- To view, visit https://gerrit.ovirt.org/57754 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: comment Gerrit-Change-Id: Iba56d91474c6b14a1cfe2db827b6fd61843a1db2 Gerrit-PatchSet: 16 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Milan Zamazal <[email protected]> Gerrit-Reviewer: Francesco Romani <[email protected]> Gerrit-Reviewer: Jenkins CI Gerrit-Reviewer: Martin Polednik <[email protected]> Gerrit-Reviewer: Milan Zamazal <[email protected]> Gerrit-Reviewer: Nir Soffer <[email protected]> Gerrit-Reviewer: gerrit-hooks <[email protected]> Gerrit-HasComments: Yes _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/admin/lists/[email protected]
