This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 79412ed2a63 [HUDI-6186] Fix lock identity in InProcessLockProvider (#8658)
79412ed2a63 is described below

commit 79412ed2a634fb188da402370694c2baef7d1dfc
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Sun May 7 20:53:06 2023 -0700

    [HUDI-6186] Fix lock identity in InProcessLockProvider (#8658)

    This commit fixes a bug introduced by #6847. #6847 extends the InProcessLockProvider to support multiple tables in the same process, by having an in-memory static final map storing the mapping of the table base path to the read-write reentrant lock, so that the writer uses the corresponding lock based on the base path. When closing the lock provider, close() removes the lock entry. Since close() is called when closing the write client, the lock is removed and subsequent concurrent wri [...]
---
 .../transaction/lock/InProcessLockProvider.java    |   1 -
 .../transaction/TestInProcessLockProvider.java     | 117 +++++++++++++++++++++
 2 files changed, 117 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
index 9eab061ddfa..8e57190d1a9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
@@ -124,7 +124,6 @@ public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLoc
       lock.writeLock().unlock();
     }
     LOG.info(getLogMessage(LockState.ALREADY_RELEASED));
-    LOCK_INSTANCE_PER_BASEPATH.remove(basePath);
   }
 
   private String getLogMessage(LockState state) {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
index 8d39b8b5f2d..d1d43d7f3ae 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
@@ -24,12 +24,15 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieLockException;
 
+import junit.framework.AssertionFailedError;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,6 +55,120 @@ public class TestInProcessLockProvider {
     lockConfiguration2 = new LockConfiguration(properties);
   }
 
+  @Test
+  public void testLockIdentity() throws InterruptedException {
+    // The lifecycle of an InProcessLockProvider should not affect the singleton lock
+    // for a single table, i.e., all three writers should hold the same underlying lock instance
+    // on the same table.
+    // Writer 1:   lock |----------------| unlock and close
+    // Writer 2:   try lock      |      ...    lock |------| unlock and close
+    // Writer 3:                          try lock  | ...  lock |------| unlock and close
+    List<InProcessLockProvider> lockProviderList = new ArrayList<>();
+    InProcessLockProvider lockProvider1 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
+    lockProviderList.add(lockProvider1);
+    AtomicBoolean writer1Completed = new AtomicBoolean(false);
+    AtomicBoolean writer2TryLock = new AtomicBoolean(false);
+    AtomicBoolean writer2Locked = new AtomicBoolean(false);
+    AtomicBoolean writer2Completed = new AtomicBoolean(false);
+    AtomicBoolean writer3TryLock = new AtomicBoolean(false);
+    AtomicBoolean writer3Completed = new AtomicBoolean(false);
+
+    // Writer 1
+    assertDoesNotThrow(() -> {
+      LOG.info("Writer 1 tries to acquire the lock.");
+      lockProvider1.lock();
+      LOG.info("Writer 1 acquires the lock.");
+    });
+    // Writer 2 thread in parallel, should block
+    // and later acquire the lock once it is released
+    Thread writer2 = new Thread(() -> {
+      InProcessLockProvider lockProvider2 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
+      lockProviderList.add(lockProvider2);
+      assertDoesNotThrow(() -> {
+        LOG.info("Writer 2 tries to acquire the lock.");
+        writer2TryLock.set(true);
+        lockProvider2.lock();
+        LOG.info("Writer 2 acquires the lock.");
+      });
+      writer2Locked.set(true);
+
+      while (!writer3TryLock.get()) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+
+      assertDoesNotThrow(() -> {
+        lockProvider2.unlock();
+        LOG.info("Writer 2 releases the lock.");
+      });
+      lockProvider2.close();
+      LOG.info("Writer 2 closes the lock provider.");
+      writer2Completed.set(true);
+    });
+
+    Thread writer3 = new Thread(() -> {
+      while (!writer2Locked.get() || !writer1Completed.get()) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+      // Lock instance of Writer 3 should be held by Writer 2
+      InProcessLockProvider lockProvider3 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
+      lockProviderList.add(lockProvider3);
+      boolean isLocked = lockProvider3.getLock().isWriteLocked();
+      if (!isLocked) {
+        writer3TryLock.set(true);
+        throw new AssertionFailedError("The lock instance in Writer 3 should be held by Writer 2: "
+            + lockProvider3.getLock());
+      }
+      assertDoesNotThrow(() -> {
+        LOG.info("Writer 3 tries to acquire the lock.");
+        writer3TryLock.set(true);
+        lockProvider3.lock();
+        LOG.info("Writer 3 acquires the lock.");
+      });
+
+      assertDoesNotThrow(() -> {
+        lockProvider3.unlock();
+        LOG.info("Writer 3 releases the lock.");
+      });
+      lockProvider3.close();
+      LOG.info("Writer 3 closes the lock provider.");
+      writer3Completed.set(true);
+    });
+
+    writer2.start();
+    writer3.start();
+
+    while (!writer2TryLock.get()) {
+      Thread.sleep(100);
+    }
+
+    assertDoesNotThrow(() -> {
+      lockProvider1.unlock();
+      LOG.info("Writer 1 releases the lock.");
+      lockProvider1.close();
+      LOG.info("Writer 1 closes the lock provider.");
+      writer1Completed.set(true);
+    });
+
+    try {
+      writer2.join();
+      writer3.join();
+    } catch (InterruptedException e) {
+      // Ignore any exception
+    }
+    Assertions.assertTrue(writer2Completed.get());
+    Assertions.assertTrue(writer3Completed.get());
+    Assertions.assertEquals(lockProviderList.get(0).getLock(), lockProviderList.get(1).getLock());
+    Assertions.assertEquals(lockProviderList.get(1).getLock(), lockProviderList.get(2).getLock());
+  }
+
   @Test
   public void testLockAcquisition() {
     InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);