This is an automated email from the ASF dual-hosted git repository.

xingfudeshi pushed a commit to branch gsoc-2025-meta-registry
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/gsoc-2025-meta-registry by 
this push:
     new 5dbe239c7e optimize: metadata discovery support for redis
5dbe239c7e is described below

commit 5dbe239c7e0d72ff53ddad0908551ca56683098c
Author: YoWuwuuuw <[email protected]>
AuthorDate: Tue Nov 18 14:19:24 2025 +0800

    optimize: metadata discovery support for redis
---
 .../registry/redis/RedisRegistryServiceImpl.java   | 83 ++++++++++++++++------
 .../redis/RedisRegisterServiceImplTest.java        | 22 ++++--
 2 files changed, 77 insertions(+), 28 deletions(-)

diff --git 
a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
 
b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
index 9fd93b2e06..bdeed8d0ac 100644
--- 
a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
+++ 
b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
@@ -43,13 +43,13 @@ import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * The type Redis registry service.
@@ -155,23 +155,36 @@ public class RedisRegistryServiceImpl implements 
RegistryService<RedisListener>
     public void register(ServiceInstance instance) {
         InetSocketAddress address = instance.getAddress();
         NetUtil.validAddress(address);
-        doRegisterOrExpire(address, true);
+        // 1) set alive key to ensure key exists
+        doRegisterOrExpire(address);
+        // 2) write metadata once and set ttl; subsequent heartbeats will 
refresh ttl only
+        String serverAddr = NetUtil.toStringAddress(address);
+        String metaKey = getRedisRegistryMetaKey(serverAddr);
+        try (Jedis jedis = jedisPool.getResource();
+                Pipeline pipelined = jedis.pipelined()) {
+            if (instance.getMetadata() != null && 
!instance.getMetadata().isEmpty()) {
+                for (Map.Entry<String, Object> e : 
instance.getMetadata().entrySet()) {
+                    String value = e.getValue() == null ? "" : 
String.valueOf(e.getValue());
+                    pipelined.hset(metaKey, e.getKey(), value);
+                }
+            }
+            // ensure metadata key ttl
+            pipelined.expire(metaKey, (int) KEY_TTL);
+            // 3) publish register after metadata prepared
+            pipelined.publish(getRedisRegistryKey(), serverAddr + "-" + 
RedisListener.REGISTER);
+            pipelined.sync();
+        }
         RegistryHeartBeats.addHeartBeat(REGISTRY_TYPE, address, 
KEY_REFRESH_PERIOD, this::doRegisterOrExpire);
     }
 
     private void doRegisterOrExpire(InetSocketAddress address) {
-        doRegisterOrExpire(address, false);
-    }
-
-    private void doRegisterOrExpire(InetSocketAddress address, boolean 
publish) {
         String serverAddr = NetUtil.toStringAddress(address);
         String key = getRedisRegistryKey() + "_" + serverAddr; // key = 
registry.redis.${cluster}_ip:port
         try (Jedis jedis = jedisPool.getResource();
                 Pipeline pipelined = jedis.pipelined()) {
             pipelined.setex(key, KEY_TTL, 
ManagementFactory.getRuntimeMXBean().getName());
-            if (publish) {
-                pipelined.publish(getRedisRegistryKey(), serverAddr + "-" + 
RedisListener.REGISTER);
-            }
+            // refresh metadata ttl as well
+            pipelined.expire(getRedisRegistryMetaKey(serverAddr), (int) 
KEY_TTL);
             pipelined.sync();
         }
     }
@@ -185,6 +198,9 @@ public class RedisRegistryServiceImpl implements 
RegistryService<RedisListener>
                 Pipeline pipelined = jedis.pipelined()) {
             pipelined.hdel(getRedisRegistryKey(), serverAddr);
             pipelined.publish(getRedisRegistryKey(), serverAddr + "-" + 
RedisListener.UN_REGISTER);
+            // remove ephemeral key and metadata hash proactively
+            pipelined.del(getRedisRegistryKey() + "_" + serverAddr);
+            pipelined.del(getRedisRegistryMetaKey(serverAddr));
             pipelined.sync();
         }
     }
@@ -252,9 +268,21 @@ public class RedisRegistryServiceImpl implements 
RegistryService<RedisListener>
                 String eventType = msgr[1];
                 switch (eventType) {
                     case RedisListener.REGISTER:
-                        CollectionUtils.computeIfAbsent(
-                                        CLUSTER_INSTANCE_MAP, clusterName, 
value -> ConcurrentHashMap.newKeySet(2))
-                                .add(new 
ServiceInstance(NetUtil.toInetSocketAddress(serverAddr)));
+                        {
+                            Map<String, String> meta = null;
+                            try (Jedis jedis = jedisPool.getResource()) {
+                                meta = 
jedis.hgetAll(getRedisRegistryMetaKey(serverAddr));
+                            }
+                            InetSocketAddress addr = 
NetUtil.toInetSocketAddress(serverAddr);
+                            ServiceInstance instance = meta == null || 
meta.isEmpty()
+                                    ? new ServiceInstance(addr)
+                                    : ServiceInstance.fromStringMap(addr, 
meta);
+                            Set<ServiceInstance> set = 
CollectionUtils.computeIfAbsent(
+                                    CLUSTER_INSTANCE_MAP, clusterName, value 
-> ConcurrentHashMap.newKeySet(2));
+                            // replace older entry with same address to avoid 
duplicates with/without metadata
+                            set.removeIf(si -> si.getAddress().equals(addr));
+                            set.add(instance);
+                        }
                         break;
                     case RedisListener.UN_REGISTER:
                         removeServerAddressByPushEmptyProtection(clusterName, 
serverAddr);
@@ -365,26 +393,30 @@ public class RedisRegistryServiceImpl implements 
RegistryService<RedisListener>
         scanParams.count(10);
         scanParams.match(redisRegistryKey + "_*");
         String cursor = ScanParams.SCAN_POINTER_START;
-        Set<InetSocketAddress> newAddressSet = ConcurrentHashMap.newKeySet(2);
+        Set<ServiceInstance> currentInstances = ConcurrentHashMap.newKeySet(2);
         do {
             ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
             cursor = scanResult.getCursor();
             List<String> instances = scanResult.getResult();
             if (instances != null && !instances.isEmpty()) {
                 // key = registry.redis.${cluster}_ip:port
-                Set<InetSocketAddress> part = instances.stream()
-                        .map(key -> {
-                            String[] split = key.split("_");
-                            return NetUtil.toInetSocketAddress(split[1]);
-                        })
-                        .collect(Collectors.toSet());
-
-                newAddressSet.addAll(part);
+                for (String key : instances) {
+                    String[] split = key.split("_");
+                    if (split.length < 2) {
+                        continue;
+                    }
+                    String serverAddr = split[1];
+                    InetSocketAddress address = 
NetUtil.toInetSocketAddress(serverAddr);
+                    Map<String, String> meta = 
jedis.hgetAll(getRedisRegistryMetaKey(serverAddr));
+                    if (meta == null || meta.isEmpty()) {
+                        currentInstances.add(new ServiceInstance(address));
+                    } else {
+                        
currentInstances.add(ServiceInstance.fromStringMap(address, meta));
+                    }
+                }
             }
         } while (!cursor.equals(ScanParams.SCAN_POINTER_START));
 
-        Set<ServiceInstance> currentInstances = 
ServiceInstance.convertToServiceInstanceSet(newAddressSet);
-
         if (CollectionUtils.isNotEmpty(currentInstances)
                 && 
!currentInstances.equals(CLUSTER_INSTANCE_MAP.get(clusterName))) {
             CLUSTER_INSTANCE_MAP.put(clusterName, currentInstances);
@@ -395,6 +427,11 @@ public class RedisRegistryServiceImpl implements 
RegistryService<RedisListener>
         return REDIS_FILEKEY_PREFIX + clusterName;
     }
 
+    private String getRedisRegistryMetaKey(String serverAddr) {
+        // meta key example: registry.redis.${cluster}.meta_ip:port
+        return REDIS_FILEKEY_PREFIX + clusterName + ".meta_" + serverAddr;
+    }
+
     private String getRedisAddrFileKey() {
         return REDIS_FILEKEY_PREFIX + PRO_SERVER_ADDR_KEY;
     }
diff --git 
a/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java
 
b/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java
index d8738f2d55..b3c41fd8b1 100644
--- 
a/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java
+++ 
b/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java
@@ -35,6 +35,9 @@ import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
@@ -66,15 +69,24 @@ public class RedisRegisterServiceImplTest {
 
     @Test
     @Order(1)
-    public void testFlow() {
-        ServiceInstance serviceInstance = new ServiceInstance(new 
InetSocketAddress(NetUtil.getLocalIp(), 8091));
+    public void testRegisterWithMetadataAndLookup() {
+        Map<String, Object> meta = new HashMap<>();
+        meta.put("zone", "A");
+        meta.put("version", "v1");
+        ServiceInstance serviceInstance = new ServiceInstance(new 
InetSocketAddress(NetUtil.getLocalIp(), 8092), meta);
         redisRegistryService.register(serviceInstance);
 
-        
Assertions.assertTrue(redisRegistryService.lookup("default_tx_group").size() > 
0);
+        List<ServiceInstance> instances = 
redisRegistryService.lookup("default_tx_group");
+        ServiceInstance target = instances.stream()
+                .filter(si -> si.getAddress().getPort() == 8092)
+                .findFirst()
+                .orElse(null);
+        Assertions.assertNotNull(target);
+        Assertions.assertNotNull(target.getMetadata());
+        Assertions.assertEquals("A", target.getMetadata().get("zone"));
+        Assertions.assertEquals("v1", target.getMetadata().get("version"));
 
         redisRegistryService.unregister(serviceInstance);
-
-        
Assertions.assertTrue(redisRegistryService.lookup("default_tx_group").size() > 
0);
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to