This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new daa1f82e672 Move mode consul to ShardingSphere plugin repository (#30288)
daa1f82e672 is described below
commit daa1f82e672da6f74e1c10f6ea644e36b707fa9e
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Mon Feb 26 12:40:05 2024 +0800
Move mode consul to ShardingSphere plugin repository (#30288)
---
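Note: the documentation removed by this commit describes five rules for resolving `serverLists` to a single Consul Agent HTTP endpoint. The following is a minimal sketch of those documented rules only; the class and method names are hypothetical, it is not the removed `ConsulRepository` code, and it assumes the default port 8500 stated in the removed docs.

```java
import java.net.URI;

// Hypothetical helper illustrating the documented serverLists rules.
final class ConsulServerListsSketch {

    // Returns the agent URL that the removed docs say serverLists resolves to.
    static String resolve(final String serverLists) {
        // Rule 1: empty value falls back to the local agent.
        if (null == serverLists || serverLists.isEmpty()) {
            return "http://127.0.0.1:8500";
        }
        // Rules 2-3: a bare host or host:port is treated as plain HTTP.
        boolean hasScheme = serverLists.startsWith("http://") || serverLists.startsWith("https://");
        URI uri = URI.create(hasScheme ? serverLists : "http://" + serverLists);
        // A missing port defaults to 8500 (assumption taken from the docs).
        int port = -1 == uri.getPort() ? 8500 : uri.getPort();
        // Rules 4-5: an explicit http/https scheme is kept as given.
        return uri.getScheme() + "://" + uri.getHost() + ":" + port;
    }
}
```

For example, under these assumptions `resolve("consul-host")` yields `http://consul-host:8500`, while `resolve("https://consul-host:8501")` is returned unchanged.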
docs/document/content/dev-manual/mode.cn.md | 1 -
docs/document/content/dev-manual/mode.en.md | 1 -
.../builtin-algorithm/metadata-repository.cn.md | 23 --
.../builtin-algorithm/metadata-repository.en.md | 23 --
.../optional-plugins/_index.cn.md | 1 -
.../optional-plugins/_index.en.md | 1 -
.../optional-plugins/_index.cn.md | 2 -
.../optional-plugins/_index.en.md | 2 -
.../cluster/repository/provider/consul/pom.xml | 78 -------
.../cluster/consul/ConsulRepository.java | 254 ---------------------
.../cluster/consul/ShardingSphereConsulClient.java | 36 ---
.../cluster/consul/ShardingSphereQueryParams.java | 48 ----
.../cluster/consul/lock/ConsulDistributedLock.java | 166 --------------
.../consul/lock/ConsulDistributedLockCreator.java | 39 ----
.../cluster/consul/props/ConsulProperties.java | 31 ---
.../cluster/consul/props/ConsulPropertyKey.java | 46 ----
...ode.repository.cluster.ClusterPersistRepository | 18 --
...ory.cluster.lock.creator.DistributedLockCreator | 18 --
.../cluster/consul/ConsulRepositoryTest.java | 237 -------------------
.../cluster/consul/props/ConsulPropertiesTest.java | 41 ----
mode/type/cluster/repository/provider/pom.xml | 1 -
pom.xml | 2 -
22 files changed, 1069 deletions(-)
diff --git a/docs/document/content/dev-manual/mode.cn.md b/docs/document/content/dev-manual/mode.cn.md
index edcda0e8cba..bed02f8f536 100644
--- a/docs/document/content/dev-manual/mode.cn.md
+++ b/docs/document/content/dev-manual/mode.cn.md
@@ -37,4 +37,3 @@ chapter = true
|-----------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| ZooKeeper | 基于 ZooKeeper 的持久化 | [`org.apache.shardingsphere.mode.repository.cluster.zookeeper.ZookeeperRepository`](https://github.com/apache/shardingsphere/blob/master/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java) |
| etcd | 基于 Etcd 的持久化 | [`org.apache.shardingsphere.mode.repository.cluster.etcd.EtcdRepository`](https://github.com/apache/shardingsphere/blob/master/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java) |
-| Consul | 基于 Consul 的持久化 | [`org.apache.shardingsphere.mode.repository.cluster.consul.ConsulRepository`](https://github.com/apache/shardingsphere/blob/master/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java) |
diff --git a/docs/document/content/dev-manual/mode.en.md b/docs/document/content/dev-manual/mode.en.md
index 0fab555b840..69ac17f2e86 100644
--- a/docs/document/content/dev-manual/mode.en.md
+++ b/docs/document/content/dev-manual/mode.en.md
@@ -37,4 +37,3 @@ Cluster mode configuration information persistence definition
|----------------------|-----------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| ZooKeeper | ZooKeeper based persistence | [`org.apache.shardingsphere.mode.repository.cluster.zookeeper.ZookeeperRepository`](https://github.com/apache/shardingsphere/blob/master/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java) |
| etcd | Etcd based persistence | [`org.apache.shardingsphere.mode.repository.cluster.etcd.EtcdRepository`](https://github.com/apache/shardingsphere/blob/master/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java) |
-| Consul | Consul based persistence | [`org.apache.shardingsphere.mode.repository.cluster.consul.ConsulRepository`](https://github.com/apache/shardingsphere/blob/master/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java) |
diff --git a/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.cn.md b/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.cn.md
index dd30ff479d2..d3880773678 100644
--- a/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.cn.md
+++ b/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.cn.md
@@ -54,29 +54,6 @@ Apache ShardingSphere 为不同的运行模式提供了不同的元数据持久
| timeToLiveSeconds | long | 临时数据失效的秒数 | 30 |
| connectionTimeout | long | 连接超时秒数 | 30 |
-### Consul 持久化
-
-受 `com.ecwid.consul:consul-api:1.4.5` 的 Maven 模块的限制,使用者无法通过 gRPC 端口来连接到 Consul Agent。
-
-`Consul` 实现的 `serverLists` 属性受设计使然,仅可通过 HTTP 端点连接到单个 Consul Agent。
-`serverLists` 使用了宽松的 URL 匹配原则。
-1. 当 `serverLists` 为空时,将解析到 `http://127.0.0.1:8500` 的 Consul Agent 实例。
-2. 当 `serverLists` 为 `hostName` 时,将解析到 `http://hostName:8500` 的 Consul Agent 实例。
-3. 当 `serverLists` 为 `hostName:port` 时,将解析到 `http://hostName:port` 的 Consul Agent 实例。
-4. 当 `serverLists` 为 `http://hostName:port` 时,将解析到 `http://hostName:port` 的 Consul Agent 实例。
-5. 当 `serverLists` 为 `https://hostName:port` 时,将解析到 `https://hostName:port` 的 Consul Agent 实例。
-
-类型:Consul
-
-适用模式:Cluster
-
-可配置属性:
-
-| *名称* | *数据类型* | *说明* | *默认值* |
-|-------------------------|--------|-----------|-------|
-| timeToLiveSeconds | String | 临时实例失效的秒数 | 30s |
-| blockQueryTimeToSeconds | long | 查询请求超时秒数 | 60 |
-
## 操作步骤
1. 在 global.yaml 中配置 Mode 运行模式
diff --git a/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.en.md b/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.en.md
index 3ab17de54bd..4d7d56aa092 100644
--- a/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.en.md
+++ b/docs/document/content/user-manual/common-config/builtin-algorithm/metadata-repository.en.md
@@ -54,29 +54,6 @@ Attributes:
| timeToLiveSeconds | long | Seconds of ephemeral data live | 30 |
| connectionTimeout | long | Seconds of connection timeout | 30 |
-### Consul Repository
-
-Due to the limitation of the Maven module of `com.ecwid.consul:consul-api:1.4.5`, users cannot connect to the Consul Agent through the gRPC port.
-
-The `serverLists` property of the `Consul` implementation is by design and can only be connected to a single Consul Agent via an HTTP endpoint.
-`serverLists` uses relaxed URL matching principles.
-1. When `serverLists` is empty, it will be resolved to the Consul Agent instance of `http://127.0.0.1:8500`.
-2. When `serverLists` is `hostName`, it will be resolved to the Consul Agent instance of `http://hostName:8500`.
-3. When `serverLists` is `hostName:port`, it will be resolved to the Consul Agent instance of `http://hostName:port`.
-4. When `serverLists` is `http://hostName:port`, it will be resolved to the Consul Agent instance of `http://hostName:port`.
-5. When `serverLists` is `https://hostName:port`, it will be resolved to the Consul Agent instance of `https://hostName:port`.
-
-Type: Consul
-
-Mode: Cluster
-
-Attributes:
-
-| *Name* | *Type* | *Description* | *Default Value* |
-|-------------------------|--------|------------------------------------|-----------------|
-| timeToLiveSeconds | String | Seconds of ephemeral instance live | 30s |
-| blockQueryTimeToSeconds | long | Seconds of query timeout | 60 |
-
## Procedure
1. Configure running mode in global.yaml.
diff --git a/docs/document/content/user-manual/shardingsphere-jdbc/optional-plugins/_index.cn.md b/docs/document/content/user-manual/shardingsphere-jdbc/optional-plugins/_index.cn.md
index a884f47178f..e75561542eb 100644
--- a/docs/document/content/user-manual/shardingsphere-jdbc/optional-plugins/_index.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-jdbc/optional-plugins/_index.cn.md
@@ -45,7 +45,6 @@ ShardingSphere 默认情况下仅包含核心 SPI 的实现,在 Git Source 存
- 集群模式配置信息持久化定义
- `org.apache.shardingsphere:shardingsphere-cluster-mode-repository-zookeeper`,基于 Zookeeper 的持久化实现
- `org.apache.shardingsphere:shardingsphere-cluster-mode-repository-etcd`,基于 Etcd 的持久化实现
- - `org.apache.shardingsphere:shardingsphere-cluster-mode-repository-consul`,基于 Consul 的持久化实现
- XA 分布式事务管理器
- `org.apache.shardingsphere:shardingsphere-transaction-xa-narayana`,基于 Narayana 的 XA 分布式事务管理器
- SQL 翻译
diff --git a/docs/document/content/user-manual/shardingsphere-jdbc/optional-plugins/_index.en.md b/docs/document/content/user-manual/shardingsphere-jdbc/optional-plugins/_index.en.md
index a8dc0adf92d..77d3b98fd40 100644
--- a/docs/document/content/user-manual/shardingsphere-jdbc/optional-plugins/_index.en.md
+++ b/docs/document/content/user-manual/shardingsphere-jdbc/optional-plugins/_index.en.md
@@ -45,7 +45,6 @@ All optional plugins are listed below in the form of `groupId:artifactId`.
- Cluster mode configuration information persistence definition
- `org.apache.shardingsphere:shardingsphere-cluster-mode-repository-zookeeper`, Zookeeper based persistence
- `org.apache.shardingsphere:shardingsphere-cluster-mode-repository-etcd`, Etcd based persistence
- - `org.apache.shardingsphere:shardingsphere-cluster-mode-repository-consul`, Consul based persistence
- XA transaction manager provider definition
- `org.apache.shardingsphere:shardingsphere-transaction-xa-narayana`, XA distributed transaction manager based on Narayana
- SQL translator
diff --git a/docs/document/content/user-manual/shardingsphere-proxy/optional-plugins/_index.cn.md b/docs/document/content/user-manual/shardingsphere-proxy/optional-plugins/_index.cn.md
index 6101808612d..cc20769825b 100644
--- a/docs/document/content/user-manual/shardingsphere-proxy/optional-plugins/_index.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-proxy/optional-plugins/_index.cn.md
@@ -32,8 +32,6 @@ ShardingSphere 默认情况下仅包含核心 SPI 的实现,在 Git Source 存
- 单机模式配置信息持久化定义
- `org.apache.shardingsphere:shardingsphere-standalone-mode-repository-jdbc`,基于 JDBC 的持久化
-- 集群模式配置信息持久化定义
- - `org.apache.shardingsphere:shardingsphere-cluster-mode-repository-consul`,基于 Consul 的持久化实现
- XA 分布式事务管理器
- `org.apache.shardingsphere:shardingsphere-transaction-xa-narayana`,基于 Narayana 的 XA 分布式事务管理器
- SQL 翻译
diff --git a/docs/document/content/user-manual/shardingsphere-proxy/optional-plugins/_index.en.md b/docs/document/content/user-manual/shardingsphere-proxy/optional-plugins/_index.en.md
index 01b17c1c166..992cdf05eb8 100644
--- a/docs/document/content/user-manual/shardingsphere-proxy/optional-plugins/_index.en.md
+++ b/docs/document/content/user-manual/shardingsphere-proxy/optional-plugins/_index.en.md
@@ -32,8 +32,6 @@ All optional plugins are listed below in the form of `groupId:artifactId`.
- Standalone mode configuration information persistence definition
- `org.apache.shardingsphere:shardingsphere-standalone-mode-repository-jdbc`, JDBC based persistence
-- Cluster mode configuration information persistence definition
- - `org.apache.shardingsphere:shardingsphere-cluster-mode-repository-consul`, Consul based persistence
- XA transaction manager provider definition
- `org.apache.shardingsphere:shardingsphere-transaction-xa-narayana`, XA distributed transaction manager based on Narayana
- SQL translator
diff --git a/mode/type/cluster/repository/provider/consul/pom.xml b/mode/type/cluster/repository/provider/consul/pom.xml
deleted file mode 100644
index 641b2d2162c..00000000000
--- a/mode/type/cluster/repository/provider/consul/pom.xml
+++ /dev/null
@@ -1,78 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.shardingsphere</groupId>
- <artifactId>shardingsphere-cluster-mode-repository-provider</artifactId>
- <version>5.4.2-SNAPSHOT</version>
- </parent>
- <artifactId>shardingsphere-cluster-mode-repository-consul</artifactId>
- <name>${project.artifactId}</name>
-
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>com.ecwid.consul</groupId>
- <artifactId>consul-api</artifactId>
- <version>${consul.api.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>${httpclient.version}</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.shardingsphere</groupId>
- <artifactId>shardingsphere-cluster-mode-repository-api</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.shardingsphere</groupId>
- <artifactId>shardingsphere-test-util</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>com.ecwid.consul</groupId>
- <artifactId>consul-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.awaitility</groupId>
- <artifactId>awaitility</artifactId>
- </dependency>
- </dependencies>
-</project>
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
deleted file mode 100644
index d87f4d3903d..00000000000
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.repository.cluster.consul;
-
-import com.ecwid.consul.transport.HttpResponse;
-import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.ConsulRawClient;
-import com.ecwid.consul.v1.QueryParams;
-import com.ecwid.consul.v1.Response;
-import com.ecwid.consul.v1.kv.model.GetValue;
-import com.ecwid.consul.v1.kv.model.PutParams;
-import com.ecwid.consul.v1.session.model.NewSession;
-import com.ecwid.consul.v1.session.model.Session;
-import com.google.common.base.Strings;
-import lombok.Getter;
-import org.apache.http.HttpStatus;
-import org.apache.shardingsphere.mode.event.DataChangedEvent;
-import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
-import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
-import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
-import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Registry repository of Consul.
- */
-public final class ConsulRepository implements ClusterPersistRepository {
-
- private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
-
- private ShardingSphereConsulClient consulClient;
-
- private ConsulProperties consulProps;
-
- @Getter
- private DistributedLockHolder distributedLockHolder;
-
- private Map<String, Collection<String>> watchKeyMap;
-
- @Override
- public void init(final ClusterPersistRepositoryConfiguration config) {
- consulProps = new ConsulProperties(config.getProps());
- ConsulRawClient rawClient = createConsulRawClient(config.getServerLists());
- consulClient = new ShardingSphereConsulClient(rawClient);
- distributedLockHolder = new DistributedLockHolder(getType(), consulClient, consulProps);
- watchKeyMap = new HashMap<>(6, 1F);
- }
-
- @Override
- public String getDirectly(final String key) {
- Response<GetValue> response = consulClient.getKVValue(key);
- if (null == response) {
- return null;
- }
- GetValue value = response.getValue();
- return null == value ? null : value.getValue();
- }
-
- @Override
- public List<String> getChildrenKeys(final String key) {
- Response<List<String>> response = consulClient.getKVKeysOnly(key);
- if (null == response) {
- return Collections.emptyList();
- }
- List<String> value = response.getValue();
- return null == value ? Collections.emptyList() : value;
- }
-
- @Override
- public boolean isExisted(final String key) {
- return null != consulClient.getKVValue(key).getValue();
- }
-
- @Override
- public void persist(final String key, final String value) {
- consulClient.setKVValue(key, value);
- }
-
- @Override
- public void update(final String key, final String value) {
- consulClient.setKVValue(key, value);
- }
-
- @Override
- public void delete(final String key) {
- consulClient.deleteKVValue(key);
- }
-
- /**
- * {@link ConsulRawClient} is a wrapper of blocking HTTP client and does not have a close method.
- * Using such a Client does not necessarily conform to the implementation of the relevant SPI. ShardingSphere needs to
- * consider solutions similar to <a href="https://github.com/spring-cloud/spring-cloud-consul/issues/475">spring-cloud/spring-cloud-consul#475</a>.
- *
- * @see ConsulRawClient
- */
- @Override
- public void close() {
- }
-
- @Override
- public void persistEphemeral(final String key, final String value) {
- Response<String> response = consulClient.sessionCreate(createNewSession(key), QueryParams.DEFAULT);
- String sessionId = response.getValue();
- PutParams putParams = new PutParams();
- putParams.setAcquireSession(sessionId);
- consulClient.setKVValue(key, value, putParams);
- generatorFlushSessionTtlTask(consulClient, sessionId);
- verifyConsulAgentRunning();
- }
-
- @SuppressWarnings("HttpUrlsUsage")
- private ConsulRawClient createConsulRawClient(final String serverLists) {
- if (Strings.isNullOrEmpty(serverLists)) {
- return new ConsulRawClient();
- }
- URL serverUrl;
- try {
- serverUrl = new URL(!serverLists.startsWith("https://") && !serverLists.startsWith("http://") ? "http://" + serverLists : serverLists);
- } catch (MalformedURLException e) {
- throw new RuntimeException(e);
- }
- if (-1 == serverUrl.getPort()) {
- return new ConsulRawClient(serverUrl.getHost());
- }
- return new ConsulRawClient(serverUrl.getHost(), serverUrl.getPort());
- }
-
- private NewSession createNewSession(final String key) {
- NewSession result = new NewSession();
- result.setName(key);
- result.setBehavior(Session.Behavior.DELETE);
- result.setTtl(consulProps.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
- return result;
- }
-
- @Override
- public void persistExclusiveEphemeral(final String key, final String value) {
- persistEphemeral(key, value);
- }
-
- @Override
- public void watch(final String key, final DataChangedEventListener listener) {
- Thread watchThread = new Thread(() -> watchChildKeyChangeEvent(key, listener));
- watchThread.setDaemon(true);
- watchThread.start();
- }
-
- private void watchChildKeyChangeEvent(final String key, final DataChangedEventListener listener) {
- AtomicBoolean running = new AtomicBoolean(true);
- long currentIndex = 0;
- while (running.get()) {
- Response<List<GetValue>> response = consulClient.getKVValues(key, new QueryParams(consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS), currentIndex));
- List<GetValue> value = response.getValue();
- if (null == value) {
- continue;
- }
- Long index = response.getConsulIndex();
- if (null != index && 0 == currentIndex) {
- currentIndex = index;
- if (!watchKeyMap.containsKey(key)) {
- watchKeyMap.put(key, new HashSet<>());
- }
- Collection<String> watchKeys = watchKeyMap.get(key);
- for (GetValue each : value) {
- watchKeys.add(each.getKey());
- }
- continue;
- }
- if (null != index && index > currentIndex) {
- currentIndex = index;
- Collection<String> newKeys = new HashSet<>(value.size(), 1F);
- Collection<String> watchKeys = watchKeyMap.get(key);
- for (GetValue each : value) {
- newKeys.add(each.getKey());
- if (!watchKeys.contains(each.getKey())) {
- watchKeys.add(each.getKey());
- fireDataChangeEvent(each, listener, DataChangedEvent.Type.ADDED);
- } else if (watchKeys.contains(each.getKey()) && each.getModifyIndex() >= currentIndex) {
- fireDataChangeEvent(each, listener, DataChangedEvent.Type.UPDATED);
- }
- }
- for (String each : watchKeys) {
- if (!newKeys.contains(each)) {
- GetValue getValue = new GetValue();
- getValue.setKey(each);
- fireDataChangeEvent(getValue, listener, DataChangedEvent.Type.DELETED);
- }
- }
- watchKeyMap.put(key, newKeys);
- } else if (null != index && index < currentIndex) {
- currentIndex = 0;
- }
- }
- }
-
- private void fireDataChangeEvent(final GetValue getValue, final DataChangedEventListener listener, final DataChangedEvent.Type type) {
- listener.onChange(new DataChangedEvent(getValue.getKey(), getValue.getValue(), type));
- }
-
- /**
- * Flush session by update TTL.
- *
- * @param consulClient consul client
- * @param sessionId session id
- */
- public void generatorFlushSessionTtlTask(final ConsulClient consulClient, final String sessionId) {
- SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 1L, 10L, TimeUnit.SECONDS);
- }
-
- /**
- * See <a href="https://developer.hashicorp.com/consul/api-docs/v1.17.x/status">Status HTTP API</a> .
- *
- * @throws RuntimeException Unable to connect to Consul Agent.
- */
- private void verifyConsulAgentRunning() {
- HttpResponse httpResponse = consulClient.getRawClient().makeGetRequest("/v1/status/leader");
- if (HttpStatus.SC_OK != httpResponse.getStatusCode()) {
- throw new RuntimeException("Unable to connect to Consul Agent and StatusCode is " + httpResponse.getStatusCode() + ".");
- }
- }
-
- @Override
- public String getType() {
- return "Consul";
- }
-}
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereConsulClient.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereConsulClient.java
deleted file mode 100644
index 7b101742cc5..00000000000
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereConsulClient.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.repository.cluster.consul;
-
-import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.ConsulRawClient;
-import lombok.Getter;
-
-/**
- * ShardingSphere consul client support use raw client.
- */
-@Getter
-public final class ShardingSphereConsulClient extends ConsulClient {
-
- private final ConsulRawClient rawClient;
-
- public ShardingSphereConsulClient(final ConsulRawClient rawClient) {
- super(rawClient);
- this.rawClient = rawClient;
- }
-}
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereQueryParams.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereQueryParams.java
deleted file mode 100644
index 80715ffb5af..00000000000
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ShardingSphereQueryParams.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.repository.cluster.consul;
-
-import com.ecwid.consul.UrlParameters;
-import lombok.RequiredArgsConstructor;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * ShardingSphere query params.
- */
-@RequiredArgsConstructor
-public final class ShardingSphereQueryParams implements UrlParameters {
-
- private final long waitMillis;
-
- private final long index;
-
- @Override
- public List<String> toUrlParameters() {
- List<String> result = new ArrayList<>(2);
- if (-1 != waitMillis) {
- result.add(String.format("wait=%dms", TimeUnit.MILLISECONDS.toMillis(waitMillis)));
- }
- if (-1 != index) {
- result.add(String.format("index=%s", Long.toUnsignedString(index)));
- }
- return result;
- }
-}
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java
deleted file mode 100644
index a91b8130243..00000000000
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLock.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.repository.cluster.consul.lock;
-
-import com.ecwid.consul.ConsulException;
-import com.ecwid.consul.transport.HttpResponse;
-import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.OperationException;
-import com.ecwid.consul.v1.QueryParams;
-import com.ecwid.consul.v1.Response;
-import com.ecwid.consul.v1.kv.model.GetValue;
-import com.ecwid.consul.v1.kv.model.PutParams;
-import com.ecwid.consul.v1.session.model.NewSession;
-import com.ecwid.consul.v1.session.model.Session.Behavior;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Strings;
-import org.apache.shardingsphere.infra.util.json.JsonUtils;
-import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereConsulClient;
-import org.apache.shardingsphere.mode.repository.cluster.consul.ShardingSphereQueryParams;
-import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
-import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
-import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
-
-import java.util.List;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Consul distributed lock.
- */
-public final class ConsulDistributedLock implements DistributedLock {
-
- private static final String LOCK_PATH_PATTERN = "lock/%s";
-
- private static final String LOCK_VALUE = "LOCKED";
-
- private static final String UNLOCK_VALUE = "UNLOCKED";
-
- private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
-
- private final String lockPath;
-
- private final ConsulClient client;
-
- private final String timeToLiveSeconds;
-
- private final ThreadLocal<String> lockSessionId;
-
- public ConsulDistributedLock(final String lockKey, final ConsulClient client, final ConsulProperties props) {
- lockPath = String.format(LOCK_PATH_PATTERN, lockKey);
- this.client = client;
- timeToLiveSeconds = props.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS);
- lockSessionId = new ThreadLocal<>();
- }
-
- @Override
- public boolean tryLock(final long timeoutMillis) {
- if (!Strings.isNullOrEmpty(lockSessionId.get())) {
- return true;
- }
- PutParams putParams = new PutParams();
- long remainingMillis = timeoutMillis;
- while (true) {
- String sessionId = createSessionId();
- putParams.setAcquireSession(sessionId);
- Response<Boolean> response = client.setKVValue(lockPath, LOCK_VALUE, putParams);
- if (response.getValue()) {
- return tryLock(sessionId);
- }
- client.sessionDestroy(sessionId, null);
- long waitingMillis = waitUntilRelease(response.getConsulIndex(), remainingMillis);
- if (waitingMillis >= remainingMillis) {
- return false;
- }
- remainingMillis -= waitingMillis;
- }
- }
-
- private boolean tryLock(final String sessionId) {
- lockSessionId.set(sessionId);
- SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> client.renewSession(sessionId, QueryParams.DEFAULT), 5L, 10L, TimeUnit.SECONDS);
- return true;
- }
-
- private String createSessionId() {
- NewSession session = new NewSession();
- session.setName(lockPath);
- session.setTtl(timeToLiveSeconds);
- session.setBehavior(Behavior.RELEASE);
- return client.sessionCreate(session, null).getValue();
- }
-
- private long waitUntilRelease(final long valueIndex, final long timeoutMillis) {
- long currentIndex = valueIndex < 0 ? 0 : valueIndex;
- long spentMillis = 0L;
- long timeoutTime = System.currentTimeMillis() + timeoutMillis;
- long remainingMillis = timeoutMillis;
- while (true) {
- long startTime = System.currentTimeMillis();
- if (startTime >= timeoutTime) {
- return timeoutMillis;
- }
- Response<GetValue> response = getResponse(
- ((ShardingSphereConsulClient) client).getRawClient().makeGetRequest(String.format("/v1/kv/%s", lockPath), null, new ShardingSphereQueryParams(remainingMillis, currentIndex)));
- spentMillis += System.currentTimeMillis() - startTime;
- remainingMillis -= spentMillis;
- Long index = response.getConsulIndex();
- if (null != index && index >= currentIndex) {
- if (0 != currentIndex && (null == response.getValue() || null == response.getValue().getValue() || lockPath.equals(response.getValue().getKey()))) {
- return spentMillis;
- }
- currentIndex = index;
- continue;
- }
- if (null != index) {
- currentIndex = 0;
- }
- }
- }
-
- private Response<GetValue> getResponse(final HttpResponse rawResponse) {
- if (200 == rawResponse.getStatusCode()) {
- List<GetValue> value = JsonUtils.fromJsonString(rawResponse.getContent(), new TypeReference<List<GetValue>>() {
- });
- if (value.isEmpty()) {
- return new Response<>(null, rawResponse);
- }
- if (1 == value.size()) {
- return new Response<>(value.get(0), rawResponse);
- }
- throw new ConsulException("Strange response (list size=" + value.size() + ")");
- }
- if (404 == rawResponse.getStatusCode()) {
- return new Response<>(null, rawResponse);
- }
- throw new OperationException(rawResponse);
- }
-
- @Override
- public void unlock() {
- String sessionId = lockSessionId.get();
- PutParams putParams = new PutParams();
- putParams.setReleaseSession(sessionId);
- try {
- client.setKVValue(lockPath, UNLOCK_VALUE, putParams);
- client.sessionDestroy(sessionId, null);
- } finally {
- lockSessionId.remove();
- }
- }
-}
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockCreator.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockCreator.java
deleted file mode 100644
index cd98e7792fb..00000000000
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/lock/ConsulDistributedLockCreator.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.repository.cluster.consul.lock;
-
-import com.ecwid.consul.v1.ConsulClient;
-import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
-import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
-import org.apache.shardingsphere.mode.repository.cluster.lock.creator.DistributedLockCreator;
-
-/**
- * Consul distributed lock creator.
- */
-public final class ConsulDistributedLockCreator implements DistributedLockCreator<ConsulClient, ConsulProperties> {
-
- @Override
- public DistributedLock create(final String lockKey, final ConsulClient client, final ConsulProperties props) {
- return new ConsulDistributedLock(lockKey, client, props);
- }
-
- @Override
- public String getType() {
- return "Consul";
- }
-}
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulProperties.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulProperties.java
deleted file mode 100644
index 7e19fcd92ee..00000000000
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulProperties.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.repository.cluster.consul.props;
-
-import org.apache.shardingsphere.infra.props.TypedProperties;
-import java.util.Properties;
-
-/**
- * Typed properties of Consul.
- */
-public final class ConsulProperties extends TypedProperties<ConsulPropertyKey> {
-
- public ConsulProperties(final Properties props) {
- super(ConsulPropertyKey.class, props);
- }
-}
diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulPropertyKey.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulPropertyKey.java
deleted file mode 100644
index b6cb2536e0a..00000000000
--- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulPropertyKey.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.repository.cluster.consul.props;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.props.TypedPropertyKey;
-
-/**
- * Typed property key of Consul.
- */
-@RequiredArgsConstructor
-@Getter
-public enum ConsulPropertyKey implements TypedPropertyKey {
-
- /**
- * Time to live seconds.
- */
- TIME_TO_LIVE_SECONDS("timeToLiveSeconds", "30s", String.class),
-
- /**
- * Block query time seconds.
- */
- BLOCK_QUERY_TIME_TO_SECONDS("blockQueryTimeToSeconds", "60", long.class);
-
- private final String key;
-
- private final String defaultValue;
-
- private final Class<?> type;
-}
diff --git a/mode/type/cluster/repository/provider/consul/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository b/mode/type/cluster/repository/provider/consul/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository
deleted file mode 100644
index d3581d39e0b..00000000000
--- a/mode/type/cluster/repository/provider/consul/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.mode.repository.cluster.consul.ConsulRepository
diff --git a/mode/type/cluster/repository/provider/consul/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.lock.creator.DistributedLockCreator b/mode/type/cluster/repository/provider/consul/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.lock.creator.DistributedLockCreator
deleted file mode 100644
index 2d2664d4099..00000000000
--- a/mode/type/cluster/repository/provider/consul/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.lock.creator.DistributedLockCreator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.mode.repository.cluster.consul.lock.ConsulDistributedLockCreator
diff --git a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java b/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
deleted file mode 100644
index e4fa64e4487..00000000000
--- a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepositoryTest.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.repository.cluster.consul;
-
-import com.ecwid.consul.transport.HttpResponse;
-import com.ecwid.consul.v1.ConsulRawClient;
-import com.ecwid.consul.v1.QueryParams;
-import com.ecwid.consul.v1.Response;
-import com.ecwid.consul.v1.kv.model.GetValue;
-import com.ecwid.consul.v1.kv.model.PutParams;
-import com.ecwid.consul.v1.session.model.NewSession;
-import lombok.SneakyThrows;
-import org.apache.http.HttpStatus;
-import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
-import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.exceptions.base.MockitoException;
-import org.mockito.internal.configuration.plugins.Plugins;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.plugins.MemberAccessor;
-import org.mockito.quality.Strictness;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.LENIENT)
-class ConsulRepositoryTest {
-
- private final ConsulRepository repository = new ConsulRepository();
-
- @Mock
- private ShardingSphereConsulClient client;
-
- @Mock
- private Response<GetValue> response;
-
- @Mock
- private Response<List<String>> responseList;
-
- @Mock
- private Response<List<GetValue>> responseGetValueList;
-
- @Mock
- private Response<Boolean> responseBoolean;
-
- @Mock
- private Response<String> sessionResponse;
-
- @Mock
- private GetValue getValue;
-
- @Mock
- private List<GetValue> getValueList;
-
- @Mock
- private ConsulRawClient consulRawClient;
-
- @Mock
- private HttpResponse httpResponse;
-
- private long index = 123456L;
-
- @BeforeEach
- void setUp() {
- setClient();
- setProperties();
- }
-
- @SneakyThrows(ReflectiveOperationException.class)
- private void setClient() {
- when(client.getKVValue(any(String.class))).thenReturn(response);
- when(response.getValue()).thenReturn(getValue);
- when(client.getKVValues(any(String.class), any(QueryParams.class))).thenReturn(responseGetValueList);
- when(client.getKVKeysOnly(any(String.class))).thenReturn(responseList);
- when(client.sessionCreate(any(NewSession.class), any(QueryParams.class))).thenReturn(sessionResponse);
- when(sessionResponse.getValue()).thenReturn("12323ddsf3sss");
- when(responseGetValueList.getConsulIndex()).thenReturn(index++);
- when(responseGetValueList.getValue()).thenReturn(getValueList);
- when(client.setKVValue(any(String.class), any(String.class))).thenReturn(responseBoolean);
- Plugins.getMemberAccessor().set(repository.getClass().getDeclaredField("consulClient"), repository, client);
- Plugins.getMemberAccessor().set(repository.getClass().getDeclaredField("distributedLockHolder"), repository, mock(DistributedLockHolder.class));
- }
-
- @SneakyThrows(ReflectiveOperationException.class)
- private void setProperties() {
- MemberAccessor accessor = Plugins.getMemberAccessor();
- accessor.set(repository.getClass().getDeclaredField("consulProps"), repository, new ConsulProperties(new Properties()));
- accessor.set(repository.getClass().getDeclaredField("watchKeyMap"), repository, new HashMap<>(4, 1F));
- }
-
- @Test
- void assertDirectlyKey() {
- repository.getDirectly("key");
- verify(client).getKVValue("key");
- verify(response).getValue();
- }
-
- @Test
- void assertGetChildrenKeys() {
- final String key = "/key";
- String k1 = "/key/key1/key1-1";
- String v1 = "value1";
- client.setKVValue(k1, v1);
- String k2 = "/key/key2";
- String v2 = "value2";
- client.setKVValue(k2, v2);
- List<String> getValues = Arrays.asList(k1, k2);
- when(responseList.getValue()).thenReturn(getValues);
- List<String> actual = repository.getChildrenKeys(key);
- assertThat(actual.size(), is(2));
- Iterator<String> iterator = actual.iterator();
- assertThat(iterator.next(), is("/key/key1/key1-1"));
- assertThat(iterator.next(), is("/key/key2"));
- }
-
- @Test
- void assertPersistEphemeral() {
- when(client.getRawClient()).thenReturn(consulRawClient);
- when(consulRawClient.makeGetRequest(any(String.class))).thenReturn(httpResponse);
- when(httpResponse.getStatusCode()).thenReturn(HttpStatus.SC_OK);
- repository.persistEphemeral("key1", "value1");
- verify(client).sessionCreate(any(NewSession.class), any(QueryParams.class));
- verify(client).setKVValue(any(String.class), any(String.class), any(PutParams.class));
- }
-
- @Test
- void assertWatchUpdate() {
- final String key = "sharding/key";
- final String k1 = "sharding/key/key1";
- final String v1 = "value1";
- client.setKVValue(k1, v1);
- GetValue getValue1 = new GetValue();
- getValue1.setKey(k1);
- getValue1.setValue(v1);
- when(responseGetValueList.getValue()).thenReturn(Collections.singletonList(getValue1));
- repository.watch(key, event -> {
- });
- client.setKVValue(k1, "value1-1");
- while (true) {
- Awaitility.await().pollDelay(100L, TimeUnit.MILLISECONDS).until(() -> true);
- try {
- verify(client, atLeastOnce()).getKVValues(any(String.class), any(QueryParams.class));
- break;
- } catch (final MockitoException ignored) {
- }
- }
- }
-
- @Test
- void assertWatchDelete() {
- final String key = "sharding/key";
- final String k1 = "sharding/key/key1";
- final String v1 = "value1";
- final String k2 = "sharding/key/key2";
- final String v2 = "value1";
- client.setKVValue(k1, v1);
- client.setKVValue(k2, v2);
- GetValue getValue1 = new GetValue();
- getValue1.setKey(k1);
- getValue1.setValue(v1);
- when(responseGetValueList.getValue()).thenReturn(Collections.singletonList(getValue1));
- repository.watch(key, event -> {
- });
- client.deleteKVValue(k2);
- while (true) {
- Awaitility.await().pollDelay(100L, TimeUnit.MILLISECONDS).until(() -> true);
- try {
- verify(client, atLeastOnce()).getKVValues(any(String.class), any(QueryParams.class));
- break;
- } catch (final MockitoException ignored) {
- }
- }
- }
-
- @Test
- void assertDelete() {
- repository.delete("key");
- verify(client).deleteKVValue(any(String.class));
- }
-
- @Test
- void assertPersist() {
- repository.persist("key1", "value1");
- verify(client).setKVValue(any(String.class), any(String.class));
- }
-
- @Test
- void assertNullResponse() {
- when(response.getValue()).thenReturn(null);
- final String key = "/key";
- assertDoesNotThrow(() -> {
- repository.getDirectly(key);
- repository.getChildrenKeys(key);
- });
- when(responseGetValueList.getValue()).thenReturn(null);
- assertDoesNotThrow(() -> {
- repository.watch(key, event -> {
- });
- client.setKVValue(key, "value");
- });
- }
-}
diff --git a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulPropertiesTest.java b/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulPropertiesTest.java
deleted file mode 100644
index 888a7cf6f98..00000000000
--- a/mode/type/cluster/repository/provider/consul/src/test/java/org/apache/shardingsphere/mode/repository/cluster/consul/props/ConsulPropertiesTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.repository.cluster.consul.props;
-
-import org.apache.shardingsphere.test.util.PropertiesBuilder;
-import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
-import org.junit.jupiter.api.Test;
-
-import java.util.Properties;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-class ConsulPropertiesTest {
-
- @Test
- void assertGetValue() {
- assertThat(new ConsulProperties(PropertiesBuilder.build(new Property(ConsulPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "50"))).getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS),
- is(60L));
- }
-
- @Test
- void assertGetDefaultValue() {
- assertThat(new ConsulProperties(new Properties()).getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS), is("30s"));
- }
-}
diff --git a/mode/type/cluster/repository/provider/pom.xml b/mode/type/cluster/repository/provider/pom.xml
index 68d78a260d2..429c62569ec 100644
--- a/mode/type/cluster/repository/provider/pom.xml
+++ b/mode/type/cluster/repository/provider/pom.xml
@@ -30,6 +30,5 @@
<modules>
<module>zookeeper</module>
<module>etcd</module>
- <module>consul</module>
</modules>
</project>
diff --git a/pom.xml b/pom.xml
index 09a9e94ccaa..3469b1abdb2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,11 +104,9 @@
<audience-annotations.version>0.12.0</audience-annotations.version>
<jetcd.version>0.7.7</jetcd.version>
<vertx.version>4.5.1</vertx.version>
- <consul.api.version>1.4.5</consul.api.version>
<grpc.version>1.58.0</grpc.version>
<protobuf.version>3.21.12</protobuf.version>
- <httpclient.version>4.5.14</httpclient.version>
<okhttp.version>4.12.0</okhttp.version>
<elasticjob.version>3.0.4</elasticjob.version>