Nir Soffer has posted comments on this change.

Change subject: virt: Limit the number of workers in executor
......................................................................


Patch Set 16:

(13 comments)

https://gerrit.ovirt.org/#/c/57754/16/tests/executorTests.py
File tests/executorTests.py:

Line 36
Line 37
Line 38
Line 39
Line 40
Please keep this for the new tests unless there is a reason to change.


Line 183
Line 184
Line 185
Line 186
Line 187
Better change to wait on an event, so we can have block tasks, and release them 
when the test ends.

    t = Task(wait=100)
    executor.dispatch(t)
    try: 
        assert not t.started.wait(1)
    finally:
        t.waiter.set()


Line 35: 
Line 36:     def setUp(self):
Line 37:         self.scheduler = schedule.Scheduler()
Line 38:         self.scheduler.start()
Line 39:         self.max_tasks = 20
Why did you change max_task to be lower then max_workers?

This seems to cause trouble for your tests, having to wait to prevent too many 
tasks error.

I don't like changing this for the old tests - please explain why this needed 
for the new tests.
Line 40:         self.max_workers = 30
Line 41:         self.executor = executor.Executor('test',
Line 42:                                           workers_count=10,
Line 43:                                           max_tasks=self.max_tasks,


Line 46:         self.executor.start()
Line 47:         time.sleep(0.1)  # Give time to start all threads
Line 48: 
Line 49:     def tearDown(self):
Line 50:         self.executor.stop(wait=False)
This is not a good idea. We want to clean up properly after each test.

Tests should not leave blocked threads in the executor, so we should be able to 
wait for the executor.
Line 51:         self.scheduler.stop()
Line 52: 
Line 53:     def test_dispatch_not_running(self):
Line 54:         self.executor.stop()


Line 137:         time.sleep(0.3)
Line 138:         for task in tasks:
Line 139:             self.assertTrue(task.executed.is_set())
Line 140: 
Line 141:     def _wait_for_queue(self):
We can remove this if we fix max_tasks.
Line 142:         timeout = 1
Line 143:         queue = self.executor._tasks
Line 144:         start = utils.monotonic_time()
Line 145:         while len(queue._tasks) >= queue._max_tasks:


Line 158:         tasks = [Task(event=blocked_forever, 
start_barrier=start_barrier)
Line 159:                  for n in range(limit - 1)]
Line 160:         tasks.append(Task(event=blocked, start_barrier=start_barrier))
Line 161:         for t in tasks:
Line 162:             self._wait_for_queue()  # prevent TooManyTasks error
Using bigger max_tasks will make this unneeded.
Line 163:             self.executor.dispatch(t, 0)
Line 164:         # Wait until all tasks are started, i.e. the executor reaches 
its
Line 165:         # maximum number of workers
Line 166:         start_barrier.wait(timeout=3)


Line 169:         # workers is reached
Line 170:         n_extra_tasks = 2
Line 171:         extra_tasks = [Task() for i in range(n_extra_tasks)]
Line 172:         for t in extra_tasks:
Line 173:             self._wait_for_queue()  # prevent TooManyTasks error
Same, not needed.
Line 174:             self.executor.dispatch(t, 0)
Line 175:         # Check that none of the new tasks got executed (the number 
of the
Line 176:         # executor workers is at the maximum limit, so nothing more 
may be run)
Line 177:         self.assertEqual([t for t in extra_tasks if 
t.executed.wait(1)], [])


Line 170:         n_extra_tasks = 2
Line 171:         extra_tasks = [Task() for i in range(n_extra_tasks)]
Line 172:         for t in extra_tasks:
Line 173:             self._wait_for_queue()  # prevent TooManyTasks error
Line 174:             self.executor.dispatch(t, 0)
Add blank line to separate logical blocks.
Line 175:         # Check that none of the new tasks got executed (the number 
of the
Line 176:         # executor workers is at the maximum limit, so nothing more 
may be run)
Line 177:         self.assertEqual([t for t in extra_tasks if 
t.executed.wait(1)], [])
Line 178: 


Line 178: 
Line 179:         # Unblock one of the tasks and check the new tasks run
Line 180:         blocked.set()
Line 181:         # The last task, the only unblocked one, should be executed 
now
Line 182:         self.assertTrue(tasks[-1].executed.wait(1))
Add blank line to make it easier to follow.
Line 183:         # The other tasks shouldn't be unblocked and executed, let's 
check
Line 184:         # things go as expected before proceeding (however we don't 
want to
Line 185:         # stop and wait on each of the tasks, the first one is enough)
Line 186:         self.assertFalse(tasks[0].executed.wait(1))


Line 184:         # things go as expected before proceeding (however we don't 
want to
Line 185:         # stop and wait on each of the tasks, the first one is enough)
Line 186:         self.assertFalse(tasks[0].executed.wait(1))
Line 187:         self.assertEqual([t for t in tasks if t.executed.is_set()],
Line 188:                          [tasks[-1]])
Add blank line.
Line 189:         # Extra tasks are not blocking, they were blocked just by the 
overflown
Line 190:         # executor, so they should be all executed now when there is 
one free
Line 191:         # worker
Line 192:         self.assertEqual([t for t in extra_tasks if not 
t.executed.wait(1)],


Line 189:         # Extra tasks are not blocking, they were blocked just by the 
overflown
Line 190:         # executor, so they should be all executed now when there is 
one free
Line 191:         # worker
Line 192:         self.assertEqual([t for t in extra_tasks if not 
t.executed.wait(1)],
Line 193:                          [])
Nice!
Line 194: 
Line 195:     @slowtest
Line 196:     def test_max_workers_many_tasks(self):
Line 197:         # Check we don't get TooManyTasks exception after reaching 
the limit on


Line 197:         # Check we don't get TooManyTasks exception after reaching 
the limit on
Line 198:         # the total number of workers if TaskQueue is not full.
Line 199:         number_of_tasks = self.max_workers + self.max_tasks
Line 200:         for i in range(number_of_tasks):
Line 201:             self._wait_for_queue()
Lets wait instead until the task has started:

    t = Task(wait=100)
    self.exectuor.dispatch(t, 0)
    assert t.started.wait(1)
Line 202:             self.executor.dispatch(Task(wait=100), 0)
Line 203: 
Line 204: 
Line 205: class TestWorkerSystemNames(TestCaseBase):


Line 252:         self.started = False
Line 253:         self.executed = threading.Event()
Line 254: 
Line 255:     def __call__(self):
Line 256:         self.started = True
Lets use an event here, so we can wait until task is stated inside the executor:

    t = Task()
    executor.dispatch(t)
    t.started.wait(1)
Line 257:         if self.start_barrier is not None:
Line 258:             self.start_barrier.wait()
Line 259:         if self.wait:
Line 260:             time.sleep(self.wait)


-- 
To view, visit https://gerrit.ovirt.org/57754
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Iba56d91474c6b14a1cfe2db827b6fd61843a1db2
Gerrit-PatchSet: 16
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Milan Zamazal <[email protected]>
Gerrit-Reviewer: Francesco Romani <[email protected]>
Gerrit-Reviewer: Jenkins CI
Gerrit-Reviewer: Martin Polednik <[email protected]>
Gerrit-Reviewer: Milan Zamazal <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: gerrit-hooks <[email protected]>
Gerrit-HasComments: Yes
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/admin/lists/[email protected]

Reply via email to