Milan Zamazal has posted comments on this change.

Change subject: virt: Limit the number of workers in executor
......................................................................


Patch Set 17:

(11 comments)

https://gerrit.ovirt.org/#/c/57754/16/tests/executorTests.py
File tests/executorTests.py:

Line 183
Line 184
Line 185
Line 186
Line 187
> Better change to wait on an event, so we can have block tasks, and release 
That would require touching tests unrelated to this patch, so I'd leave it 
for a follow-up patch or patches.
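
For reference, the event-based approach suggested above could look roughly like the following. This is only a sketch: BlockingTask is a hypothetical stand-in, not the Task class from the patch, and plain threads are used instead of the executor.

```python
import threading


class BlockingTask(object):
    """Hypothetical stand-in for a test task; not the Task from the patch."""

    def __init__(self, release):
        self.release = release
        self.executed = threading.Event()

    def __call__(self):
        # Block until the test explicitly releases the task.
        self.release.wait()
        self.executed.set()


release = threading.Event()
tasks = [BlockingTask(release) for _ in range(3)]
threads = [threading.Thread(target=task) for task in tasks]
for thread in threads:
    thread.daemon = True
    thread.start()

# Nothing can finish while the event is clear.
executed_before = [t for t in tasks if t.executed.is_set()]

# Release all blocked tasks at once and wait for each of them to finish.
release.set()
not_executed_after = [t for t in tasks if not t.executed.wait(5)]
```

The test controls exactly when the tasks finish, so no sleep-based timing is needed.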


Line 35: 
Line 36:     def setUp(self):
Line 37:         self.scheduler = schedule.Scheduler()
Line 38:         self.scheduler.start()
Line 39:         self.max_tasks = 20
> Why did you change max_task to be lower than max_workers?
I didn't change the original max_tasks value; I just extracted it into an 
attribute, since it is used in the tests.

As for max_workers, I selected a higher value than max_tasks so as not to 
interfere with other tests. But it seems all the tests should work with 
max_workers == 15, so let's use that value (getting rid of waiting on the 
queue is a good idea). An alternative would be to make a separate test 
(sub)class, but that's not needed now.
Line 40:         self.max_workers = 15
Line 41:         self.executor = executor.Executor('test',
Line 42:                                           workers_count=10,
Line 43:                                           max_tasks=self.max_tasks,


Line 46:         self.executor.start()
Line 47:         time.sleep(0.1)  # Give time to start all threads
Line 48: 
Line 49:     def tearDown(self):
Line 50:         self.executor.stop()
> This is not a good idea. We want to clean up properly after each test.
I agree, but if there is a bug in the executor code that leaves a stuck 
task somewhere, we hang here. That is not what we want. Do you have a 
suggestion for how to clean up properly here even when a stuck task is 
present?

I removed the wait parameter here and added a regular cleanup to the tests, 
but we should still fix the problem in a follow-up patch. If we don't, we 
should improve the cleanup in the tests to make sure we don't hang on failed 
assertions, which may be a bit complex (especially with barriers).
Line 51:         self.scheduler.stop()
Line 52: 
Line 53:     def test_dispatch_not_running(self):
Line 54:         self.executor.stop()


Line 137:         time.sleep(0.3)
Line 138:         for task in tasks:
Line 139:             self.assertTrue(task.executed.is_set())
Line 140: 
Line 141:     @slowtest
> We can remove this if we fix max_tasks.
Done
Line 142:     def test_max_workers(self):
Line 143:         limit = self.max_workers
Line 144:         blocked_forever = threading.Event()
Line 145:         blocked = threading.Event()


Line 158: 
Line 159:         # Try to run new tasks on the executor now, when the maximum number of
Line 160:         # workers is reached
Line 161:         n_extra_tasks = 2
Line 162:         extra_tasks = [Task() for i in range(n_extra_tasks)]
> Using bigger max_tasks will make this unneeded.
Done
Line 163:         for t in extra_tasks:
Line 164:             self.executor.dispatch(t, 0)
Line 165: 
Line 166:         # Check that none of the new tasks got executed (the number of the


Line 169: 
Line 170:         # Unblock one of the tasks and check the new tasks run
Line 171:         blocked.set()
Line 172:         # The last task, the only unblocked one, should be executed now
Line 173:         self.assertTrue(tasks[-1].executed.wait(1))
> Same, not needed.
Done
Line 174: 
Line 175:         # The other tasks shouldn't be unblocked and executed, let's check
Line 176:         # things go as expected before proceeding (however we don't want to
Line 177:         # stop and wait on each of the tasks, the first one is enough)


Line 170:         # Unblock one of the tasks and check the new tasks run
Line 171:         blocked.set()
Line 172:         # The last task, the only unblocked one, should be executed now
Line 173:         self.assertTrue(tasks[-1].executed.wait(1))
Line 174: 
> Add blank line to separate logical blocks.
Done
Line 175:         # The other tasks shouldn't be unblocked and executed, let's check
Line 176:         # things go as expected before proceeding (however we don't want to
Line 177:         # stop and wait on each of the tasks, the first one is enough)
Line 178:         self.assertFalse(tasks[0].executed.wait(1))


Line 178:         self.assertFalse(tasks[0].executed.wait(1))
Line 179:         self.assertEqual([t for t in tasks if t.executed.is_set()],
Line 180:                          [tasks[-1]])
Line 181: 
Line 182:         # Extra tasks are not blocking, they were blocked just by the overflown
> Add blank line to make it easier to follow.
Done
Line 183:         # executor, so they should be all executed now when there is one free
Line 184:         # worker
Line 185:         self.assertEqual([t for t in extra_tasks if not t.executed.wait(1)],
Line 186:                          [])


Line 184:         # worker
Line 185:         self.assertEqual([t for t in extra_tasks if not t.executed.wait(1)],
Line 186:                          [])
Line 187: 
Line 188:         # Cleanup: Finish all the executor jobs
> Add blank line.
Done
Line 189:         blocked_forever.set()
Line 190: 
Line 191:     @slowtest
Line 192:     def test_max_workers_many_tasks(self):


Line 197:         barrier = concurrent.Barrier(self.max_workers + 1)
Line 198: 
Line 199:         # Exhaust workers
Line 200:         for i in range(self.max_workers):
Line 201:             task = Task(event=blocked, start_barrier=barrier)
> Let's wait instead until the task has started:
I reworked the test completely.
Line 202:             self.executor.dispatch(task, 0)
Line 203:         barrier.wait(3)
Line 204: 
Line 205:         # Task queue should accept further tasks up to its capacity
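
To illustrate the barrier idea from the context above (workers plus one extra party for the test thread), here is a minimal sketch. It assumes Python 3's threading.Barrier rather than the concurrent.Barrier used in the test, and runs plain threads instead of the executor.

```python
import threading

n_workers = 4
# One extra party for the test thread itself.
barrier = threading.Barrier(n_workers + 1)
blocked = threading.Event()
done = []


def task():
    barrier.wait(3)   # report that this task has started
    blocked.wait()    # then keep the worker busy until released
    done.append(1)


threads = [threading.Thread(target=task) for _ in range(n_workers)]
for thread in threads:
    thread.daemon = True
    thread.start()

# Returns only after all workers have reached the barrier, so from this
# point on every worker is known to be running its task.
barrier.wait(3)
done_before_release = len(done)

blocked.set()
for thread in threads:
    thread.join(3)
```

If a worker never starts, barrier.wait(3) raises BrokenBarrierError after the timeout instead of hanging the test.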


Line 252:             done.wait()
Line 253: 
Line 254:         self.assertEqual(sorted(names),
Line 255:                          ["bar/0", "bar/1", "foo/0", "foo/1"])
Line 256: 
> Let's use an event here, so we can wait until task is started inside the exec
I changed it to a simple value since we currently don't use it for anything but 
__repr__. But yes, it can be an event in case we need it, so done.
Line 257: 
Line 258: class Task(object):
Line 259: 
Line 260:     def __init__(self, wait=None, error=None, event=None, start_barrier=None):
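
A "started" event of the kind discussed above might be sketched like this. SketchTask and its attributes are hypothetical and much simpler than the Task class in the patch; the point is only that an event can serve both __repr__ and waiting.

```python
import threading


class SketchTask(object):
    """Hypothetical task exposing a 'started' event; not the patch's Task."""

    def __init__(self):
        self.started = threading.Event()
        self.executed = threading.Event()

    def __call__(self):
        self.started.set()
        self.executed.set()

    def __repr__(self):
        return "<SketchTask started=%s executed=%s>" % (
            self.started.is_set(), self.executed.is_set())


task = SketchTask()
repr_before = repr(task)

thread = threading.Thread(target=task)
thread.start()
# Unlike a plain boolean, the event lets the caller wait for the start.
was_started = task.started.wait(5)
thread.join()
repr_after = repr(task)
```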


-- 
To view, visit https://gerrit.ovirt.org/57754
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Iba56d91474c6b14a1cfe2db827b6fd61843a1db2
Gerrit-PatchSet: 17
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Milan Zamazal <mzama...@redhat.com>
Gerrit-Reviewer: Francesco Romani <from...@redhat.com>
Gerrit-Reviewer: Jenkins CI
Gerrit-Reviewer: Martin Polednik <mpoled...@redhat.com>
Gerrit-Reviewer: Milan Zamazal <mzama...@redhat.com>
Gerrit-Reviewer: Nir Soffer <nsof...@redhat.com>
Gerrit-Reviewer: gerrit-hooks <automat...@ovirt.org>
Gerrit-HasComments: Yes
_______________________________________________
vdsm-patches mailing list
vdsm-patches@lists.fedorahosted.org
https://lists.fedorahosted.org/admin/lists/vdsm-patches@lists.fedorahosted.org
