This is an automated email from the ASF dual-hosted git repository.
jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new be01fb010c optimize: use curator instead of zkclient in config model
(#6779)
be01fb010c is described below
commit be01fb010c176886adf795c8226f9fb603c90309
Author: Lei Zhiyuan <[email protected]>
AuthorDate: Thu Sep 12 11:22:00 2024 +0800
optimize: use curator instead of zkclient in config model (#6779)
---
changes/en-us/2.x.md | 1 +
changes/zh-cn/2.x.md | 2 +-
.../seata/config/ConfigurationChangeEvent.java | 11 +
config/seata-config-zk/pom.xml | 18 +-
.../seata/config/zk/DefaultZkSerializer.java | 42 ----
.../seata/config/zk/ZookeeperConfiguration.java | 266 +++++++++++++--------
.../seata/config/zk/ZkConfigurationTest.java | 127 ++++++++++
dependencies/pom.xml | 14 +-
8 files changed, 329 insertions(+), 152 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 84f351c675..d6e978f854 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -95,6 +95,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6819](https://github.com/apache/incubator-seata/pull/6819)] merge the
packaging processes of namingserver and seata-server
- [[#6827](https://github.com/apache/incubator-seata/pull/6827)] rename
namingserver registry type
- [[#6836](https://github.com/apache/incubator-seata/pull/6836)] add
independent nacos for the CI process
+- [[#6779](https://github.com/apache/incubator-seata/pull/6779)] use curator
instead of zkclient in config model
### refactor:
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index f0987c88e6..72d7533a1a 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -95,7 +95,7 @@
- [[#6819](https://github.com/apache/incubator-seata/pull/6819)]
namingserver与server的合并打包
- [[#6827](https://github.com/apache/incubator-seata/pull/6827)]
重命名namingserver注册类型改为seata
- [[#6836](https://github.com/apache/incubator-seata/pull/6836)] 为CI流程增加独立nacos
-
+- [[#6779](https://github.com/apache/incubator-seata/pull/6779)]
在config模块中使用curator替代zkclient
### refactor:
diff --git
a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java
b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java
index 215805a301..714f8b1767 100644
---
a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java
+++
b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java
@@ -141,4 +141,15 @@ public class ConfigurationChangeEvent {
this.namespace = namespace;
return this;
}
+
+ @Override
+ public String toString() {
+ return "ConfigurationChangeEvent{" +
+ "dataId='" + dataId + '\'' +
+ ", oldValue='" + oldValue + '\'' +
+ ", newValue='" + newValue + '\'' +
+ ", namespace='" + namespace + '\'' +
+ ", changeType=" + changeType +
+ '}';
+ }
}
diff --git a/config/seata-config-zk/pom.xml b/config/seata-config-zk/pom.xml
index eacb161136..c84196de83 100644
--- a/config/seata-config-zk/pom.xml
+++ b/config/seata-config-zk/pom.xml
@@ -36,14 +36,16 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <exclusions>
- <exclusion>
- <artifactId>log4j</artifactId>
- <groupId>log4j</groupId>
- </exclusion>
- </exclusions>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
</dependency>
</dependencies>
diff --git
a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/DefaultZkSerializer.java
b/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/DefaultZkSerializer.java
deleted file mode 100644
index bd764d75c3..0000000000
---
a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/DefaultZkSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.seata.config.zk;
-
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import java.nio.charset.StandardCharsets;
-
-/**
- * Default zk serializer.
- * <p>
- * If the user is not configured in config.zk.serializer configuration item,
then use default serializer.
- *
- * @since 1.3.0
- */
-public class DefaultZkSerializer implements ZkSerializer {
-
- @Override
- public byte[] serialize(Object data) throws ZkMarshallingError {
- return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
- }
-
- @Override
- public Object deserialize(byte[] bytes) throws ZkMarshallingError {
- return new String(bytes, StandardCharsets.UTF_8);
- }
-}
diff --git
a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java
b/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java
index 8c29ad2086..b045984f66 100644
---
a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java
+++
b/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java
@@ -17,7 +17,8 @@
package org.apache.seata.config.zk;
import java.io.IOException;
-import java.lang.reflect.Constructor;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.Map;
import java.util.Properties;
@@ -29,7 +30,12 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.retry.RetryNTimes;
import org.apache.seata.common.exception.NotSupportYetException;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.CollectionUtils;
@@ -41,10 +47,7 @@ import org.apache.seata.config.ConfigurationChangeListener;
import org.apache.seata.config.ConfigurationChangeType;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.config.processor.ConfigProcessor;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +57,6 @@ import static
org.apache.seata.config.ConfigurationKeys.SEATA_FILE_ROOT_CONFIG;
/**
* The type Zookeeper configuration.
- *
*/
public class ZookeeperConfiguration extends AbstractConfiguration {
private final static Logger LOGGER =
LoggerFactory.getLogger(ZookeeperConfiguration.class);
@@ -75,15 +77,17 @@ public class ZookeeperConfiguration extends
AbstractConfiguration {
private static final int DEFAULT_CONNECT_TIMEOUT = 2000;
private static final String DEFAULT_CONFIG_PATH = ROOT_PATH +
"/seata.properties";
private static final String FILE_CONFIG_KEY_PREFIX = FILE_ROOT_CONFIG +
FILE_CONFIG_SPLIT_CHAR + CONFIG_TYPE
- + FILE_CONFIG_SPLIT_CHAR;
+ + FILE_CONFIG_SPLIT_CHAR;
private static final ExecutorService CONFIG_EXECUTOR = new
ThreadPoolExecutor(THREAD_POOL_NUM, THREAD_POOL_NUM,
- Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<>(),
- new NamedThreadFactory("ZKConfigThread", THREAD_POOL_NUM));
- private static volatile ZkClient zkClient;
+ Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
+ new NamedThreadFactory("ZKConfigThread", THREAD_POOL_NUM));
+ private static volatile CuratorFramework zkClient;
private static final int MAP_INITIAL_CAPACITY = 8;
- private static final ConcurrentMap<String,
ConcurrentMap<ConfigurationChangeListener, ZKListener>> CONFIG_LISTENERS_MAP
- = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
+ private static final ConcurrentMap<String,
ConcurrentMap<ConfigurationChangeListener, NodeCacheListenerImpl>>
CONFIG_LISTENERS_MAP
+ = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
private static volatile Properties seataConfig = new Properties();
+ static final Charset CHARSET = StandardCharsets.UTF_8;
+ private static Map<String, CuratorCache> nodeCacheMap = new
ConcurrentHashMap<>();
/**
* Instantiates a new Zookeeper configuration.
@@ -93,26 +97,51 @@ public class ZookeeperConfiguration extends
AbstractConfiguration {
if (zkClient == null) {
synchronized (ZookeeperConfiguration.class) {
if (zkClient == null) {
- ZkSerializer zkSerializer = getZkSerializer();
String serverAddr =
FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + SERVER_ADDR_KEY);
int sessionTimeout =
FILE_CONFIG.getInt(FILE_CONFIG_KEY_PREFIX + SESSION_TIMEOUT_KEY,
DEFAULT_SESSION_TIMEOUT);
int connectTimeout =
FILE_CONFIG.getInt(FILE_CONFIG_KEY_PREFIX + CONNECT_TIMEOUT_KEY,
DEFAULT_CONNECT_TIMEOUT);
- zkClient = new ZkClient(serverAddr, sessionTimeout,
connectTimeout, zkSerializer);
+ CuratorFrameworkFactory.Builder builder =
CuratorFrameworkFactory.builder()
+ .connectString(serverAddr)
+ .retryPolicy(new RetryNTimes(1, 1000))
+ .connectionTimeoutMs(connectTimeout)
+ .sessionTimeoutMs(sessionTimeout);
String username =
FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + AUTH_USERNAME);
String password =
FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + AUTH_PASSWORD);
if (!StringUtils.isBlank(username) &&
!StringUtils.isBlank(password)) {
StringBuilder auth = new
StringBuilder(username).append(":").append(password);
- zkClient.addAuthInfo("digest",
auth.toString().getBytes());
+ builder.authorization("digest",
auth.toString().getBytes());
}
+ zkClient = builder.build();
+ zkClient.start();
}
}
- if (!zkClient.exists(ROOT_PATH)) {
- zkClient.createPersistent(ROOT_PATH, true);
+ if (!checkExists(ROOT_PATH)) {
+ createPersistent(ROOT_PATH);
}
initSeataConfig();
}
}
+ public void createPersistent(String path) {
+ try {
+ zkClient.create().forPath(path);
+ } catch (KeeperException.NodeExistsException e) {
+ LOGGER.warn("ZNode " + path + " already exists.", e);
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public boolean checkExists(String path) {
+ try {
+ if (zkClient.checkExists().forPath(path) != null) {
+ return true;
+ }
+ } catch (Exception e) {
+ }
+ return false;
+ }
+
@Override
public String getTypeName() {
return CONFIG_TYPE;
@@ -125,13 +154,13 @@ public class ZookeeperConfiguration extends
AbstractConfiguration {
return value;
}
FutureTask<String> future = new FutureTask<>(() -> {
- String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId;
- if (!zkClient.exists(path)) {
+ String path = buildPath(dataId);
+ if (!checkExists(path)) {
LOGGER.warn("config {} is not existed, return defaultValue {}
",
- dataId, defaultValue);
+ dataId, defaultValue);
return defaultValue;
}
- String value1 = zkClient.readData(path);
+ String value1 = readData(path);
return StringUtils.isNullOrEmpty(value1) ? defaultValue : value1;
});
CONFIG_EXECUTOR.execute(future);
@@ -139,25 +168,37 @@ public class ZookeeperConfiguration extends
AbstractConfiguration {
return future.get(timeoutMills, TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOGGER.error("getConfig {} error or timeout, return defaultValue
{}, exception:{} ",
- dataId, defaultValue, e.getMessage());
+ dataId, defaultValue, e.getMessage());
return defaultValue;
}
}
+ public String readData(String path) {
+ try {
+ byte[] dataBytes = zkClient.getData().forPath(path);
+ return (dataBytes == null || dataBytes.length == 0) ? null : new
String(dataBytes, CHARSET);
+ } catch (KeeperException.NoNodeException e) {
+ // ignore NoNode Exception.
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ return null;
+ }
+
@Override
public boolean putConfig(String dataId, String content, long timeoutMills)
{
if (!seataConfig.isEmpty()) {
seataConfig.setProperty(dataId, content);
- zkClient.writeData(getConfigPath(), getSeataConfigStr());
+ createPersistent(getConfigPath(), getSeataConfigStr());
return true;
}
FutureTask<Boolean> future = new FutureTask<>(() -> {
- String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId;
- if (!zkClient.exists(path)) {
- zkClient.create(path, content, CreateMode.PERSISTENT);
+ String path = buildPath(dataId);
+ if (!checkExists(path)) {
+ createPersistent(path, content);
} else {
- zkClient.writeData(path, content);
+ createPersistent(path, content);
}
return true;
});
@@ -166,11 +207,31 @@ public class ZookeeperConfiguration extends
AbstractConfiguration {
return future.get(timeoutMills, TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOGGER.error("putConfig {}, value: {} is error or timeout,
exception: {}",
- dataId, content, e.getMessage());
+ dataId, content, e.getMessage());
return false;
}
}
+ public String buildPath(String dataId) {
+ String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId;
+ return path;
+ }
+
+ protected void createPersistent(String path, String data) {
+ byte[] dataBytes = data.getBytes(CHARSET);
+ try {
+ zkClient.create().forPath(path, dataBytes);
+ } catch (KeeperException.NodeExistsException e) {
+ try {
+ zkClient.setData().forPath(path, dataBytes);
+ } catch (Exception e1) {
+ throw new IllegalStateException(e.getMessage(), e1);
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
@Override
public boolean putConfigIfAbsent(String dataId, String content, long
timeoutMills) {
throw new NotSupportYetException("not support atomic operation
putConfigIfAbsent");
@@ -180,13 +241,13 @@ public class ZookeeperConfiguration extends
AbstractConfiguration {
public boolean removeConfig(String dataId, long timeoutMills) {
if (!seataConfig.isEmpty()) {
seataConfig.remove(dataId);
- zkClient.writeData(getConfigPath(), getSeataConfigStr());
+ createPersistent(getConfigPath(), getSeataConfigStr());
return true;
}
FutureTask<Boolean> future = new FutureTask<>(() -> {
- String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId;
- return zkClient.delete(path);
+ String path = buildPath(dataId);
+ return deletePath(path);
});
CONFIG_EXECUTOR.execute(future);
try {
@@ -198,25 +259,36 @@ public class ZookeeperConfiguration extends
AbstractConfiguration {
}
+ protected boolean deletePath(String path) {
+ try {
+ zkClient.delete().deletingChildrenIfNeeded().forPath(path);
+ return true;
+ } catch (KeeperException.NoNodeException ignored) {
+ return true;
+ } catch (Exception e) {
+ LOGGER.error("deletePath {} is error or timeout", path, e);
+ return false;
+ }
+ }
+
@Override
public void addConfigListener(String dataId, ConfigurationChangeListener
listener) {
if (StringUtils.isBlank(dataId) || listener == null) {
return;
}
-
+ String path = buildPath(dataId);
if (!seataConfig.isEmpty()) {
- ZKListener zkListener = new ZKListener(dataId, listener);
+ NodeCacheListenerImpl zkListener = new
NodeCacheListenerImpl(dataId, listener);
+ CuratorCacheListener.builder().forAll(zkListener).build();
CONFIG_LISTENERS_MAP.computeIfAbsent(dataId, key -> new
ConcurrentHashMap<>())
- .put(listener, zkListener);
+ .put(listener, zkListener);
return;
}
-
- String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId;
- if (zkClient.exists(path)) {
- ZKListener zkListener = new ZKListener(path, listener);
+ if (checkExists(path)) {
+ NodeCacheListenerImpl zkListener = new NodeCacheListenerImpl(path,
listener);
CONFIG_LISTENERS_MAP.computeIfAbsent(dataId, key -> new
ConcurrentHashMap<>())
- .put(listener, zkListener);
- zkClient.subscribeDataChanges(path, zkListener);
+ .put(listener, zkListener);
+ addDataListener(path, zkListener);
}
}
@@ -227,18 +299,18 @@ public class ZookeeperConfiguration extends
AbstractConfiguration {
}
Set<ConfigurationChangeListener> configChangeListeners =
getConfigListeners(dataId);
if (CollectionUtils.isNotEmpty(configChangeListeners)) {
- String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId;
- if (zkClient.exists(path)) {
+ String path = buildPath(dataId);
+ if (checkExists(path)) {
for (ConfigurationChangeListener entry :
configChangeListeners) {
if (listener.equals(entry)) {
- ZKListener zkListener = null;
- Map<ConfigurationChangeListener, ZKListener>
configListeners = CONFIG_LISTENERS_MAP.get(dataId);
+ NodeCacheListenerImpl zkListener = null;
+ Map<ConfigurationChangeListener,
NodeCacheListenerImpl> configListeners = CONFIG_LISTENERS_MAP.get(dataId);
if (configListeners != null) {
zkListener = configListeners.get(listener);
configListeners.remove(entry);
}
if (zkListener != null) {
- zkClient.unsubscribeDataChanges(path, zkListener);
+ removeDataListener(path, zkListener);
}
break;
}
@@ -249,7 +321,7 @@ public class ZookeeperConfiguration extends
AbstractConfiguration {
@Override
public Set<ConfigurationChangeListener> getConfigListeners(String dataId) {
- ConcurrentMap<ConfigurationChangeListener, ZKListener> configListeners
= CONFIG_LISTENERS_MAP.get(dataId);
+ ConcurrentMap<ConfigurationChangeListener, NodeCacheListenerImpl>
configListeners = CONFIG_LISTENERS_MAP.get(dataId);
if (CollectionUtils.isNotEmpty(configListeners)) {
return configListeners.keySet();
} else {
@@ -259,15 +331,14 @@ public class ZookeeperConfiguration extends
AbstractConfiguration {
private void initSeataConfig() {
String configPath = getConfigPath();
- String config = zkClient.readData(configPath, true);
+ String config = readData(configPath);
if (StringUtils.isNotBlank(config)) {
try {
seataConfig = ConfigProcessor.processConfig(config,
getZkDataType());
} catch (IOException e) {
LOGGER.error("init config properties error", e);
}
- ZKListener zkListener = new ZKListener(configPath, null);
- zkClient.subscribeDataChanges(configPath, zkListener);
+ addDataListener(configPath, new NodeCacheListenerImpl(configPath,
null));
}
}
@@ -292,28 +363,25 @@ public class ZookeeperConfiguration extends
AbstractConfiguration {
return sb.toString();
}
- /**
- * The type Zk listener.
- */
- public static class ZKListener implements IZkDataListener {
-
+ public static class NodeCacheListenerImpl implements CuratorCacheListener {
private String path;
private ConfigurationChangeListener listener;
- /**
- * Instantiates a new Zk listener.
- *
- * @param path the path
- * @param listener the listener
- */
- public ZKListener(String path, ConfigurationChangeListener listener) {
+ public NodeCacheListenerImpl(String path, ConfigurationChangeListener
listener) {
this.path = path;
this.listener = listener;
}
@Override
- public void handleDataChange(String s, Object o) {
- if (s.equals(getConfigPath())) {
+ public void event(Type type, ChildData oldData, ChildData data) {
+
+ String o;
+ if (type == Type.NODE_DELETED) {
+ o = "";
+ } else {
+ o = new String(data.getData());
+ }
+ if (path.equals(getConfigPath())) {
Properties seataConfigNew = new Properties();
if (StringUtils.isNotBlank(o.toString())) {
try {
@@ -325,17 +393,17 @@ public class ZookeeperConfiguration extends
AbstractConfiguration {
}
}
- for (Map.Entry<String,
ConcurrentMap<ConfigurationChangeListener, ZKListener>> entry :
CONFIG_LISTENERS_MAP.entrySet()) {
+ for (Map.Entry<String,
ConcurrentMap<ConfigurationChangeListener, NodeCacheListenerImpl>> entry :
CONFIG_LISTENERS_MAP.entrySet()) {
String listenedDataId = entry.getKey();
String propertyOld =
seataConfig.getProperty(listenedDataId, "");
String propertyNew =
seataConfigNew.getProperty(listenedDataId, "");
if (!propertyOld.equals(propertyNew)) {
ConfigurationChangeEvent event = new
ConfigurationChangeEvent()
- .setDataId(listenedDataId)
- .setNewValue(propertyNew)
- .setChangeType(ConfigurationChangeType.MODIFY);
+ .setDataId(listenedDataId)
+ .setNewValue(propertyNew)
+ .setChangeType(ConfigurationChangeType.MODIFY);
- ConcurrentMap<ConfigurationChangeListener, ZKListener>
configListeners = entry.getValue();
+ ConcurrentMap<ConfigurationChangeListener,
NodeCacheListenerImpl> configListeners = entry.getValue();
for (ConfigurationChangeListener configListener :
configListeners.keySet()) {
configListener.onProcessEvent(event);
}
@@ -344,42 +412,42 @@ public class ZookeeperConfiguration extends
AbstractConfiguration {
seataConfig = seataConfigNew;
return;
+ } else {
+ if (type == Type.NODE_DELETED) {
+ // Node is deleted.
+ String dataId = path.replaceFirst(ROOT_PATH +
ZK_PATH_SPLIT_CHAR, "");
+ ConfigurationChangeEvent event = new
ConfigurationChangeEvent().setDataId(dataId).setChangeType(
+ ConfigurationChangeType.DELETE);
+ listener.onProcessEvent(event);
+ } else {
+ // Node is changed.
+ String dataId = path.replaceFirst(ROOT_PATH +
ZK_PATH_SPLIT_CHAR, "");
+ ConfigurationChangeEvent event = new
ConfigurationChangeEvent().setDataId(dataId).setNewValue(o.toString())
+ .setChangeType(ConfigurationChangeType.MODIFY);
+ listener.onProcessEvent(event);
+ }
}
- String dataId = s.replaceFirst(ROOT_PATH + ZK_PATH_SPLIT_CHAR, "");
- ConfigurationChangeEvent event = new
ConfigurationChangeEvent().setDataId(dataId).setNewValue(o.toString())
- .setChangeType(ConfigurationChangeType.MODIFY);
- listener.onProcessEvent(event);
- }
-
- @Override
- public void handleDataDeleted(String s) {
- String dataId = s.replaceFirst(ROOT_PATH + ZK_PATH_SPLIT_CHAR, "");
- ConfigurationChangeEvent event = new
ConfigurationChangeEvent().setDataId(dataId).setChangeType(
- ConfigurationChangeType.DELETE);
- listener.onProcessEvent(event);
}
}
- private ZkSerializer getZkSerializer() {
- ZkSerializer zkSerializer = null;
- String serializer = FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX +
SERIALIZER_KEY);
- if (StringUtils.isNotBlank(serializer)) {
- try {
- Class<?> clazz = Class.forName(serializer);
- Constructor<?> constructor = clazz.getDeclaredConstructor();
- constructor.setAccessible(true);
- zkSerializer = (ZkSerializer) constructor.newInstance();
- } catch (ClassNotFoundException cfe) {
- LOGGER.warn("No zk serializer class found, serializer:{}",
serializer, cfe);
- } catch (Throwable cause) {
- LOGGER.warn("found zk serializer encountered an unknown
exception", cause);
+ protected void addDataListener(String path, NodeCacheListenerImpl
nodeCacheListener) {
+ try {
+ CuratorCache nodeCache = CuratorCache.build(zkClient, path);
+ if (nodeCacheMap.putIfAbsent(path, nodeCache) != null) {
+ return;
}
+ nodeCache.listenable().addListener(nodeCacheListener);
+ nodeCache.start();
+ } catch (Exception e) {
+ throw new IllegalStateException("Add nodeCache listener for path:"
+ path, e);
}
- if (zkSerializer == null) {
- zkSerializer = new DefaultZkSerializer();
- LOGGER.info("Use default zk serializer:
org.apache.seata.config.zk.DefaultZkSerializer.");
- }
- return zkSerializer;
}
+ protected void removeDataListener(String path, NodeCacheListenerImpl
nodeCacheListener) {
+ CuratorCache nodeCache = nodeCacheMap.get(path);
+ if (nodeCache != null) {
+ nodeCache.listenable().removeListener(nodeCacheListener);
+ }
+ nodeCacheListener.listener = null;
+ }
}
diff --git
a/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java
b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java
new file mode 100644
index 0000000000..a5cc19442f
--- /dev/null
+++
b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.config.zk;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.curator.test.TestingServer;
+import org.apache.seata.config.ConfigurationChangeEvent;
+import org.apache.seata.config.ConfigurationChangeListener;
+import org.apache.seata.config.ConfigurationChangeType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The type zk configuration test
+ */
+public class ZkConfigurationTest {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ZkConfigurationTest.class);
+
+ protected static TestingServer server = null;
+
+ @BeforeAll
+ public static void adBeforeClass() throws Exception {
+ System.setProperty("config.type", "zk");
+ System.setProperty("config.zk.serverAddr", "127.0.0.1:2181");
+ server = new TestingServer(2181);
+ server.start();
+ }
+
+ @AfterAll
+ public static void adAfterClass() throws Exception {
+ if (server != null) {
+ server.stop();
+ }
+ }
+
+ @Test
+ public void testCheckExist() {
+ ZookeeperConfiguration zookeeperConfiguration = new
ZookeeperConfiguration();
+ boolean exist = zookeeperConfiguration.checkExists("/");
+ Assertions.assertTrue(exist);
+ }
+
+ @Test
+ public void testPutConfig() {
+ ZookeeperConfiguration zookeeperConfiguration = new
ZookeeperConfiguration();
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ final boolean[] listened = {false};
+ String dataId = "putMockDataId";
+ ConfigurationChangeListener changeListener = new
ConfigurationChangeListener() {
+ @Override
+ public void onChangeEvent(ConfigurationChangeEvent event) {
+ LOGGER.info("onChangeEvent:{}", event);
+ if (event.getChangeType() == ConfigurationChangeType.MODIFY) {
+ Assertions.assertEquals("value2", event.getNewValue());
+ listened[0] = true;
+ countDownLatch.countDown();
+ }
+ }
+ };
+
zookeeperConfiguration.createPersistent(zookeeperConfiguration.buildPath(dataId),
"value");
+ zookeeperConfiguration.addConfigListener(dataId, changeListener);
+ zookeeperConfiguration.putConfig(dataId, "value2");
+ try {
+ countDownLatch.await(10000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ Assertions.assertTrue(listened[0]);
+
+ zookeeperConfiguration.removeConfig(dataId);
+
+ zookeeperConfiguration.removeConfigListener(dataId, changeListener);
+ }
+
+ @Test
+ public void testRemoveConfig() {
+ ZookeeperConfiguration zookeeperConfiguration = new
ZookeeperConfiguration();
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ final boolean[] listened = {false};
+ String dataId = "removeMockDataId";
+
zookeeperConfiguration.createPersistent(zookeeperConfiguration.buildPath(dataId),
"value");
+ ConfigurationChangeListener changeListener = new
ConfigurationChangeListener() {
+ @Override
+ public void onChangeEvent(ConfigurationChangeEvent event) {
+ LOGGER.info("onChangeEvent:{}", event);
+ if (event.getChangeType() == ConfigurationChangeType.DELETE) {
+ Assertions.assertNull(event.getNewValue());
+ listened[0] = true;
+ countDownLatch.countDown();
+ }
+ }
+ };
+
+ zookeeperConfiguration.addConfigListener(dataId, changeListener);
+ zookeeperConfiguration.putConfig(dataId, "value2");
+ boolean remove = zookeeperConfiguration.removeConfig(dataId);
+ Assertions.assertTrue(remove);
+ try {
+ countDownLatch.await(10000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ Assertions.assertTrue(listened[0]);
+ }
+
+}
diff --git a/dependencies/pom.xml b/dependencies/pom.xml
index d00ac503b7..bf5a1a2084 100644
--- a/dependencies/pom.xml
+++ b/dependencies/pom.xml
@@ -50,7 +50,7 @@
<aopalliance.version>1.0</aopalliance.version>
<zkclient.version>0.11</zkclient.version>
<apache-zookeeper.version>3.7.2</apache-zookeeper.version>
- <curator-test.version>5.1.0</curator-test.version>
+ <curator.version>5.1.0</curator.version>
<spring-context-support.version>1.0.2</spring-context-support.version>
<apollo-client.version>2.0.1</apollo-client.version>
<eureka-clients.version>1.10.18</eureka-clients.version>
@@ -354,10 +354,20 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
- <version>${curator-test.version}</version>
+ <version>${curator.version}</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]