Nir Soffer has uploaded a new change for review. Change subject: rwlock: Add simpler RWLock supporting acquire timeout ......................................................................
rwlock: Add simpler RWLock supporting acquire timeout We have RWLock implementation in storage.misc, used for by the resource manager. We want to simplify the resource manager, which is way too complex to understand or maintain. Patch https://gerrit.ovirt.org/42773 suggests to add a non-blocking acquire to the current RWLock, needed for the new simple lock manager. However, adding more code to the current RWLock is not a good idea. Instead, this patch replace the current implementation with a simpler one. Differences from the current RWLock: - Support acquire timeout - with timeout=0, can be used to implement non-blocking acquire. With bigger timeout, make it easy to fail a request after some timeout, instead of leaving a blocked thread forever. - Recursive lock not supported - I'm not sure we need this support, and usually having a recursive lock is a design smell. We will add it later only if required. - 100% test coverage (previously 0%) Change-Id: I87d0ad33c9d8bf8b93a1ce9a9fed86a4affef089 Signed-off-by: Nir Soffer <[email protected]> --- M debian/vdsm-python.install M lib/vdsm/Makefile.am A lib/vdsm/rwlock.py M tests/Makefile.am A tests/rwlock_test.py M vdsm.spec.in 6 files changed, 316 insertions(+), 0 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/49/42849/1 diff --git a/debian/vdsm-python.install b/debian/vdsm-python.install index 57d5033..3047d9f 100644 --- a/debian/vdsm-python.install +++ b/debian/vdsm-python.install @@ -28,6 +28,7 @@ ./usr/lib/python2.7/dist-packages/vdsm/pthread.py ./usr/lib/python2.7/dist-packages/vdsm/qemuimg.py ./usr/lib/python2.7/dist-packages/vdsm/response.py +./usr/lib/python2.7/dist-packages/vdsm/rwlock.py ./usr/lib/python2.7/dist-packages/vdsm/schedule.py ./usr/lib/python2.7/dist-packages/vdsm/sslutils.py ./usr/lib/python2.7/dist-packages/vdsm/tool/__init__.py diff --git a/lib/vdsm/Makefile.am b/lib/vdsm/Makefile.am index 95e236f..45cef02 100644 --- a/lib/vdsm/Makefile.am +++ b/lib/vdsm/Makefile.am @@ -38,6 +38,7 @@ pthread.py \ qemuimg.py \ response.py \ + rwlock.py \ schedule.py \ sslutils.py \ sysctl.py \ diff --git a/lib/vdsm/rwlock.py b/lib/vdsm/rwlock.py new file mode 100644 index 0000000..0e9b9ec --- /dev/null +++ b/lib/vdsm/rwlock.py @@ -0,0 +1,75 @@ +import threading + + +class Timeout(Exception): + pass + + +class RWLock(object): + + def __init__(self): + self._lock = threading.Lock() + self._waiters = [] + self._readers = set() + self._writer = None + + def acquire_read(self, timeout=None): + with self._lock: + if self._writer or self._waiters: + self._wait(timeout, False) + self._readers.add(threading.current_thread()) + + def acquire_write(self, timeout=None): + with self._lock: + if self._writer or self._readers or self._waiters: + self._wait(timeout, True) + self._writer = threading.current_thread() + + def release(self): + me = threading.current_thread() + with self._lock: + if self._writer: + if self._writer is not me: + raise AssertionError("Thread %s attempted to release a " + "write lock held by thread %s" + % (me, self._writer)) + self._writer = None + else: + if me not in self._readers: + raise AssertionError("Thread %s attempted to release a " + "read lock it does not hold" + % (me,)) + self._readers.remove(me) + if self._waiters: + self._wakeup_waiter() + + def _wait(self, timeout, wants_write): + waiter = Waiter(wants_write) + self._waiters.append(waiter) + try: + self._lock.release() + try: + waiter.wait(timeout) + finally: + self._lock.acquire() + finally: + self._waiters.remove(waiter) + + def _wakeup_waiter(self): + if self._readers and self._waiters[0].wants_write: + return + self._waiters[0].wakeup() + + +class Waiter(object): + + def __init__(self, wants_write): + self.wants_write = wants_write + self._event = threading.Event() + + def wait(self, timeout): + if not self._event.wait(timeout): + raise Timeout() + + def wakeup(self): + self._event.set() diff --git a/tests/Makefile.am b/tests/Makefile.am index 39154c8..5128e87 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -87,6 +87,7 @@ remoteFileHandlerTests.py \ resourceManagerTests.py \ responseTests.py \ + rwlock_test.py \ samplingTests.py \ scheduleTests.py \ schemaTests.py \ diff --git a/tests/rwlock_test.py b/tests/rwlock_test.py new file mode 100644 index 0000000..3155123 --- /dev/null +++ b/tests/rwlock_test.py @@ -0,0 +1,237 @@ +from __future__ import print_function +import threading +import time + +from testlib import VdsmTestCase +from testValidation import slowtest, stresstest + +from vdsm import rwlock + + +class RWLockTests(VdsmTestCase): + + def test_writer_blocks_other_writer(self): + lock = rwlock.RWLock() + lock.acquire_write() + try: + log = [] + start_thread(writer, lock, log, timeout=0).join() + self.assertEqual(log, ["writer timeout"]) + finally: + lock.release() + + def test_writer_blocks_other_reader(self): + lock = rwlock.RWLock() + lock.acquire_write() + try: + log = [] + start_thread(reader, lock, log, timeout=0).join() + self.assertEqual(log, ["reader timeout"]) + finally: + lock.release() + + def test_reader_blocks_other_writer(self): + lock = rwlock.RWLock() + lock.acquire_read() + try: + log = [] + start_thread(writer, lock, log, timeout=0).join() + self.assertEqual(log, ["writer timeout"]) + finally: + lock.release() + + @slowtest + def test_concurrent_readers(self): + lock = rwlock.RWLock() + readers = [] + log = [] + with self.assertElapsed(0.1, 0.1): + for i in range(5): + t = start_thread(reader, lock, log, hold=0.1) + readers.append(t) + for t in readers: + t.join() + self.assertEqual(log, ["reader acquired"] * 5) + + @slowtest + def test_wakeup_blocked_writer(self): + lock = rwlock.RWLock() + readers = [] + log = [] + try: + for i in range(5): + t = start_thread(reader, lock, log, hold=0.2) + readers.append(t) + time.sleep(0.1) + # All readers should hold the lock now + with self.assertElapsed(0.1, 0.02): + # Should block until last reader release the lock + lock.acquire_write(0.2) + lock.release() + finally: + for t in readers: + t.join() + + @slowtest + def test_wakeup_blocked_readers(self): + lock = rwlock.RWLock() + readers = [] + log = [] + lock.acquire_write() + try: + for i in range(10): + t = start_thread(reader, lock, log) + readers.append(t) + time.sleep(0.1) + # All readers should be blocked now + finally: + lock.release() + # Should wake up all readers, taking a read lock + for t in readers: + t.join() + self.assertEqual(log, ["reader acquired"] * 10) + + @slowtest + def test_release_other_thread_write_lock(self): + lock = rwlock.RWLock() + t = start_thread(writer, lock, [], hold=0.2) + try: + time.sleep(0.1) + self.assertRaises(AssertionError, lock.release) + finally: + t.join() + + @slowtest + def test_release_other_thread_read_lock(self): + lock = rwlock.RWLock() + t = start_thread(reader, lock, [], hold=0.2) + try: + time.sleep(0.1) + self.assertRaises(AssertionError, lock.release) + finally: + t.join() + + @slowtest + def test_fifo(self): + lock = rwlock.RWLock() + log = [] + lock.acquire_read() + try: + # t1 is waiting on the first reader... + t1 = start_thread(writer, lock, log) + time.sleep(0.1) + # t2 joined after t1, so it should get the lock after it + t2 = start_thread(reader, lock, log) + time.sleep(0.1) + finally: + lock.release() + t1.join() + t2.join() + self.assertEqual(log, ["writer acquired", "reader acquired"]) + + @stresstest + def test_fairness(self): + lock = rwlock.RWLock() + ready = threading.Event() + done = threading.Event() + reads = [0] + writes = [0] + + def read(): + ready.wait() + while not done.is_set(): + lock.acquire_read() + try: + reads[0] += 1 + finally: + lock.release() + + def write(): + ready.wait() + while not done.is_set(): + lock.acquire_write() + try: + writes[0] += 1 + finally: + lock.release() + + t1 = start_thread(read) + t2 = start_thread(write) + try: + time.sleep(0.1) + ready.set() + time.sleep(1) + finally: + done.set() + t1.join() + t2.join() + + print('reads: ', reads[0], 'writes:', writes[0]) + self.assertTrue(abs(reads[0] - writes[0]) < reads[0] / 10) + + +class HelpersTests(VdsmTestCase): + """ + Test helpers used in the RWLock tests. + """ + + def test_writer(self): + lock = rwlock.RWLock() + log = [] + start_thread(writer, lock, log).join() + self.assertEqual(log, ["writer acquired"]) + + @slowtest + def test_writer_hold(self): + lock = rwlock.RWLock() + log = [] + with self.assertElapsed(0.1, 0.05): + start_thread(writer, lock, log, hold=0.1).join() + self.assertEqual(log, ["writer acquired"]) + + def test_reader(self): + lock = rwlock.RWLock() + log = [] + start_thread(reader, lock, log, hold=0).join() + self.assertEqual(log, ["reader acquired"]) + + @slowtest + def test_reader_hold(self): + lock = rwlock.RWLock() + log = [] + with self.assertElapsed(0.1, 0.05): + start_thread(reader, lock, log, hold=0.1).join() + self.assertEqual(log, ["reader acquired"]) + + +def writer(lock, log, timeout=1, hold=0): + try: + lock.acquire_write(timeout) + except rwlock.Timeout: + log.append("writer timeout") + else: + try: + log.append("writer acquired") + time.sleep(hold) + finally: + lock.release() + + +def reader(lock, log, timeout=1, hold=0): + try: + lock.acquire_read(timeout) + except rwlock.Timeout: + log.append("reader timeout") + else: + try: + log.append("reader acquired") + time.sleep(hold) + finally: + lock.release() + + +def start_thread(func, *args, **kwargs): + t = threading.Thread(target=func, args=args, kwargs=kwargs) + t.daemon = True + t.start() + return t diff --git a/vdsm.spec.in b/vdsm.spec.in index 2b41835..35a9670 100644 --- a/vdsm.spec.in +++ b/vdsm.spec.in @@ -1263,6 +1263,7 @@ %{python_sitelib}/%{vdsm_name}/pthread.py* %{python_sitelib}/%{vdsm_name}/qemuimg.py* %{python_sitelib}/%{vdsm_name}/response.py* +%{python_sitelib}/%{vdsm_name}/rwlock.py* %{python_sitelib}/%{vdsm_name}/netconfpersistence.py* %{python_sitelib}/%{vdsm_name}/schedule.py* %{python_sitelib}/%{vdsm_name}/sslutils.py* -- To view, visit https://gerrit.ovirt.org/42849 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I87d0ad33c9d8bf8b93a1ce9a9fed86a4affef089 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Nir Soffer <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
