This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 14283032f2897cef581fbe9473e304e0bbc2e012
Author: Marcos Rico Peng <[email protected]>
AuthorDate: Fri Jun 23 19:12:23 2023 -0400

    Lattice MetaClient Distributed Semaphore Implementation (#2515)
    
    Co-authored-by: mapeng <[email protected]>
---
 .../recipes/lock/DistributedSemaphore.java         | 123 ++++++++++++++++++---
 .../helix/metaclient/recipes/lock/LockClient.java  |   8 +-
 .../recipes/lock/DistributedSemaphoreTest.java     | 105 ++++++++++++++++++
 3 files changed, 221 insertions(+), 15 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java
index 0cccb130f..7b16e78fd 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java
@@ -22,18 +22,45 @@ package org.apache.helix.metaclient.recipes.lock;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 public class DistributedSemaphore {
+  // Client through which every read and write against the backing store is issued.
+  private final MetaClientInterface<DataRecord> _metaClient;
+  // Path of the semaphore this instance is bound to; assigned by createSemaphore/connectSemaphore.
+  private String _path;
+  // Names of the long fields written into the semaphore's DataRecord by createSemaphore.
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  // Default value passed to getLongField whenever REMAINING_CAPACITY is read.
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class);
 
   /**
    * Create a distributed semaphore client with the given configuration.
    * @param config configuration of the client
    */
   public DistributedSemaphore(MetaClientConfig config) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (config == null) {
+      throw new MetaClientException("Configuration cannot be null");
+    }
+    LOG.info("Creating DistributedSemaphore Client");
+    // Only a ZooKeeper-backed client can be built here; reject any other store type up front.
+    if (!MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
+    // Currently only support ZNRecordSerializer.
+    // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+    ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+        .setConnectionAddress(config.getConnectionAddress())
+        .setZkSerializer(new DataRecordSerializer())
+        .build();
+    _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+    _metaClient.connect();
   }
 
   /**
@@ -41,7 +68,17 @@ public class DistributedSemaphore {
    * @param client client to connect to
    */
   public DistributedSemaphore(MetaClientInterface<DataRecord> client) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (client == null) {
+      throw new MetaClientException("Client cannot be null");
+    }
+    LOG.info("Connecting to existing DistributedSemaphore Client");
+    _metaClient = client;
+    try {
+      _metaClient.connect();
+      // TODO: Differentiate exception catch between already connected and already closed.
+    } catch (IllegalStateException e) {
+      // Deliberately best effort: the caller-supplied client has either already been connected
+      // or already been closed. Keep going, but leave a trace instead of discarding the cause.
+      LOG.debug("Ignoring connect() failure on the supplied MetaClient", e);
+    }
   }
 
   /**
@@ -50,7 +87,22 @@ public class DistributedSemaphore {
    * @param capacity capacity of the semaphore
    */
   public void createSemaphore(String path, int capacity) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (capacity <= 0) {
+      throw new MetaClientException("Capacity must be positive");
+    }
+    if (path == null || path.isEmpty()) {
+      throw new MetaClientException("Invalid path to create semaphore");
+    }
+    // One existence check is enough. A second check here used to skip creation silently (and
+    // leave _path unset) if another caller created the node in between the two look-ups.
+    if (_metaClient.exists(path) != null) {
+      throw new MetaClientException("Semaphore already exists");
+    }
+    // A new semaphore starts with all of its permits available.
+    DataRecord dataRecord = new DataRecord(path);
+    dataRecord.setLongField(INITIAL_CAPACITY_NAME, capacity);
+    dataRecord.setLongField(REMAINING_CAPACITY_NAME, capacity);
+    _metaClient.create(path, dataRecord);
+    // Bind this instance to the path only once the create call has returned normally.
+    _path = path;
   }
 
   /**
@@ -58,7 +110,13 @@ public class DistributedSemaphore {
    * @param path path of the semaphore
    */
   public void connectSemaphore(String path) {
-    throw new NotImplementedException("Not implemented yet.");
+    // Validate the argument first, then require that something already exists at that path.
+    final boolean pathUnusable = (path == null) || path.isEmpty();
+    if (pathUnusable) {
+      throw new MetaClientException("Invalid path to connect semaphore");
+    } else if (_metaClient.exists(path) == null) {
+      throw new MetaClientException("Semaphore does not exist");
+    }
+    // Later operations on this instance act on the bound path.
+    _path = path;
   }
 
   /**
@@ -66,7 +124,13 @@ public class DistributedSemaphore {
    * @return a permit
    */
   public Permit acquire() {
-    throw new NotImplementedException("Not implemented yet.");
+    // Stays null when either step below fails with a MetaClientException.
+    Permit granted = null;
+    try {
+      // Take one unit of remaining capacity first, then build the permit handed to the caller.
+      updateAcquirePermit(1);
+      granted = retrievePermit(_path);
+    } catch (MetaClientException acquireFailure) {
+      LOG.error("Failed to acquire permit.", acquireFailure);
+    }
+    return granted;
   }
 
 
@@ -76,7 +140,17 @@ public class DistributedSemaphore {
    * @return a collection of permits
    */
   public Collection<Permit> acquire(int count) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (count < 0) {
+      // A negative count would pass the "permitsAvailable < count" check in
+      // updateAcquirePermit and end up increasing the remaining capacity, so refuse it here
+      // using the same "null on failure" convention as the catch block below.
+      LOG.error("Failed to acquire permits. Requested count must not be negative: {}", count);
+      return null;
+    }
+    try {
+      // Take all requested capacity in one update, then create one Permit per unit taken.
+      updateAcquirePermit(count);
+      Collection<Permit> permits = new ArrayList<>(count);
+      for (int i = 0; i < count; i++) {
+        permits.add(retrievePermit(_path));
+      }
+      return permits;
+    } catch (MetaClientException e) {
+      LOG.error("Failed to acquire permits.", e);
+      return null;
+    }
   }
 
   /**
@@ -96,7 +170,7 @@ public class DistributedSemaphore {
    * @return remaining capacity
    */
   public long getRemainingCapacity() {
-    throw new NotImplementedException("Not implemented yet.");
+    // Fetch the semaphore record anew on every call rather than caching it.
+    DataRecord semaphore = getSemaphore();
+    return semaphore.getLongField(REMAINING_CAPACITY_NAME, DEFAULT_REMAINING_CAPACITY);
   }
 
   /**
@@ -104,14 +178,22 @@ public class DistributedSemaphore {
    * @return semaphore data record
    */
   private DataRecord getSemaphore() {
-    throw new NotImplementedException("Not implemented yet.");
+    final boolean semaphoreMissing = _metaClient.exists(_path) == null;
+    if (semaphoreMissing) {
+      String message = "Semaphore does not exist at path: " + _path + ". Please create it first.";
+      throw new MetaClientException(message);
+    }
+    // Wrap whatever the store returns for the bound path in a new DataRecord.
+    DataRecord stored = _metaClient.get(_path);
+    return new DataRecord(stored);
   }
 
   /**
    * Return a permit. If the permit is already returned, log and return void.
    */
   public void returnPermit(Permit permit) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (permit == null) {
+      // acquire() reports failure by returning null, so a null can reach this method when a
+      // caller forwards that result; log it instead of failing with a NullPointerException.
+      LOG.warn("Cannot return a null permit");
+      return;
+    }
+    if (permit.isReleased()) {
+      LOG.info("The permit has already been released");
+      return;
+    }
+    // Give the capacity back first; mark the permit released only after that update returns.
+    updateReturnPermit();
+    permit.releasePermit();
   }
 
   /**
@@ -119,7 +201,9 @@ public class DistributedSemaphore {
    * log and return void.
    */
   public void returnAllPermits(Collection<Permit> permits) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (permits == null) {
+      // acquire(int) reports failure by returning null; treat that as "nothing to return"
+      // rather than failing with a NullPointerException.
+      LOG.warn("Cannot return permits from a null collection");
+      return;
+    }
+    // Each permit goes through returnPermit, so already-released permits are only logged.
+    for (Permit permit : permits) {
+      returnPermit(permit);
+    }
   }
 
   /**
@@ -128,7 +212,8 @@ public class DistributedSemaphore {
    * @return a permit
    */
   private Permit retrievePermit(String path) {
-    throw new NotImplementedException("Not implemented yet.");
+    // Look up the stat for the given path first, then read the semaphore record.
+    final MetaClientInterface.Stat permitStat = _metaClient.exists(path);
+    final DataRecord semaphoreRecord = getSemaphore();
+    return new Permit(semaphoreRecord, permitStat);
   }
 
   /**
@@ -136,13 +221,25 @@ public class DistributedSemaphore {
    * @param count number of permits to acquire
    */
   private void updateAcquirePermit(int count) {
-    throw new NotImplementedException("Not implemented yet.");
+    // The lambda receives the record at _path and returns the record to be written back.
+    // NOTE(review): presumably an exception thrown from the lambda aborts the update; confirm
+    // against MetaClientInterface.update.
+    _metaClient.update(_path, record -> {
+      long permitsAvailable = record.getLongField(REMAINING_CAPACITY_NAME, DEFAULT_REMAINING_CAPACITY);
+      if (permitsAvailable < count) {
+        throw new MetaClientException("No sufficient permits available. Attempt to acquire " + count
+            + " permits, but only " + permitsAvailable + " permits available");
+      }
+      record.setLongField(REMAINING_CAPACITY_NAME, permitsAvailable - count);
+      return record;
+    });
   }
 
   /**
    * Update the remaining capacity of the semaphore after returning a permit.
    */
   private void updateReturnPermit() {
-    throw new NotImplementedException("Not implemented yet.");
+    // Add exactly one permit back to the remaining capacity stored at the bound path.
+    _metaClient.update(_path, semaphore -> {
+      final long remaining = semaphore.getLongField(REMAINING_CAPACITY_NAME, DEFAULT_REMAINING_CAPACITY);
+      final long afterReturn = remaining + 1;
+      semaphore.setLongField(REMAINING_CAPACITY_NAME, afterReturn);
+      return semaphore;
+    });
   }
 }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java
index cd5c3c297..8a9c489e3 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java
@@ -64,8 +64,12 @@ public class LockClient implements LockClientInterface, AutoCloseable {
       throw new IllegalArgumentException("MetaClient cannot be null.");
     }
     _metaClient = client;
-    LOG.info("Connecting to existing MetaClient for LockClient");
-    _metaClient.connect();
+    try {
+      LOG.info("Connecting to existing MetaClient for LockClient");
+      _metaClient.connect();
+    } catch (IllegalStateException e) {
+      // Ignore as it either has already been connected or already been closed.
+    }
   }
 
   @Override
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphoreTest.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphoreTest.java
new file mode 100644
index 000000000..db874cd33
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphoreTest.java
@@ -0,0 +1,105 @@
+package org.apache.helix.metaclient.recipes.lock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Collection;
+
+public class DistributedSemaphoreTest extends ZkMetaClientTestBase {
+
+  // Builds a ZooKeeper-backed semaphore client against ZK_ADDR and creates a semaphore at path.
+  public DistributedSemaphore createSemaphoreClientAndSemaphore(String path, int capacity) {
+    MetaClientConfig.StoreType storeType = MetaClientConfig.StoreType.ZOOKEEPER;
+    MetaClientConfig config = new MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR)
+        .setStoreType(storeType).build();
+    DistributedSemaphore client = new DistributedSemaphore(config);
+    client.createSemaphore(path, capacity);
+    return client;
+  }
+
+  @Test
+  public void testAcquirePermit() {
+    final String key = "/TestSemaphore_testAcquirePermit";
+    int capacity = 5;
+    DistributedSemaphore semaphoreClient = createSemaphoreClientAndSemaphore(key, capacity);
+    Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity);
+    Permit permit = semaphoreClient.acquire();
+    Assert.assertNotNull(permit); // acquire() reports failure by returning null
+    Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity - 1);
+  }
+
+  @Test
+  public void testAcquireMultiplePermits() {
+    final String key = "/TestSemaphore_testAcquireMultiplePermits";
+    int capacity = 5;
+    int count = 4;
+    DistributedSemaphore semaphoreClient = createSemaphoreClientAndSemaphore(key, capacity);
+    Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity);
+
+    Collection<Permit> permits = semaphoreClient.acquire(count);
+    Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity - count);
+    Assert.assertEquals(permits.size(), count);
+    // Only capacity - count permits are left, so asking for count again must fail.
+    Assert.assertNull(semaphoreClient.acquire(count));
+  }
+
+  @Test
+  public void testReturnPermit() {
+    final String key = "/TestSemaphore_testReturnPermit";
+    int capacity = 5;
+    DistributedSemaphore semaphoreClient = createSemaphoreClientAndSemaphore(key, capacity);
+    Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity);
+
+    Permit permit = semaphoreClient.acquire();
+    Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity - 1);
+
+    semaphoreClient.returnPermit(permit);
+    Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity);
+
+    // return the same permit again. Should not fail but capacity remains same.
+    semaphoreClient.returnPermit(permit);
+    Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity);
+  }
+
+  @Test
+  public void testReturnMultiplePermits() {
+    final String key = "/TestSemaphore_testReturnMultiplePermits";
+    int capacity = 5;
+    int count = 4;
+    DistributedSemaphore semaphoreClient = createSemaphoreClientAndSemaphore(key, capacity);
+    Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity);
+
+    Collection<Permit> permits = semaphoreClient.acquire(count);
+    Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity - count);
+    Assert.assertEquals(permits.size(), count);
+
+    semaphoreClient.returnAllPermits(permits);
+    Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity);
+  }
+
+  @Test
+  public void testTryAcquirePermit() {
+    // Not implemented
+  }
+
+}

Reply via email to