This is an automated email from the ASF dual-hosted git repository. nanda pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 9700e20 HDDS-2223. Support ReadWrite lock in LockManager. (#1564) 9700e20 is described below commit 9700e2003aa1b7e2c4072a2a08d8827acc5aa779 Author: Nanda kumar <na...@apache.org> AuthorDate: Fri Oct 4 08:32:43 2019 +0530 HDDS-2223. Support ReadWrite lock in LockManager. (#1564) --- .../org/apache/hadoop/ozone/lock/ActiveLock.java | 63 ++++++-- .../org/apache/hadoop/ozone/lock/LockManager.java | 166 ++++++++++++++++++--- .../apache/hadoop/ozone/lock/TestLockManager.java | 145 +++++++++++++++--- 3 files changed, 323 insertions(+), 51 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java index c302084..49efad0 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java @@ -18,22 +18,22 @@ package org.apache.hadoop.ozone.lock; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Lock implementation which also maintains counter. */ public final class ActiveLock { - private Lock lock; + private ReadWriteLock lock; private AtomicInteger count; /** * Use ActiveLock#newInstance to create instance. */ private ActiveLock() { - this.lock = new ReentrantLock(); + this.lock = new ReentrantReadWriteLock(); this.count = new AtomicInteger(0); } @@ -47,21 +47,58 @@ public final class ActiveLock { } /** - * Acquires the lock. + * Acquires read lock. * - * <p>If the lock is not available then the current thread becomes - * disabled for thread scheduling purposes and lies dormant until the - * lock has been acquired. + * <p>Acquires the read lock if the write lock is not held by + * another thread and returns immediately. + * + * <p>If the write lock is held by another thread then + * the current thread becomes disabled for thread scheduling + * purposes and lies dormant until the read lock has been acquired. + */ + void readLock() { + lock.readLock().lock(); + } + + /** + * Attempts to release the read lock. + * + * <p>If the number of readers is now zero then the lock + * is made available for write lock attempts. + */ + void readUnlock() { + lock.readLock().unlock(); + } + + /** + * Acquires write lock. + * + * <p>Acquires the write lock if neither the read nor write lock + * are held by another thread + * and returns immediately, setting the write lock hold count to + * one. + * + * <p>If the current thread already holds the write lock then the + * hold count is incremented by one and the method returns + * immediately. + * + * <p>If the lock is held by another thread then the current + * thread becomes disabled for thread scheduling purposes and + * lies dormant until the write lock has been acquired. */ - public void lock() { - lock.lock(); + void writeLock() { + lock.writeLock().lock(); } /** - * Releases the lock. + * Attempts to release the write lock. + * + * <p>If the current thread is the holder of this lock then + * the hold count is decremented. If the hold count is now + * zero then the lock is released. */ - public void unlock() { - lock.unlock(); + void writeUnlock() { + lock.writeLock().unlock(); } /** diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java index 5f76bd6..670d4d1 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java @@ -25,42 +25,156 @@ import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; /** * Manages the locks on a given resource. A new lock is created for each * and every unique resource. Uniqueness of resource depends on the * {@code equals} implementation of it. */ -public class LockManager<T> { +public class LockManager<R> { private static final Logger LOG = LoggerFactory.getLogger(LockManager.class); - private final Map<T, ActiveLock> activeLocks = new ConcurrentHashMap<>(); + private final Map<R, ActiveLock> activeLocks = new ConcurrentHashMap<>(); private final GenericObjectPool<ActiveLock> lockPool = new GenericObjectPool<>(new PooledLockFactory()); /** - * Creates new LockManager instance. + * Creates new LockManager instance with the given Configuration. * * @param conf Configuration object */ - public LockManager(Configuration conf) { - int maxPoolSize = conf.getInt(HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY, + public LockManager(final Configuration conf) { + final int maxPoolSize = conf.getInt( + HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY, HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY_DEFAULT); lockPool.setMaxTotal(maxPoolSize); } - /** * Acquires the lock on given resource. * * <p>If the lock is not available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until the * lock has been acquired. + * + * @param resource on which the lock has to be acquired + * @deprecated Use {@link LockManager#writeLock} instead + */ + public void lock(final R resource) { + writeLock(resource); + } + + /** + * Releases the lock on given resource. + * + * @param resource for which the lock has to be released + * @deprecated Use {@link LockManager#writeUnlock} instead + */ + public void unlock(final R resource) { + writeUnlock(resource); + } + + /** + * Acquires the read lock on given resource. + * + * <p>Acquires the read lock on resource if the write lock is not held by + * another thread and returns immediately. + * + * <p>If the write lock on resource is held by another thread then + * the current thread becomes disabled for thread scheduling + * purposes and lies dormant until the read lock has been acquired. + * + * @param resource on which the read lock has to be acquired + */ + public void readLock(final R resource) { + acquire(resource, ActiveLock::readLock); + } + + /** + * Releases the read lock on given resource. + * + * @param resource for which the read lock has to be released + * @throws IllegalMonitorStateException if the current thread does not + * hold this lock + */ + public void readUnlock(final R resource) throws IllegalMonitorStateException { + release(resource, ActiveLock::readUnlock); + } + + /** + * Acquires the write lock on given resource. + * + * <p>Acquires the write lock on resource if neither the read nor write lock + * are held by another thread and returns immediately. + * + * <p>If the current thread already holds the write lock then the + * hold count is incremented by one and the method returns + * immediately. + * + * <p>If the lock is held by another thread then the current + * thread becomes disabled for thread scheduling purposes and + * lies dormant until the write lock has been acquired. + * + * @param resource on which the lock has to be acquired */ - public void lock(T resource) { - activeLocks.compute(resource, (k, v) -> { - ActiveLock lock; + public void writeLock(final R resource) { + acquire(resource, ActiveLock::writeLock); + } + + /** + * Releases the write lock on given resource. + * + * @param resource for which the lock has to be released + * @throws IllegalMonitorStateException if the current thread does not + * hold this lock + */ + public void writeUnlock(final R resource) + throws IllegalMonitorStateException { + release(resource, ActiveLock::writeUnlock); + } + + /** + * Acquires the lock on given resource using the provided lock function. + * + * @param resource on which the lock has to be acquired + * @param lockFn function to acquire the lock + */ + private void acquire(final R resource, final Consumer<ActiveLock> lockFn) { + lockFn.accept(getLockForLocking(resource)); + } + + /** + * Releases the lock on given resource using the provided release function. + * + * @param resource for which the lock has to be released + * @param releaseFn function to release the lock + */ + private void release(final R resource, final Consumer<ActiveLock> releaseFn) { + final ActiveLock lock = getLockForReleasing(resource); + releaseFn.accept(lock); + decrementActiveLockCount(resource); + } + + /** + * Returns {@link ActiveLock} instance for the given resource, + * on which the lock can be acquired. + * + * @param resource on which the lock has to be acquired + * @return {@link ActiveLock} instance + */ + private ActiveLock getLockForLocking(final R resource) { + /* + * While getting a lock object for locking we should + * atomically increment the active count of the lock. + * + * This is to avoid cases where the selected lock could + * be removed from the activeLocks map and returned to + * the object pool. + */ + return activeLocks.compute(resource, (k, v) -> { + final ActiveLock lock; try { if (v == null) { lock = lockPool.borrowObject(); @@ -73,22 +187,34 @@ public class LockManager<T> { throw new RuntimeException(ex); } return lock; - }).lock(); + }); } /** - * Releases the lock on given resource. + * Returns {@link ActiveLock} instance for the given resource, + * for which the lock has to be released. + * + * @param resource for which the lock has to be released + * @return {@link ActiveLock} instance */ - public void unlock(T resource) { - ActiveLock lock = activeLocks.get(resource); - if (lock == null) { - // Someone is releasing a lock which was never acquired. Log and return. - LOG.error("Trying to release the lock on {}, which was never acquired.", - resource); - throw new IllegalMonitorStateException("Releasing lock on resource " - + resource + " without acquiring lock"); + private ActiveLock getLockForReleasing(final R resource) { + if (activeLocks.containsKey(resource)) { + return activeLocks.get(resource); } - lock.unlock(); + // Someone is releasing a lock which was never acquired. + LOG.error("Trying to release the lock on {}, which was never acquired.", + resource); + throw new IllegalMonitorStateException("Releasing lock on resource " + + resource + " without acquiring lock"); + } + + /** + * Decrements the active lock count and returns the {@link ActiveLock} + * object to pool if the active count is 0. + * + * @param resource resource to which the ActiveLock is associated + */ + private void decrementActiveLockCount(final R resource) { activeLocks.computeIfPresent(resource, (k, v) -> { v.decrementActiveCount(); if (v.getActiveLockCount() != 0) { diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java index fa3030d..e88b1bb 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java @@ -29,34 +29,143 @@ import java.util.concurrent.atomic.AtomicBoolean; public class TestLockManager { @Test(timeout = 1000) - public void testWithDifferentResource() { - LockManager<String> manager = new LockManager<>(new OzoneConfiguration()); - manager.lock("/resourceOne"); + public void testWriteLockWithDifferentResource() { + final LockManager<String> manager = + new LockManager<>(new OzoneConfiguration()); + manager.writeLock("/resourceOne"); // This should work, as they are different resource. - manager.lock("/resourceTwo"); - manager.unlock("/resourceOne"); - manager.unlock("/resourceTwo"); + manager.writeLock("/resourceTwo"); + manager.writeUnlock("/resourceOne"); + manager.writeUnlock("/resourceTwo"); Assert.assertTrue(true); } @Test - public void testWithSameResource() throws Exception { - LockManager<String> manager = new LockManager<>(new OzoneConfiguration()); - manager.lock("/resourceOne"); - AtomicBoolean gotLock = new AtomicBoolean(false); + public void testWriteLockWithSameResource() throws Exception { + final LockManager<String> manager = + new LockManager<>(new OzoneConfiguration()); + final AtomicBoolean gotLock = new AtomicBoolean(false); + manager.writeLock("/resourceOne"); new Thread(() -> { - manager.lock("/resourceOne"); + manager.writeLock("/resourceOne"); gotLock.set(true); - manager.unlock("/resourceOne"); + manager.writeUnlock("/resourceOne"); }).start(); - // Let's give some time for the new thread to run + // Let's give some time for the other thread to run Thread.sleep(100); - // Since the new thread is trying to get lock on same object, it will wait. + // Since the other thread is trying to get write lock on same object, + // it will wait. Assert.assertFalse(gotLock.get()); - manager.unlock("/resourceOne"); - // Since we have released the lock, the new thread should have the lock - // now - // Let's give some time for the new thread to run + manager.writeUnlock("/resourceOne"); + // Since we have released the write lock, the other thread should have + // the lock now + // Let's give some time for the other thread to run + Thread.sleep(100); + Assert.assertTrue(gotLock.get()); + } + + @Test(timeout = 1000) + public void testReadLockWithDifferentResource() { + final LockManager<String> manager = + new LockManager<>(new OzoneConfiguration()); + manager.readLock("/resourceOne"); + manager.readLock("/resourceTwo"); + manager.readUnlock("/resourceOne"); + manager.readUnlock("/resourceTwo"); + Assert.assertTrue(true); + } + + @Test + public void testReadLockWithSameResource() throws Exception { + final LockManager<String> manager = + new LockManager<>(new OzoneConfiguration()); + final AtomicBoolean gotLock = new AtomicBoolean(false); + manager.readLock("/resourceOne"); + new Thread(() -> { + manager.readLock("/resourceOne"); + gotLock.set(true); + manager.readUnlock("/resourceOne"); + }).start(); + // Let's give some time for the other thread to run + Thread.sleep(100); + // Since the new thread is trying to get read lock, it should work. + Assert.assertTrue(gotLock.get()); + manager.readUnlock("/resourceOne"); + } + + @Test + public void testWriteReadLockWithSameResource() throws Exception { + final LockManager<String> manager = + new LockManager<>(new OzoneConfiguration()); + final AtomicBoolean gotLock = new AtomicBoolean(false); + manager.writeLock("/resourceOne"); + new Thread(() -> { + manager.readLock("/resourceOne"); + gotLock.set(true); + manager.readUnlock("/resourceOne"); + }).start(); + // Let's give some time for the other thread to run + Thread.sleep(100); + // Since the other thread is trying to get read lock on same object, + // it will wait. + Assert.assertFalse(gotLock.get()); + manager.writeUnlock("/resourceOne"); + // Since we have released the write lock, the other thread should have + // the lock now + // Let's give some time for the other thread to run + Thread.sleep(100); + Assert.assertTrue(gotLock.get()); + } + + @Test + public void testReadWriteLockWithSameResource() throws Exception { + final LockManager<String> manager = + new LockManager<>(new OzoneConfiguration()); + final AtomicBoolean gotLock = new AtomicBoolean(false); + manager.readLock("/resourceOne"); + new Thread(() -> { + manager.writeLock("/resourceOne"); + gotLock.set(true); + manager.writeUnlock("/resourceOne"); + }).start(); + // Let's give some time for the other thread to run + Thread.sleep(100); + // Since the other thread is trying to get write lock on same object, + // it will wait. + Assert.assertFalse(gotLock.get()); + manager.readUnlock("/resourceOne"); + // Since we have released the read lock, the other thread should have + // the lock now + // Let's give some time for the other thread to run + Thread.sleep(100); + Assert.assertTrue(gotLock.get()); + } + + @Test + public void testMultiReadWriteLockWithSameResource() throws Exception { + final LockManager<String> manager = + new LockManager<>(new OzoneConfiguration()); + final AtomicBoolean gotLock = new AtomicBoolean(false); + manager.readLock("/resourceOne"); + manager.readLock("/resourceOne"); + new Thread(() -> { + manager.writeLock("/resourceOne"); + gotLock.set(true); + manager.writeUnlock("/resourceOne"); + }).start(); + // Let's give some time for the other thread to run + Thread.sleep(100); + // Since the other thread is trying to get write lock on same object, + // it will wait. + Assert.assertFalse(gotLock.get()); + manager.readUnlock("/resourceOne"); + //We have only released one read lock, we still hold another read lock. + Thread.sleep(100); + Assert.assertFalse(gotLock.get()); + manager.readUnlock("/resourceOne"); + // Since we have released the read lock, the other thread should have + // the lock now + // Let's give some time for the other thread to run Thread.sleep(100); Assert.assertTrue(gotLock.get()); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org