This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new a2737c620 Lattice MetaClient Distributed Semaphore Implementation
(#2515)
a2737c620 is described below
commit a2737c6200a1cb9a6cbaa157118d163805de0674
Author: Marcos Rico Peng <[email protected]>
AuthorDate: Fri Jun 23 19:12:23 2023 -0400
Lattice MetaClient Distributed Semaphore Implementation (#2515)
Co-authored-by: mapeng <[email protected]>
---
.../recipes/lock/DistributedSemaphore.java | 123 ++++++++++++++++++---
.../helix/metaclient/recipes/lock/LockClient.java | 8 +-
.../recipes/lock/DistributedSemaphoreTest.java | 105 ++++++++++++++++++
3 files changed, 221 insertions(+), 15 deletions(-)
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java
index 0cccb130f..7b16e78fd 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java
@@ -22,18 +22,45 @@ package org.apache.helix.metaclient.recipes.lock;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
public class DistributedSemaphore {
+ private final MetaClientInterface<DataRecord> _metaClient;
+ private String _path;
+ private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+ private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+ private static final long DEFAULT_REMAINING_CAPACITY = -1;
+ private static final Logger LOG =
LoggerFactory.getLogger(DistributedSemaphore.class);
/**
* Create a distributed semaphore client with the given configuration.
* @param config configuration of the client
*/
public DistributedSemaphore(MetaClientConfig config) {
- throw new NotImplementedException("Not implemented yet.");
+ if (config == null) {
+ throw new MetaClientException("Configuration cannot be null");
+ }
+ LOG.info("Creating DistributedSemaphore Client");
+ if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+ ZkMetaClientConfig zkMetaClientConfig = new
ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+ .setConnectionAddress(config.getConnectionAddress())
+ .setZkSerializer(new DataRecordSerializer()) // Currently only
support ZNRecordSerializer.
+ // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+ .build();
+ _metaClient = new
ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+ _metaClient.connect();
+ } else {
+ throw new MetaClientException("Unsupported store type: " +
config.getStoreType());
+ }
}
/**
@@ -41,7 +68,17 @@ public class DistributedSemaphore {
* @param client client to connect to
*/
public DistributedSemaphore(MetaClientInterface<DataRecord> client) {
- throw new NotImplementedException("Not implemented yet.");
+ if (client == null) {
+ throw new MetaClientException("Client cannot be null");
+ }
+ LOG.info("Connecting to existing DistributedSemaphore Client");
+ _metaClient = client;
+ try {
+ _metaClient.connect();
+ // TODO: Differentiate exception catch between already connected and
already closed.
+ } catch (IllegalStateException e) {
+ // Ignore as it either has already been connected or already been closed.
+ }
}
/**
@@ -50,7 +87,22 @@ public class DistributedSemaphore {
* @param capacity capacity of the semaphore
*/
public void createSemaphore(String path, int capacity) {
- throw new NotImplementedException("Not implemented yet.");
+ if (capacity <= 0) {
+ throw new MetaClientException("Capacity must be positive");
+ }
+ if (path == null || path.isEmpty()) {
+ throw new MetaClientException("Invalid path to create semaphore");
+ }
+ if (_metaClient.exists(path) != null) {
+ throw new MetaClientException("Semaphore already exists");
+ }
+ if (_metaClient.exists(path) == null) {
+ DataRecord dataRecord = new DataRecord(path);
+ dataRecord.setLongField(INITIAL_CAPACITY_NAME, capacity);
+ dataRecord.setLongField(REMAINING_CAPACITY_NAME, capacity);
+ _metaClient.create(path, dataRecord);
+ _path = path;
+ }
}
/**
@@ -58,7 +110,13 @@ public class DistributedSemaphore {
* @param path path of the semaphore
*/
public void connectSemaphore(String path) {
- throw new NotImplementedException("Not implemented yet.");
+ if (path == null || path.isEmpty()) {
+ throw new MetaClientException("Invalid path to connect semaphore");
+ }
+ if (_metaClient.exists(path) == null) {
+ throw new MetaClientException("Semaphore does not exist");
+ }
+ _path = path;
}
/**
@@ -66,7 +124,13 @@ public class DistributedSemaphore {
* @return a permit
*/
public Permit acquire() {
- throw new NotImplementedException("Not implemented yet.");
+ try {
+ updateAcquirePermit(1);
+ return retrievePermit(_path);
+ } catch (MetaClientException e) {
+ LOG.error("Failed to acquire permit.", e);
+ return null;
+ }
}
@@ -76,7 +140,17 @@ public class DistributedSemaphore {
* @return a collection of permits
*/
public Collection<Permit> acquire(int count) {
- throw new NotImplementedException("Not implemented yet.");
+ try {
+ updateAcquirePermit(count);
+ Collection<Permit> permits = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ permits.add(retrievePermit(_path));
+ }
+ return permits;
+ } catch (MetaClientException e) {
+ LOG.error("Failed to acquire permits.", e);
+ return null;
+ }
}
/**
@@ -96,7 +170,7 @@ public class DistributedSemaphore {
* @return remaining capacity
*/
public long getRemainingCapacity() {
- throw new NotImplementedException("Not implemented yet.");
+ return getSemaphore().getLongField(REMAINING_CAPACITY_NAME,
DEFAULT_REMAINING_CAPACITY);
}
/**
@@ -104,14 +178,22 @@ public class DistributedSemaphore {
* @return semaphore data record
*/
private DataRecord getSemaphore() {
- throw new NotImplementedException("Not implemented yet.");
+ if (_metaClient.exists(_path) == null) {
+ throw new MetaClientException("Semaphore does not exist at path: " +
_path + ". Please create it first.");
+ }
+ return new DataRecord(_metaClient.get(_path));
}
/**
* Return a permit. If the permit is already returned, log and return void.
*/
public void returnPermit(Permit permit) {
- throw new NotImplementedException("Not implemented yet.");
+ if (permit.isReleased()) {
+ LOG.info("The permit has already been released");
+ } else {
+ updateReturnPermit();
+ permit.releasePermit();
+ }
}
/**
@@ -119,7 +201,9 @@ public class DistributedSemaphore {
* log and return void.
*/
public void returnAllPermits(Collection<Permit> permits) {
- throw new NotImplementedException("Not implemented yet.");
+ for (Permit permit : permits) {
+ returnPermit(permit);
+ }
}
/**
@@ -128,7 +212,8 @@ public class DistributedSemaphore {
* @return a permit
*/
private Permit retrievePermit(String path) {
- throw new NotImplementedException("Not implemented yet.");
+ MetaClientInterface.Stat stat = _metaClient.exists(path);
+ return new Permit(getSemaphore(), stat);
}
/**
@@ -136,13 +221,25 @@ public class DistributedSemaphore {
* @param count number of permits to acquire
*/
private void updateAcquirePermit(int count) {
- throw new NotImplementedException("Not implemented yet.");
+ _metaClient.update(_path, record -> {
+ long permitsAvailable = record.getLongField(REMAINING_CAPACITY_NAME,
DEFAULT_REMAINING_CAPACITY);
+ if (permitsAvailable < count) {
+ throw new MetaClientException("No sufficient permits available.
Attempt to acquire " + count + " permits, but only "
+ + permitsAvailable + " permits available");
+ }
+ record.setLongField(REMAINING_CAPACITY_NAME, permitsAvailable - count);
+ return record;
+ });
}
/**
* Update the remaining capacity of the semaphore after returning a permit.
*/
private void updateReturnPermit() {
- throw new NotImplementedException("Not implemented yet.");
+ _metaClient.update(_path, record -> {
+ long permitsAvailable = record.getLongField(REMAINING_CAPACITY_NAME,
DEFAULT_REMAINING_CAPACITY);
+ record.setLongField(REMAINING_CAPACITY_NAME, permitsAvailable + 1);
+ return record;
+ });
}
}
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java
index cd5c3c297..8a9c489e3 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java
@@ -64,8 +64,12 @@ public class LockClient implements LockClientInterface,
AutoCloseable {
throw new IllegalArgumentException("MetaClient cannot be null.");
}
_metaClient = client;
- LOG.info("Connecting to existing MetaClient for LockClient");
- _metaClient.connect();
+ try {
+ LOG.info("Connecting to existing MetaClient for LockClient");
+ _metaClient.connect();
+ } catch (IllegalStateException e) {
+ // Ignore as it either has already been connected or already been closed.
+ }
}
@Override
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphoreTest.java
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphoreTest.java
new file mode 100644
index 000000000..db874cd33
--- /dev/null
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphoreTest.java
@@ -0,0 +1,105 @@
+package org.apache.helix.metaclient.recipes.lock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Collection;
+
+public class DistributedSemaphoreTest extends ZkMetaClientTestBase {
+
+ public DistributedSemaphore createSemaphoreClientAndSemaphore(String path,
int capacity) {
+
+ MetaClientConfig.StoreType storeType =
MetaClientConfig.StoreType.ZOOKEEPER;
+ MetaClientConfig config = new
MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR)
+ .setStoreType(storeType).build();
+ DistributedSemaphore client = new DistributedSemaphore(config);
+ client.createSemaphore(path, capacity);
+ return client;
+ }
+
+ @Test
+ public void testAcquirePermit() {
+ final String key = "/TestSemaphore_testAcquirePermit";
+ int capacity = 5;
+ DistributedSemaphore semaphoreClient =
createSemaphoreClientAndSemaphore(key, capacity);
+ Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity);
+
+ Permit permit = semaphoreClient.acquire();
+ Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity - 1);
+ }
+
+ @Test
+ public void testAcquireMultiplePermits() {
+ final String key = "/TestSemaphore_testAcquireMultiplePermits";
+ int capacity = 5;
+ int count = 4;
+ DistributedSemaphore semaphoreClient =
createSemaphoreClientAndSemaphore(key, capacity);
+ Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity);
+
+ Collection<Permit> permits = semaphoreClient.acquire(count);
+ Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity - 4);
+ Assert.assertEquals(permits.size(), count);
+ Assert.assertNull(semaphoreClient.acquire(count));
+ }
+
+ @Test
+ public void testReturnPermit() {
+ final String key = "/TestSemaphore_testReturnPermit";
+ int capacity = 5;
+ DistributedSemaphore semaphoreClient =
createSemaphoreClientAndSemaphore(key, capacity);
+ Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity);
+
+ Permit permit = semaphoreClient.acquire();
+ Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity - 1);
+
+ semaphoreClient.returnPermit(permit);
+ Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity);
+
+ // return the same permit again. Should not fail but capacity remains same.
+ semaphoreClient.returnPermit(permit);
+ Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity);
+ }
+
+ @Test
+ public void testReturnMultiplePermits() {
+ final String key = "/TestSemaphore_testReturnMultiplePermits";
+ int capacity = 5;
+ int count = 4;
+ DistributedSemaphore semaphoreClient =
createSemaphoreClientAndSemaphore(key, capacity);
+ Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity);
+
+ Collection<Permit> permits = semaphoreClient.acquire(count);
+ Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity - 4);
+ Assert.assertEquals(permits.size(), count);
+
+ semaphoreClient.returnAllPermits(permits);
+ Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity);
+ }
+
+ @Test
+ public void testTryAcquirePermit() {
+ // Not implemented
+ }
+
+}