This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 79412ed2a63 [HUDI-6186] Fix lock identity in InProcessLockProvider (#8658)
79412ed2a63 is described below

commit 79412ed2a634fb188da402370694c2baef7d1dfc
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Sun May 7 20:53:06 2023 -0700

    [HUDI-6186] Fix lock identity in InProcessLockProvider (#8658)
    
    This commit fixes a bug introduced by #6847. #6847 extends the InProcessLockProvider to support multiple tables in the same process, by having an in-memory static final map storing the mapping of the table base path to the read-write reentrant lock, so that the writer uses the corresponding lock based on the base path. When closing the lock provider, close() removes the lock entry. Since close() is called when closing the write client, the lock is removed and subsequent concurrent wri [...]
---
 .../transaction/lock/InProcessLockProvider.java    |   1 -
 .../transaction/TestInProcessLockProvider.java     | 117 +++++++++++++++++++++
 2 files changed, 117 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
index 9eab061ddfa..8e57190d1a9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
@@ -124,7 +124,6 @@ public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLoc
       lock.writeLock().unlock();
     }
     LOG.info(getLogMessage(LockState.ALREADY_RELEASED));
-    LOCK_INSTANCE_PER_BASEPATH.remove(basePath);
   }
 
   private String getLogMessage(LockState state) {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
index 8d39b8b5f2d..d1d43d7f3ae 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
@@ -24,12 +24,15 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieLockException;
 
+import junit.framework.AssertionFailedError;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,6 +55,120 @@ public class TestInProcessLockProvider {
     lockConfiguration2 = new LockConfiguration(properties);
   }
 
+  @Test
+  public void testLockIdentity() throws InterruptedException {
+    // The lifecycle of an InProcessLockProvider should not affect the singleton lock
+    // for a single table, i.e., all three writers should hold the same underlying lock instance
+    // on the same table.
+    // Writer 1:   lock |----------------| unlock and close
+    // Writer 2:   try lock   |      ...    lock |------| unlock and close
+    // Writer 3:                          try lock  | ...  lock |------| unlock and close
+    List<InProcessLockProvider> lockProviderList = new ArrayList<>();
+    InProcessLockProvider lockProvider1 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
+    lockProviderList.add(lockProvider1);
+    AtomicBoolean writer1Completed = new AtomicBoolean(false);
+    AtomicBoolean writer2TryLock = new AtomicBoolean(false);
+    AtomicBoolean writer2Locked = new AtomicBoolean(false);
+    AtomicBoolean writer2Completed = new AtomicBoolean(false);
+    AtomicBoolean writer3TryLock = new AtomicBoolean(false);
+    AtomicBoolean writer3Completed = new AtomicBoolean(false);
+
+    // Writer 1
+    assertDoesNotThrow(() -> {
+      LOG.info("Writer 1 tries to acquire the lock.");
+      lockProvider1.lock();
+      LOG.info("Writer 1 acquires the lock.");
+    });
+    // Writer 2 thread in parallel, should block
+    // and later acquire the lock once it is released
+    Thread writer2 = new Thread(() -> {
+      InProcessLockProvider lockProvider2 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
+      lockProviderList.add(lockProvider2);
+      assertDoesNotThrow(() -> {
+        LOG.info("Writer 2 tries to acquire the lock.");
+        writer2TryLock.set(true);
+        lockProvider2.lock();
+        LOG.info("Writer 2 acquires the lock.");
+      });
+      writer2Locked.set(true);
+
+      while (!writer3TryLock.get()) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+
+      assertDoesNotThrow(() -> {
+        lockProvider2.unlock();
+        LOG.info("Writer 2 releases the lock.");
+      });
+      lockProvider2.close();
+      LOG.info("Writer 2 closes the lock provider.");
+      writer2Completed.set(true);
+    });
+
+    Thread writer3 = new Thread(() -> {
+      while (!writer2Locked.get() || !writer1Completed.get()) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+      // Lock instance of Writer 3 should be held by Writer 2
+      InProcessLockProvider lockProvider3 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
+      lockProviderList.add(lockProvider3);
+      boolean isLocked = lockProvider3.getLock().isWriteLocked();
+      if (!isLocked) {
+        writer3TryLock.set(true);
+        throw new AssertionFailedError("The lock instance in Writer 3 should be held by Writer 2: "
+            + lockProvider3.getLock());
+      }
+      assertDoesNotThrow(() -> {
+        LOG.info("Writer 3 tries to acquire the lock.");
+        writer3TryLock.set(true);
+        lockProvider3.lock();
+        LOG.info("Writer 3 acquires the lock.");
+      });
+
+      assertDoesNotThrow(() -> {
+        lockProvider3.unlock();
+        LOG.info("Writer 3 releases the lock.");
+      });
+      lockProvider3.close();
+      LOG.info("Writer 3 closes the lock provider.");
+      writer3Completed.set(true);
+    });
+
+    writer2.start();
+    writer3.start();
+
+    while (!writer2TryLock.get()) {
+      Thread.sleep(100);
+    }
+
+    assertDoesNotThrow(() -> {
+      lockProvider1.unlock();
+      LOG.info("Writer 1 releases the lock.");
+      lockProvider1.close();
+      LOG.info("Writer 1 closes the lock provider.");
+      writer1Completed.set(true);
+    });
+
+    try {
+      writer2.join();
+      writer3.join();
+    } catch (InterruptedException e) {
+      // Ignore any exception
+    }
+    Assertions.assertTrue(writer2Completed.get());
+    Assertions.assertTrue(writer3Completed.get());
+    Assertions.assertEquals(lockProviderList.get(0).getLock(), lockProviderList.get(1).getLock());
+    Assertions.assertEquals(lockProviderList.get(1).getLock(), lockProviderList.get(2).getLock());
+  }
+
   @Test
   public void testLockAcquisition() {
     InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);

Reply via email to