This is an automated email from the ASF dual-hosted git repository.
xingfudeshi pushed a commit to branch gsoc-2025-meta-registry
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/gsoc-2025-meta-registry by
this push:
new 5dbe239c7e optimize: metadata discovery support for redis
5dbe239c7e is described below
commit 5dbe239c7e0d72ff53ddad0908551ca56683098c
Author: YoWuwuuuw <[email protected]>
AuthorDate: Tue Nov 18 14:19:24 2025 +0800
optimize: metadata discovery support for redis
---
.../registry/redis/RedisRegistryServiceImpl.java | 83 ++++++++++++++++------
.../redis/RedisRegisterServiceImplTest.java | 22 ++++--
2 files changed, 77 insertions(+), 28 deletions(-)
diff --git a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
index 9fd93b2e06..bdeed8d0ac 100644
--- a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
+++ b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
@@ -43,13 +43,13 @@ import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
/**
* The type Redis registry service.
@@ -155,23 +155,36 @@ public class RedisRegistryServiceImpl implements RegistryService<RedisListener>
public void register(ServiceInstance instance) {
InetSocketAddress address = instance.getAddress();
NetUtil.validAddress(address);
- doRegisterOrExpire(address, true);
+ // 1) set alive key to ensure key exists
+ doRegisterOrExpire(address);
+ // 2) write metadata once and set ttl; subsequent heartbeats will refresh ttl only
+ String serverAddr = NetUtil.toStringAddress(address);
+ String metaKey = getRedisRegistryMetaKey(serverAddr);
+ try (Jedis jedis = jedisPool.getResource();
+ Pipeline pipelined = jedis.pipelined()) {
+ if (instance.getMetadata() != null &&
!instance.getMetadata().isEmpty()) {
+ for (Map.Entry<String, Object> e :
instance.getMetadata().entrySet()) {
+ String value = e.getValue() == null ? "" :
String.valueOf(e.getValue());
+ pipelined.hset(metaKey, e.getKey(), value);
+ }
+ }
+ // ensure metadata key ttl
+ pipelined.expire(metaKey, (int) KEY_TTL);
+ // 3) publish register after metadata prepared
+ pipelined.publish(getRedisRegistryKey(), serverAddr + "-" +
RedisListener.REGISTER);
+ pipelined.sync();
+ }
RegistryHeartBeats.addHeartBeat(REGISTRY_TYPE, address,
KEY_REFRESH_PERIOD, this::doRegisterOrExpire);
}
private void doRegisterOrExpire(InetSocketAddress address) {
- doRegisterOrExpire(address, false);
- }
-
- private void doRegisterOrExpire(InetSocketAddress address, boolean
publish) {
String serverAddr = NetUtil.toStringAddress(address);
String key = getRedisRegistryKey() + "_" + serverAddr; // key =
registry.redis.${cluster}_ip:port
try (Jedis jedis = jedisPool.getResource();
Pipeline pipelined = jedis.pipelined()) {
pipelined.setex(key, KEY_TTL,
ManagementFactory.getRuntimeMXBean().getName());
- if (publish) {
- pipelined.publish(getRedisRegistryKey(), serverAddr + "-" +
RedisListener.REGISTER);
- }
+ // refresh metadata ttl as well
+ pipelined.expire(getRedisRegistryMetaKey(serverAddr), (int)
KEY_TTL);
pipelined.sync();
}
}
@@ -185,6 +198,9 @@ public class RedisRegistryServiceImpl implements RegistryService<RedisListener>
Pipeline pipelined = jedis.pipelined()) {
pipelined.hdel(getRedisRegistryKey(), serverAddr);
pipelined.publish(getRedisRegistryKey(), serverAddr + "-" +
RedisListener.UN_REGISTER);
+ // remove ephemeral key and metadata hash proactively
+ pipelined.del(getRedisRegistryKey() + "_" + serverAddr);
+ pipelined.del(getRedisRegistryMetaKey(serverAddr));
pipelined.sync();
}
}
@@ -252,9 +268,21 @@ public class RedisRegistryServiceImpl implements RegistryService<RedisListener>
String eventType = msgr[1];
switch (eventType) {
case RedisListener.REGISTER:
- CollectionUtils.computeIfAbsent(
- CLUSTER_INSTANCE_MAP, clusterName,
value -> ConcurrentHashMap.newKeySet(2))
- .add(new
ServiceInstance(NetUtil.toInetSocketAddress(serverAddr)));
+ {
+ Map<String, String> meta = null;
+ try (Jedis jedis = jedisPool.getResource()) {
+ meta =
jedis.hgetAll(getRedisRegistryMetaKey(serverAddr));
+ }
+ InetSocketAddress addr =
NetUtil.toInetSocketAddress(serverAddr);
+ ServiceInstance instance = meta == null ||
meta.isEmpty()
+ ? new ServiceInstance(addr)
+ : ServiceInstance.fromStringMap(addr,
meta);
+ Set<ServiceInstance> set =
CollectionUtils.computeIfAbsent(
+ CLUSTER_INSTANCE_MAP, clusterName, value
-> ConcurrentHashMap.newKeySet(2));
+ // replace older entry with same address to avoid duplicates with/without metadata
+ set.removeIf(si -> si.getAddress().equals(addr));
+ set.add(instance);
+ }
break;
case RedisListener.UN_REGISTER:
removeServerAddressByPushEmptyProtection(clusterName,
serverAddr);
@@ -365,26 +393,30 @@ public class RedisRegistryServiceImpl implements RegistryService<RedisListener>
scanParams.count(10);
scanParams.match(redisRegistryKey + "_*");
String cursor = ScanParams.SCAN_POINTER_START;
- Set<InetSocketAddress> newAddressSet = ConcurrentHashMap.newKeySet(2);
+ Set<ServiceInstance> currentInstances = ConcurrentHashMap.newKeySet(2);
do {
ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
cursor = scanResult.getCursor();
List<String> instances = scanResult.getResult();
if (instances != null && !instances.isEmpty()) {
// key = registry.redis.${cluster}_ip:port
- Set<InetSocketAddress> part = instances.stream()
- .map(key -> {
- String[] split = key.split("_");
- return NetUtil.toInetSocketAddress(split[1]);
- })
- .collect(Collectors.toSet());
-
- newAddressSet.addAll(part);
+ for (String key : instances) {
+ String[] split = key.split("_");
+ if (split.length < 2) {
+ continue;
+ }
+ String serverAddr = split[1];
+ InetSocketAddress address =
NetUtil.toInetSocketAddress(serverAddr);
+ Map<String, String> meta =
jedis.hgetAll(getRedisRegistryMetaKey(serverAddr));
+ if (meta == null || meta.isEmpty()) {
+ currentInstances.add(new ServiceInstance(address));
+ } else {
+
currentInstances.add(ServiceInstance.fromStringMap(address, meta));
+ }
+ }
}
} while (!cursor.equals(ScanParams.SCAN_POINTER_START));
- Set<ServiceInstance> currentInstances =
ServiceInstance.convertToServiceInstanceSet(newAddressSet);
-
if (CollectionUtils.isNotEmpty(currentInstances)
&&
!currentInstances.equals(CLUSTER_INSTANCE_MAP.get(clusterName))) {
CLUSTER_INSTANCE_MAP.put(clusterName, currentInstances);
@@ -395,6 +427,11 @@ public class RedisRegistryServiceImpl implements RegistryService<RedisListener>
return REDIS_FILEKEY_PREFIX + clusterName;
}
+ private String getRedisRegistryMetaKey(String serverAddr) {
+ // meta key example: registry.redis.${cluster}.meta_ip:port
+ return REDIS_FILEKEY_PREFIX + clusterName + ".meta_" + serverAddr;
+ }
+
private String getRedisAddrFileKey() {
return REDIS_FILEKEY_PREFIX + PRO_SERVER_ADDR_KEY;
}
diff --git a/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java b/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java
index d8738f2d55..b3c41fd8b1 100644
--- a/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java
+++ b/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java
@@ -35,6 +35,9 @@ import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
@@ -66,15 +69,24 @@ public class RedisRegisterServiceImplTest {
@Test
@Order(1)
- public void testFlow() {
- ServiceInstance serviceInstance = new ServiceInstance(new
InetSocketAddress(NetUtil.getLocalIp(), 8091));
+ public void testRegisterWithMetadataAndLookup() {
+ Map<String, Object> meta = new HashMap<>();
+ meta.put("zone", "A");
+ meta.put("version", "v1");
+ ServiceInstance serviceInstance = new ServiceInstance(new
InetSocketAddress(NetUtil.getLocalIp(), 8092), meta);
redisRegistryService.register(serviceInstance);
-
Assertions.assertTrue(redisRegistryService.lookup("default_tx_group").size() >
0);
+ List<ServiceInstance> instances =
redisRegistryService.lookup("default_tx_group");
+ ServiceInstance target = instances.stream()
+ .filter(si -> si.getAddress().getPort() == 8092)
+ .findFirst()
+ .orElse(null);
+ Assertions.assertNotNull(target);
+ Assertions.assertNotNull(target.getMetadata());
+ Assertions.assertEquals("A", target.getMetadata().get("zone"));
+ Assertions.assertEquals("v1", target.getMetadata().get("version"));
redisRegistryService.unregister(serviceInstance);
-
-
Assertions.assertTrue(redisRegistryService.lookup("default_tx_group").size() >
0);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]