This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 4b7ec37fdfab95f569c2147acb58bb572a41f01d
Author: xyuanlu <xyua...@gmail.com>
AuthorDate: Wed Jul 24 12:23:02 2024 -0700

    Gateway util for per key lock and per key blocking queue executor(#2847)
    
    Gateway util for per key lock and per key blocking queue executor
---
 .../helix/gateway/util/PerKeyBlockingExecutor.java | 80 ++++++++++++++++++++++
 .../helix/gateway/util/PerKeyLockRegistry.java     | 51 ++++++++++++++
 .../helix/gateway/TestPerKeyBlockingExecutor.java  | 44 ++++++++++++
 .../helix/gateway/TestPerKeyLockRegistry.java      | 55 +++++++++++++++
 4 files changed, 230 insertions(+)

diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java
new file mode 100644
index 000000000..7953996e9
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java
@@ -0,0 +1,80 @@
+package org.apache.helix.gateway.util;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * A per-key blocking executor that ensures that only one event is running for a given key at a
+ * time. An event offered for a key that already has an event submitted or running is queued and
+ * executed after the earlier events for that key, in the order the events were offered. Events of
+ * different keys may run concurrently on the worker pool.
+ */
+public class PerKeyBlockingExecutor {
+  private final ThreadPoolExecutor _executor;
+  // Events waiting for their key to become free. Only accessed while holding _queueLock.
+  private final Map<String, Queue<Runnable>> _pendingBlockedEvents;
+  // Keys that currently have an event submitted to, or running on, _executor. Only modified while
+  // holding _queueLock.
+  private final ConcurrentHashMap.KeySetView<String, Boolean> _runningEvents;
+  private final Lock _queueLock;
+
+  /**
+   * @param maxWorkers number of threads of the fixed-size worker pool, i.e. the maximum number of
+   *                   keys that can have an event running at the same time
+   */
+  public PerKeyBlockingExecutor(int maxWorkers) {
+    this._executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxWorkers);
+    this._pendingBlockedEvents = new HashMap<>();
+    this._queueLock = new ReentrantLock();
+    this._runningEvents = ConcurrentHashMap.newKeySet();
+  }
+
+  /**
+   * Offer an event to be executed. If an event is already submitted or running for the given key,
+   * the event will be queued and run once all earlier events of that key have finished.
+   * @param key   the key the event belongs to
+   * @param event the work to run
+   */
+  public void offerEvent(String key, Runnable event) {
+    _queueLock.lock();
+    try {
+      // The key is marked as busy here, at submission time and under the lock, rather than when
+      // the worker thread starts the event. Otherwise a second offer for the same key arriving
+      // before the worker got scheduled would also be submitted and both events could overlap.
+      if (_runningEvents.add(key)) {
+        submit(key, event);
+      } else {
+        _pendingBlockedEvents.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).offer(event);
+      }
+    } finally {
+      _queueLock.unlock();
+    }
+  }
+
+  /**
+   * Hand an event to the worker pool. Must be called while holding _queueLock and with the key
+   * already marked as running. If the pool refuses the task (e.g. after shutdown) the mark is
+   * undone so the key is not blocked forever, and the rejection is propagated to the caller.
+   */
+  private void submit(String key, Runnable event) {
+    try {
+      _executor.execute(() -> runEvent(key, event));
+    } catch (RuntimeException e) {
+      _runningEvents.remove(key);
+      throw e;
+    }
+  }
+
+  /**
+   * Run the event on the worker thread and afterwards pass the key on to the next queued event,
+   * even if the event threw.
+   */
+  private void runEvent(String key, Runnable event) {
+    try {
+      event.run();
+    } finally {
+      _queueLock.lock();
+      try {
+        processQueue(key);
+      } finally {
+        _queueLock.unlock();
+      }
+    }
+  }
+
+  /**
+   * Called under _queueLock when an event of the key has finished: submit the next queued event
+   * for the key, or release the key if nothing is waiting.
+   */
+  private void processQueue(String key) {
+    Queue<Runnable> pending = _pendingBlockedEvents.get(key);
+    Runnable next = pending == null ? null : pending.poll();
+    if (next == null) {
+      // Nothing is waiting: drop the drained queue so the map does not grow with every key ever
+      // seen, and free the key for the next offer.
+      _pendingBlockedEvents.remove(key);
+      _runningEvents.remove(key);
+      return;
+    }
+    // The key stays marked as running; ownership is handed directly to the next event.
+    submit(key, next);
+  }
+
+  /**
+   * Shut down the worker pool. Events already handed to the pool still run; new offers for idle
+   * keys are rejected by the pool, and events still queued behind a running event can no longer
+   * be submitted and will not run.
+   */
+  public void shutdown() {
+    _executor.shutdown();
+  }
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyLockRegistry.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyLockRegistry.java
new file mode 100644
index 000000000..5bfc3c949
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyLockRegistry.java
@@ -0,0 +1,51 @@
+package org.apache.helix.gateway.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A registry that manages locks per key. The lock of a key is created on first use and is
+ * reentrant for the thread holding it.
+ */
+public class PerKeyLockRegistry {
+  private final ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<>();
+
+  /**
+   * Acquire the lock of the key, blocking until it is available.
+   * @param key
+   */
+  public void lock(String key) {
+    acquire(key);
+  }
+
+  /**
+   * Release the lock currently registered for the key. Does nothing if no lock is registered for
+   * the key. Note that after a successful {@link #removeLock(String)} the removed lock can no
+   * longer be found here; prefer {@link #withLock(String, Runnable)}, which always releases the
+   * lock it acquired.
+   * @param key
+   * @throws IllegalMonitorStateException if the registered lock is not held by the current thread
+   */
+  public void unlock(String key) {
+    ReentrantLock lock = lockMap.get(key);
+    if (lock != null) {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Execute the action with the lock on the key
+   * @param key
+   * @param action
+   */
+  public void withLock(String key, Runnable action) {
+    // Keep the acquired instance: the action may call removeLock(key), after which a lookup by key
+    // would find nothing (leaving this lock held forever) or a different lock created by another
+    // thread (which this thread does not own).
+    ReentrantLock lock = acquire(key);
+    try {
+      action.run();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Remove the lock if it is not being used.
+   * It must be called after the lock is acquired, i.e. by the thread currently holding the lock
+   * of the key, and ideally as the last step before releasing it: once removed, other threads
+   * will create and use a new lock for the key.
+   * @param key
+   * @return true if the lock was removed; false if no lock is registered for the key, the current
+   *         thread does not hold it, or other threads are waiting for it
+   */
+  public boolean removeLock(String key) {
+    ReentrantLock lock = lockMap.get(key);
+    if (lock != null && lock.isHeldByCurrentThread() && !lock.hasQueuedThreads()) {
+      lockMap.remove(key, lock);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Acquire and return the lock registered for the key. The mapping is checked again after the
+   * lock is obtained: if the instance was removed from the registry while this thread was waiting
+   * for it, holding it would not exclude threads using the replacement, so it is released and the
+   * acquisition is retried with the current instance.
+   */
+  private ReentrantLock acquire(String key) {
+    while (true) {
+      ReentrantLock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock());
+      lock.lock();
+      if (lockMap.get(key) == lock) {
+        return lock;
+      }
+      lock.unlock();
+    }
+  }
+}
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java
new file mode 100644
index 000000000..457002cbd
--- /dev/null
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java
@@ -0,0 +1,44 @@
+package org.apache.helix.gateway;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+
+public class TestPerKeyBlockingExecutor {
+  /**
+   * Verifies that an event offered for a key is held back while an earlier event of the same key
+   * is still running, and that it runs once the earlier event has finished.
+   */
+  @Test
+  public void testEventNotAddedIfPending() throws InterruptedException {
+    CountDownLatch latch1 = new CountDownLatch(1);
+    CountDownLatch latch2 = new CountDownLatch(1);
+    CountDownLatch latch3 = new CountDownLatch(1);
+
+    PerKeyBlockingExecutor perKeyBlockingExecutor = new PerKeyBlockingExecutor(3);
+    try {
+      perKeyBlockingExecutor.offerEvent("key1", () -> {
+        try {
+          latch1.await(); // Wait for the test to release this latch
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      });
+
+      perKeyBlockingExecutor.offerEvent("key1", () -> {
+        latch2.countDown();
+      });
+
+      // The timed await is itself the window in which event 2 could wrongly run.
+      Assert.assertFalse(latch2.await(100, TimeUnit.MILLISECONDS)); // Event 2 should not run yet
+      latch1.countDown(); // Release the first latch
+      Assert.assertTrue(latch2.await(1, TimeUnit.SECONDS)); // Event 2 should run now
+
+      perKeyBlockingExecutor.offerEvent("key1", () -> {
+        latch3.countDown();
+      });
+
+      Assert.assertTrue(latch3.await(1, TimeUnit.SECONDS)); // Event 3 should run after Event 2
+    } finally {
+      // Even when an assertion fails, unblock event 1 and stop the pool so that no worker thread
+      // is left behind waiting on the latch.
+      latch1.countDown();
+      perKeyBlockingExecutor.shutdown();
+    }
+  }
+}
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java
new file mode 100644
index 000000000..89d2e0bc1
--- /dev/null
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java
@@ -0,0 +1,55 @@
+package org.apache.helix.gateway;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.gateway.util.PerKeyLockRegistry;
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+
+public class TestPerKeyLockRegistry {
+  /**
+   * Verifies nested locking of different keys and the conditions under which removeLock succeeds.
+   * Work done on other threads hands its result back to the test thread, because an assertion
+   * failing inside a submitted task would only be recorded in the ignored Future.
+   */
+  @Test
+  public void testConcurrentAccess() throws Exception {
+    PerKeyLockRegistry lockRegistry = new PerKeyLockRegistry();
+    final AtomicInteger counter = new AtomicInteger(0);
+
+    lockRegistry.withLock("key1", () -> {
+      counter.incrementAndGet();
+      // try to acquire the lock for another key
+      lockRegistry.withLock("key2", () -> {
+        counter.incrementAndGet();
+      });
+    });
+
+    // both actions must have run
+    Assert.assertEquals(counter.get(), 2);
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    try {
+      // key1 is held by the test thread, so another thread must not be able to remove it
+      lockRegistry.lock("key1");
+      try {
+        boolean removedByOtherThread = executor.submit(() -> {
+          return lockRegistry.removeLock("key1");
+        }).get();
+        Assert.assertFalse(removedByOtherThread);
+      } finally {
+        lockRegistry.unlock("key1");
+      }
+
+      // holding key2 does not allow removing key1, which the calling thread has not locked
+      boolean removedWhileHoldingOtherKey = executor.submit(() -> {
+        boolean[] removed = new boolean[1];
+        lockRegistry.withLock("key2", () -> removed[0] = lockRegistry.removeLock("key1"));
+        return removed[0];
+      }).get();
+      Assert.assertFalse(removedWhileHoldingOtherKey);
+
+      // the holder of key1 can remove it when no other thread is waiting for it
+      boolean removedByHolder = executor.submit(() -> {
+        boolean[] removed = new boolean[1];
+        lockRegistry.withLock("key1", () -> removed[0] = lockRegistry.removeLock("key1"));
+        return removed[0];
+      }).get();
+      Assert.assertTrue(removedByHolder);
+    } finally {
+      executor.shutdown();
+    }
+  }
+}

Reply via email to