This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new ca77ac91 fix(ClientImpl.java): broadcast SyncLiteSubscriptionRequest to all route endpoints (#1245)
ca77ac91 is described below

commit ca77ac914fe98fc82320aa6c0d6a4f581c35421e
Author: Quan <[email protected]>
AuthorDate: Thu May 14 14:44:01 2026 +0800

    fix(ClientImpl.java): broadcast SyncLiteSubscriptionRequest to all route endpoints (#1245)
    
    LiteSubscriptionManager#syncLiteSubscription previously only sent the
    request to the client's initial endpoint, causing lite subscription
    state to be inconsistent across brokers in a multi-broker cluster.
    Broadcast the request to all route endpoints and aggregate the results.
    Widen ClientImpl#getTotalRouteEndpoints() visibility to public accordingly.
---
 .../rocketmq/client/java/impl/ClientImpl.java      |  2 +-
 .../impl/consumer/LiteSubscriptionManager.java     | 23 ++++++++++++++--------
 .../impl/consumer/LiteSubscriptionManagerTest.java |  4 +++-
 3 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 691056e1..4d772104 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -669,7 +669,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         }, MoreExecutors.directExecutor());
     }
 
-    protected Set<Endpoints> getTotalRouteEndpoints() {
+    public Set<Endpoints> getTotalRouteEndpoints() {
         Set<Endpoints> totalRouteEndpoints = new HashSet<>();
         for (TopicRouteData topicRouteData : topicRouteCache.values()) {
             totalRouteEndpoints.addAll(topicRouteData.getTotalEndpoints());
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManager.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManager.java
index 07881144..7ce9a4aa 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManager.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManager.java
@@ -20,7 +20,6 @@ package org.apache.rocketmq.client.java.impl.consumer;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.LiteSubscriptionAction;
 import apache.rocketmq.v2.NotifyUnsubscribeLiteCommand;
-import apache.rocketmq.v2.Status;
 import apache.rocketmq.v2.Subscription;
 import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
 import apache.rocketmq.v2.SyncLiteSubscriptionResponse;
@@ -30,8 +29,10 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +43,7 @@ import org.apache.rocketmq.client.java.exception.LiteSubscriptionQuotaExceededEx
 import org.apache.rocketmq.client.java.exception.StatusChecker;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.misc.ProtobufUtils;
+import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -159,14 +161,19 @@ public class LiteSubscriptionManager {
             
builder.setOffsetOption(ProtobufUtils.toProtobufOffsetOption(offsetOption));
         }
 
+        final SyncLiteSubscriptionRequest request = builder.build();
         final Duration requestTimeout = 
consumerImpl.getClientConfiguration().getRequestTimeout();
-        RpcFuture<SyncLiteSubscriptionRequest, SyncLiteSubscriptionResponse> 
future = consumerImpl.getClientManager()
-            .syncLiteSubscription(consumerImpl.getEndpoints(), 
builder.build(), requestTimeout);
-        return Futures.transformAsync(future, response -> {
-            final Status status = response.getStatus();
-            StatusChecker.check(status, future);
-            return Futures.immediateVoidFuture();
-        }, MoreExecutors.directExecutor());
+        final Set<Endpoints> totalRouteEndpoints = 
consumerImpl.getTotalRouteEndpoints();
+        List<ListenableFuture<Void>> futures = new ArrayList<>();
+        for (Endpoints endpoints : totalRouteEndpoints) {
+            final RpcFuture<SyncLiteSubscriptionRequest, 
SyncLiteSubscriptionResponse> rpcFuture =
+                
consumerImpl.getClientManager().syncLiteSubscription(endpoints, request, 
requestTimeout);
+            futures.add(Futures.transformAsync(rpcFuture, response -> {
+                StatusChecker.check(response.getStatus(), rpcFuture);
+                return Futures.immediateVoidFuture();
+            }, MoreExecutors.directExecutor()));
+        }
+        return Futures.transform(Futures.allAsList(futures), input -> null, 
MoreExecutors.directExecutor());
     }
 
     void onNotifyUnsubscribeLiteCommand(NotifyUnsubscribeLiteCommand command) {
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManagerTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManagerTest.java
index 6087b6d7..b8e18a97 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManagerTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManagerTest.java
@@ -37,6 +37,7 @@ import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
 import apache.rocketmq.v2.SyncLiteSubscriptionResponse;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.time.Duration;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
@@ -84,7 +85,8 @@ public class LiteSubscriptionManagerTest {
         
when(consumerImpl.getClientConfiguration()).thenReturn(clientConfiguration);
         
when(clientConfiguration.getRequestTimeout()).thenReturn(Duration.ofSeconds(30));
         when(consumerImpl.getClientManager()).thenReturn(clientManager);
-        when(consumerImpl.getEndpoints()).thenReturn(endpoints);
+        lenient().when(consumerImpl.getEndpoints()).thenReturn(endpoints);
+        
when(consumerImpl.getTotalRouteEndpoints()).thenReturn(Collections.singleton(endpoints));
 
         // Mock successful response
         SyncLiteSubscriptionResponse successResponse = 
SyncLiteSubscriptionResponse.newBuilder()

Reply via email to