This is an automated email from the ASF dual-hosted git repository.

popduke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c34642c3 fix localdistservice missing result local route (#227)
7c34642c3 is described below

commit 7c34642c3d616cc8cbba7d34d97aaaf12065180e
Author: liaodongnian <[email protected]>
AuthorDate: Mon Feb 9 09:03:20 2026 +0800

    fix localdistservice missing result local route (#227)
---
 .../bifromq/mqtt/service/LocalDistService.java     |  27 +++--
 .../bifromq/mqtt/service/LocalDistServiceTest.java | 114 ++++++++++++++++++++-
 2 files changed, 129 insertions(+), 12 deletions(-)

diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/service/LocalDistService.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/service/LocalDistService.java
index 7f4de0a47..6422b802c 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/service/LocalDistService.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/service/LocalDistService.java
@@ -105,6 +105,7 @@ public class LocalDistService implements ILocalDistService {
             Set<MatchInfo> ok = new HashSet<>();
             Set<MatchInfo> skip = new HashSet<>();
             Set<MatchInfo> noSub = new HashSet<>();
+            Set<MatchInfo> noReceiver = new HashSet<>();
             long totalFanOutBytes = 0L;
             for (DeliveryPack writePack : packageEntry.getValue().getPackList()) {
                 TopicMessagePack topicMsgPack = writePack.getMessagePack();
@@ -124,14 +125,13 @@ public class LocalDistService implements ILocalDistService {
                                         matchInfo);
                             }
                         } else {
-                            // no session found for shared subscription
-                            noSub.add(matchInfo);
+                            noReceiver.add(matchInfo);
                         }
                     } else {
                         Optional<CompletableFuture<? extends ILocalTopicRouter.ILocalRoutes>> routesFutureOpt =
                             localTopicRouter.getTopicRoutes(tenantId, matchInfo);
                         if (routesFutureOpt.isEmpty()) {
-                            noSub.add(matchInfo);
+                            noReceiver.add(matchInfo);
                             continue;
                         }
                         CompletableFuture<? extends ILocalTopicRouter.ILocalRoutes> routesFuture =
@@ -143,15 +143,21 @@ public class LocalDistService implements ILocalDistService {
                         }
                         ILocalTopicRouter.ILocalRoutes localRoutes = routesFuture.join();
                         if (!localRoutes.localReceiverId().equals(matchInfo.getReceiverId())) {
-                            noSub.add(matchInfo);
+                            noReceiver.add(matchInfo);
                             continue;
                         }
+                        if (localRoutes.routesInfo().isEmpty()) {
+                            noReceiver.add(matchInfo);
+                            continue;
+                        }
+                        boolean hasUsableSession = false;
                         for (Map.Entry<String, Long> route : localRoutes.routesInfo().entrySet()) {
                             String sessionId = route.getKey();
                             long incarnation = route.getValue();
                             // at least one session should publish the message
                             IMQTTSession session = sessionRegistry.get(sessionId);
                             if (session instanceof IMQTTTransientSession) {
+                                hasUsableSession = true;
                                 if (isFanOutThrottled && !matchedSessions.isEmpty()) {
                                     skip.add(matchInfo);
                                 } else {
@@ -161,6 +167,9 @@ public class LocalDistService implements ILocalDistService {
                                 }
                             }
                         }
+                        if (!hasUsableSession) {
+                            noReceiver.add(matchInfo);
+                        }
                     }
                 }
                 long msgPackSize = SizeUtil.estSizeOf(topicMsgPack);
@@ -185,10 +194,13 @@ public class LocalDistService implements ILocalDistService {
             tenantMeter.recordSummary(MqttTransientFanOutBytes, totalFanOutBytes);
             // don't include duplicated matchInfo in the result
             // treat skip as ok
-            Sets.difference(Sets.union(ok, skip), noSub).forEach(matchInfo -> resultsBuilder.addResult(
-                DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.OK).build()));
+            Sets.difference(Sets.union(ok, skip), Sets.union(noSub, noReceiver)).forEach(matchInfo ->
+                resultsBuilder.addResult(
+                    DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.OK).build()));
             noSub.forEach(matchInfo -> resultsBuilder.addResult(
                 DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.NO_SUB).build()));
+            noReceiver.forEach(matchInfo -> resultsBuilder.addResult(
+                DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.NO_RECEIVER).build()));
             replyBuilder.putResult(tenantId, resultsBuilder.build());
         }
         return CompletableFuture.completedFuture(replyBuilder.build());
@@ -205,8 +217,7 @@ public class LocalDistService implements ILocalDistService {
                 return transientSession.hasSubscribed(matchInfo.getMatcher().getMqttTopicFilter())
                     ? CheckReply.Code.OK : CheckReply.Code.NO_SUB;
             } else {
-                // should not be here
-                return CheckReply.Code.ERROR;
+                return CheckReply.Code.NO_RECEIVER;
             }
         } else {
             Optional<CompletableFuture<? extends ILocalTopicRouter.ILocalRoutes>> routesFutureOpt =
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/service/LocalDistServiceTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/service/LocalDistServiceTest.java
index e1f358809..60ea83091 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/service/LocalDistServiceTest.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/service/LocalDistServiceTest.java
@@ -38,6 +38,7 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
 import org.apache.bifromq.mqtt.MockableTest;
+import org.apache.bifromq.mqtt.session.IMQTTSession;
 import org.apache.bifromq.mqtt.session.IMQTTTransientSession;
 import org.apache.bifromq.plugin.subbroker.CheckReply;
 import org.apache.bifromq.plugin.subbroker.DeliveryPack;
@@ -158,6 +159,7 @@ public class LocalDistServiceTest extends MockableTest {
         assertEquals(code, CheckReply.Code.NO_RECEIVER);
 
         when(localRoutes.routesInfo()).thenReturn(Map.of(channelId, 1L));
+        when(localSessionRegistry.get(channelId)).thenReturn(session);
         code = localDistService.checkMatchInfo(tenantId, MatchInfo.newBuilder()
             .setMatcher(TopicUtil.from(topicFilter))
             .setReceiverId(ILocalDistService.localize(channelId))
@@ -364,7 +366,7 @@ public class LocalDistServiceTest extends MockableTest {
 
         DeliveryResults results = reply.getResultMap().get(tenantId);
         DeliveryResult result = results.getResult(0);
-        assertEquals(DeliveryResult.Code.NO_SUB, result.getCode());
+        assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
     }
 
     @Test
@@ -398,7 +400,111 @@ public class LocalDistServiceTest extends MockableTest {
 
         DeliveryResults results = reply.getResultMap().get(tenantId);
         DeliveryResult result = results.getResult(0);
-        assertEquals(DeliveryResult.Code.NO_SUB, result.getCode());
+        assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
+    }
+
+    @Test
+    public void deliverToEmptyLocalRoutes() {
+        String tenantId = "tenant1";
+        String topic = "testTopic";
+        String topicFilter = "testTopic/#";
+        String channelId = "channel0";
+        MatchInfo matchInfo = MatchInfo.newBuilder()
+            .setMatcher(TopicUtil.from(topicFilter))
+            .setReceiverId("receiverId")
+            .build();
+        TopicMessagePack topicMessagePack = TopicMessagePack.newBuilder().setTopic(topic).build();
+        DeliveryPackage deliveryPack = DeliveryPackage.newBuilder()
+            .addPack(DeliveryPack.newBuilder().setMessagePack(topicMessagePack).addMatchInfo(matchInfo).build())
+            .build();
+        DeliveryRequest request = DeliveryRequest.newBuilder().putPackage(tenantId, deliveryPack).build();
+
+        ILocalTopicRouter.ILocalRoutes localRoutes = mock(ILocalTopicRouter.ILocalRoutes.class);
+        when(localRoutes.localReceiverId()).thenReturn("receiverId");
+        when(localRoutes.routesInfo()).thenReturn(Map.of());
+        when(localTopicRouter.getTopicRoutes(anyString(), any())).thenReturn(
+            Optional.of(CompletableFuture.completedFuture(localRoutes)));
+
+        LocalDistService localDistService =
+            new LocalDistService(serverId, localSessionRegistry, localTopicRouter, distClient, resourceThrottler);
+
+        CompletableFuture<DeliveryReply> future = localDistService.dist(request);
+        DeliveryReply reply = future.join();
+
+        DeliveryResults results = reply.getResultMap().get(tenantId);
+        DeliveryResult result = results.getResult(0);
+        assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
+    }
+
+    @Test
+    public void deliverToNoLocalSession() {
+        String tenantId = "tenant1";
+        String topic = "testTopic";
+        String topicFilter = "testTopic/#";
+        String channelId = "channel0";
+        MatchInfo matchInfo = MatchInfo.newBuilder()
+            .setMatcher(TopicUtil.from(topicFilter))
+            .setReceiverId("receiverId")
+            .build();
+        TopicMessagePack topicMessagePack = TopicMessagePack.newBuilder().setTopic(topic).build();
+        DeliveryPackage deliveryPack = DeliveryPackage.newBuilder()
+            .addPack(DeliveryPack.newBuilder().setMessagePack(topicMessagePack).addMatchInfo(matchInfo).build())
+            .build();
+        DeliveryRequest request = DeliveryRequest.newBuilder().putPackage(tenantId, deliveryPack).build();
+
+        ILocalTopicRouter.ILocalRoutes localRoutes = mock(ILocalTopicRouter.ILocalRoutes.class);
+        when(localRoutes.localReceiverId()).thenReturn("receiverId");
+        when(localRoutes.routesInfo()).thenReturn(Map.of(channelId, 1L));
+        when(localTopicRouter.getTopicRoutes(anyString(), any())).thenReturn(
+            Optional.of(CompletableFuture.completedFuture(localRoutes)));
+
+        when(localSessionRegistry.get(channelId)).thenReturn(null);
+
+        LocalDistService localDistService =
+            new LocalDistService(serverId, localSessionRegistry, localTopicRouter, distClient, resourceThrottler);
+
+        CompletableFuture<DeliveryReply> future = localDistService.dist(request);
+        DeliveryReply reply = future.join();
+
+        DeliveryResults results = reply.getResultMap().get(tenantId);
+        DeliveryResult result = results.getResult(0);
+        assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
+    }
+
+    @Test
+    public void deliverToNonTransientSession() {
+        String tenantId = "tenant1";
+        String topic = "testTopic";
+        String topicFilter = "testTopic/#";
+        String channelId = "channel0";
+        MatchInfo matchInfo = MatchInfo.newBuilder()
+            .setMatcher(TopicUtil.from(topicFilter))
+            .setReceiverId("receiverId")
+            .build();
+        TopicMessagePack topicMessagePack = TopicMessagePack.newBuilder().setTopic(topic).build();
+        DeliveryPackage deliveryPack = DeliveryPackage.newBuilder()
+            .addPack(DeliveryPack.newBuilder().setMessagePack(topicMessagePack).addMatchInfo(matchInfo).build())
+            .build();
+        DeliveryRequest request = DeliveryRequest.newBuilder().putPackage(tenantId, deliveryPack).build();
+
+        ILocalTopicRouter.ILocalRoutes localRoutes = mock(ILocalTopicRouter.ILocalRoutes.class);
+        when(localRoutes.localReceiverId()).thenReturn("receiverId");
+        when(localRoutes.routesInfo()).thenReturn(Map.of(channelId, 1L));
+        when(localTopicRouter.getTopicRoutes(anyString(), any())).thenReturn(
+            Optional.of(CompletableFuture.completedFuture(localRoutes)));
+
+        IMQTTSession nonTransientSession = mock(IMQTTSession.class);
+        when(localSessionRegistry.get(channelId)).thenReturn(nonTransientSession);
+
+        LocalDistService localDistService =
+            new LocalDistService(serverId, localSessionRegistry, localTopicRouter, distClient, resourceThrottler);
+
+        CompletableFuture<DeliveryReply> future = localDistService.dist(request);
+        DeliveryReply reply = future.join();
+
+        DeliveryResults results = reply.getResultMap().get(tenantId);
+        DeliveryResult result = results.getResult(0);
+        assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
     }
 
     @Test
@@ -432,7 +538,7 @@ public class LocalDistServiceTest extends MockableTest {
 
         DeliveryResults results = reply.getResultMap().get(tenantId);
         DeliveryResult result = results.getResult(0);
-        assertEquals(DeliveryResult.Code.OK, result.getCode());
+        assertEquals(result.getCode(), DeliveryResult.Code.OK);
     }
 
     @Test
@@ -467,7 +573,7 @@ public class LocalDistServiceTest extends MockableTest {
 
         DeliveryResults results = reply.getResultMap().get(tenantId);
         DeliveryResult result = results.getResult(0);
-        assertEquals(DeliveryResult.Code.OK, result.getCode());
+        assertEquals(result.getCode(), DeliveryResult.Code.OK);
     }
 
     @Test

Reply via email to