This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3111f490b94 Refactor CuratorZookeeperDistributedLock (#21852)
3111f490b94 is described below
commit 3111f490b94f8c0d40bf6afa4179fc4e47b0f069
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Oct 30 19:27:41 2022 +0800
Refactor CuratorZookeeperDistributedLock (#21852)
* Refactor CuratorZookeeperDistributedLock
* Refactor CuratorZookeeperDistributedLock
---
.../cluster/consul/lock/ConsulDistributedLock.java | 6 +++---
.../cluster/etcd/lock/EtcdDistributedLock.java | 24 ++++++++++++++--------
.../lock/CuratorZookeeperDistributedLock.java | 8 ++++++--
.../CuratorZookeeperDistributedLockHolder.java | 3 +--
.../zookeeper/CuratorZookeeperRepositoryTest.java | 7 ++-----
5 files changed, 28 insertions(+), 20 deletions(-)
diff --git
a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java
b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java
index 90d9bf54a08..eb6204d6193 100644
---
a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java
+++
b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java
@@ -58,7 +58,7 @@ public final class ConsulDistributedLock implements
DistributedLock {
private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR =
new ScheduledThreadPoolExecutor(2);
- private final String lockName;
+ private final String lockKey;
private final ConsulClient client;
@@ -74,7 +74,7 @@ public final class ConsulDistributedLock implements
DistributedLock {
try {
long lockTime = timeoutMillis;
PutParams putParams = new PutParams();
- String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR +
lockName;
+ String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR +
lockKey;
while (true) {
String sessionId = createSession(lockPath);
putParams.setAcquireSession(sessionId);
@@ -181,7 +181,7 @@ public final class ConsulDistributedLock implements
DistributedLock {
PutParams putParams = new PutParams();
String sessionId = lockSessionMap.get();
putParams.setReleaseSession(sessionId);
- String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR +
lockName;
+ String lockPath = CONSUL_ROOT_PATH + CONSUL_PATH_SEPARATOR +
lockKey;
client.setKVValue(lockPath, DEFAULT_CONSUL_UNLOCK_VALUE,
putParams);
client.sessionDestroy(sessionId, null);
// CHECKSTYLE:OFF
diff --git
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/lock/EtcdDistributedLock.java
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/lock/EtcdDistributedLock.java
index 2923d56f675..e6252574c37 100644
---
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/lock/EtcdDistributedLock.java
+++
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/lock/EtcdDistributedLock.java
@@ -19,7 +19,8 @@ package
org.apache.shardingsphere.mode.repository.cluster.etcd.lock;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
-import lombok.RequiredArgsConstructor;
+import io.etcd.jetcd.Lease;
+import io.etcd.jetcd.Lock;
import
org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties;
import
org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdPropertyKey;
import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
@@ -30,20 +31,27 @@ import java.util.concurrent.TimeUnit;
/**
* Etcd distributed lock.
*/
-@RequiredArgsConstructor
public final class EtcdDistributedLock implements DistributedLock {
- private final String lockKey;
+ private final ByteSequence lockKey;
- private final Client client;
+ private final Lock lock;
- private final EtcdProperties props;
+ private final Lease lease;
+
+ private final int timeToLiveSeconds;
+
+ public EtcdDistributedLock(final String lockKey, final Client client,
final EtcdProperties props) {
+ this.lockKey = ByteSequence.from(lockKey, StandardCharsets.UTF_8);
+ lock = client.getLockClient();
+ lease = client.getLeaseClient();
+ timeToLiveSeconds =
props.getValue(EtcdPropertyKey.TIME_TO_LIVE_SECONDS);
+ }
@Override
public boolean tryLock(final long timeoutMillis) {
try {
- long leaseId =
client.getLeaseClient().grant(props.getValue(EtcdPropertyKey.TIME_TO_LIVE_SECONDS)).get().getID();
- client.getLockClient().lock(ByteSequence.from(lockKey,
StandardCharsets.UTF_8), leaseId).get(timeoutMillis, TimeUnit.MILLISECONDS);
+ lock.lock(lockKey,
lease.grant(timeToLiveSeconds).get().getID()).get(timeoutMillis,
TimeUnit.MILLISECONDS);
return true;
// CHECKSTYLE:OFF
} catch (final Exception ignored) {
@@ -55,7 +63,7 @@ public final class EtcdDistributedLock implements
DistributedLock {
@Override
public void unlock() {
try {
- client.getLockClient().unlock(ByteSequence.from(lockKey,
StandardCharsets.UTF_8)).get();
+ lock.unlock(lockKey).get();
// CHECKSTYLE:OFF
} catch (final Exception ignored) {
// CHECKSTYLE:ON
diff --git
a/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/lock/CuratorZookeeperDistributedLock.java
b/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/lock/CuratorZookeeperDistributedLock.java
index 4702ee24ebc..346d5a0e9d0 100644
---
a/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/lock/CuratorZookeeperDistributedLock.java
+++
b/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/lock/CuratorZookeeperDistributedLock.java
@@ -17,8 +17,9 @@
package org.apache.shardingsphere.mode.repository.cluster.zookeeper.lock;
-import lombok.RequiredArgsConstructor;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
import
org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.CuratorZookeeperExceptionHandler;
@@ -27,11 +28,14 @@ import java.util.concurrent.TimeUnit;
/**
* Curator ZooKeeper distributed lock.
*/
-@RequiredArgsConstructor
public final class CuratorZookeeperDistributedLock implements DistributedLock {
private final InterProcessLock lock;
+ public CuratorZookeeperDistributedLock(final String lockKey, final
CuratorFramework client) {
+ lock = new InterProcessMutex(client, lockKey);
+ }
+
@Override
public boolean tryLock(final long timeoutMillis) {
try {
diff --git
a/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/lock/CuratorZookeeperDistributedLockHolder.java
b/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/lock/CuratorZookeeperDistributedLockHolder.java
index 91b4184f767..eec2b19f31f 100644
---
a/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/lock/CuratorZookeeperDistributedLockHolder.java
+++
b/mode/type/cluster/repository/provider/zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/lock/CuratorZookeeperDistributedLockHolder.java
@@ -19,7 +19,6 @@ package
org.apache.shardingsphere.mode.repository.cluster.zookeeper.lock;
import lombok.RequiredArgsConstructor;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
import
org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLockHolder;
@@ -40,7 +39,7 @@ public final class CuratorZookeeperDistributedLockHolder
implements DistributedL
public synchronized DistributedLock getDistributedLock(final String
lockKey) {
CuratorZookeeperDistributedLock result = locks.get(lockKey);
if (null == result) {
- result = new CuratorZookeeperDistributedLock(new
InterProcessMutex(client, lockKey));
+ result = new CuratorZookeeperDistributedLock(lockKey, client);
locks.put(lockKey, result);
}
return result;
diff --git
a/mode/type/cluster/repository/provider/zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
b/mode/type/cluster/repository/provider/zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
index 7f4a65cd7e5..91952526cec 100644
---
a/mode/type/cluster/repository/provider/zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
+++
b/mode/type/cluster/repository/provider/zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
@@ -34,7 +34,6 @@ import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
-import org.apache.curator.framework.recipes.locks.InterProcessLock;
import
org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -70,6 +69,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -114,9 +114,6 @@ public final class CuratorZookeeperRepositoryTest {
@Mock
private Builder builder;
- @Mock
- private InterProcessLock interProcessLock;
-
@Before
public void init() {
mockClient();
@@ -149,7 +146,7 @@ public final class CuratorZookeeperRepositoryTest {
CuratorZookeeperDistributedLockHolder distributedLockHolder = new
CuratorZookeeperDistributedLockHolder(client);
Field locksFiled =
CuratorZookeeperDistributedLockHolder.class.getDeclaredField("locks");
locksFiled.setAccessible(true);
- locksFiled.set(distributedLockHolder,
Collections.singletonMap("/locks/glock", new
CuratorZookeeperDistributedLock(interProcessLock)));
+ locksFiled.set(distributedLockHolder,
Collections.singletonMap("/locks/glock",
mock(CuratorZookeeperDistributedLock.class)));
distributedLockHolderField.set(REPOSITORY, distributedLockHolder);
}