This is an automated email from the ASF dual-hosted git repository.
popduke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git
The following commit(s) were added to refs/heads/main by this push:
new 7c34642c3 fix localdistservice missing result local route (#227)
7c34642c3 is described below
commit 7c34642c3d616cc8cbba7d34d97aaaf12065180e
Author: liaodongnian <[email protected]>
AuthorDate: Mon Feb 9 09:03:20 2026 +0800
fix localdistservice missing result local route (#227)
---
.../bifromq/mqtt/service/LocalDistService.java | 27 +++--
.../bifromq/mqtt/service/LocalDistServiceTest.java | 114 ++++++++++++++++++++-
2 files changed, 129 insertions(+), 12 deletions(-)
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/service/LocalDistService.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/service/LocalDistService.java
index 7f4de0a47..6422b802c 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/service/LocalDistService.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/service/LocalDistService.java
@@ -105,6 +105,7 @@ public class LocalDistService implements ILocalDistService {
Set<MatchInfo> ok = new HashSet<>();
Set<MatchInfo> skip = new HashSet<>();
Set<MatchInfo> noSub = new HashSet<>();
+ Set<MatchInfo> noReceiver = new HashSet<>();
long totalFanOutBytes = 0L;
for (DeliveryPack writePack :
packageEntry.getValue().getPackList()) {
TopicMessagePack topicMsgPack = writePack.getMessagePack();
@@ -124,14 +125,13 @@ public class LocalDistService implements ILocalDistService {
matchInfo);
}
} else {
- // no session found for shared subscription
- noSub.add(matchInfo);
+ noReceiver.add(matchInfo);
}
} else {
Optional<CompletableFuture<? extends
ILocalTopicRouter.ILocalRoutes>> routesFutureOpt =
localTopicRouter.getTopicRoutes(tenantId,
matchInfo);
if (routesFutureOpt.isEmpty()) {
- noSub.add(matchInfo);
+ noReceiver.add(matchInfo);
continue;
}
CompletableFuture<? extends
ILocalTopicRouter.ILocalRoutes> routesFuture =
@@ -143,15 +143,21 @@ public class LocalDistService implements ILocalDistService {
}
ILocalTopicRouter.ILocalRoutes localRoutes =
routesFuture.join();
if
(!localRoutes.localReceiverId().equals(matchInfo.getReceiverId())) {
- noSub.add(matchInfo);
+ noReceiver.add(matchInfo);
continue;
}
+ if (localRoutes.routesInfo().isEmpty()) {
+ noReceiver.add(matchInfo);
+ continue;
+ }
+ boolean hasUsableSession = false;
for (Map.Entry<String, Long> route :
localRoutes.routesInfo().entrySet()) {
String sessionId = route.getKey();
long incarnation = route.getValue();
// at least one session should publish the message
IMQTTSession session =
sessionRegistry.get(sessionId);
if (session instanceof IMQTTTransientSession) {
+ hasUsableSession = true;
if (isFanOutThrottled &&
!matchedSessions.isEmpty()) {
skip.add(matchInfo);
} else {
@@ -161,6 +167,9 @@ public class LocalDistService implements ILocalDistService {
}
}
}
+ if (!hasUsableSession) {
+ noReceiver.add(matchInfo);
+ }
}
}
long msgPackSize = SizeUtil.estSizeOf(topicMsgPack);
@@ -185,10 +194,13 @@ public class LocalDistService implements ILocalDistService {
tenantMeter.recordSummary(MqttTransientFanOutBytes,
totalFanOutBytes);
// don't include duplicated matchInfo in the result
// treat skip as ok
- Sets.difference(Sets.union(ok, skip), noSub).forEach(matchInfo ->
resultsBuilder.addResult(
-
DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.OK).build()));
+ Sets.difference(Sets.union(ok, skip), Sets.union(noSub,
noReceiver)).forEach(matchInfo ->
+ resultsBuilder.addResult(
+
DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.OK).build()));
noSub.forEach(matchInfo -> resultsBuilder.addResult(
DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.NO_SUB).build()));
+ noReceiver.forEach(matchInfo -> resultsBuilder.addResult(
+
DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.NO_RECEIVER).build()));
replyBuilder.putResult(tenantId, resultsBuilder.build());
}
return CompletableFuture.completedFuture(replyBuilder.build());
@@ -205,8 +217,7 @@ public class LocalDistService implements ILocalDistService {
return
transientSession.hasSubscribed(matchInfo.getMatcher().getMqttTopicFilter())
? CheckReply.Code.OK : CheckReply.Code.NO_SUB;
} else {
- // should not be here
- return CheckReply.Code.ERROR;
+ return CheckReply.Code.NO_RECEIVER;
}
} else {
Optional<CompletableFuture<? extends
ILocalTopicRouter.ILocalRoutes>> routesFutureOpt =
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/service/LocalDistServiceTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/service/LocalDistServiceTest.java
index e1f358809..60ea83091 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/service/LocalDistServiceTest.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/service/LocalDistServiceTest.java
@@ -38,6 +38,7 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import org.apache.bifromq.mqtt.MockableTest;
+import org.apache.bifromq.mqtt.session.IMQTTSession;
import org.apache.bifromq.mqtt.session.IMQTTTransientSession;
import org.apache.bifromq.plugin.subbroker.CheckReply;
import org.apache.bifromq.plugin.subbroker.DeliveryPack;
@@ -158,6 +159,7 @@ public class LocalDistServiceTest extends MockableTest {
assertEquals(code, CheckReply.Code.NO_RECEIVER);
when(localRoutes.routesInfo()).thenReturn(Map.of(channelId, 1L));
+ when(localSessionRegistry.get(channelId)).thenReturn(session);
code = localDistService.checkMatchInfo(tenantId, MatchInfo.newBuilder()
.setMatcher(TopicUtil.from(topicFilter))
.setReceiverId(ILocalDistService.localize(channelId))
@@ -364,7 +366,7 @@ public class LocalDistServiceTest extends MockableTest {
DeliveryResults results = reply.getResultMap().get(tenantId);
DeliveryResult result = results.getResult(0);
- assertEquals(DeliveryResult.Code.NO_SUB, result.getCode());
+ assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
}
@Test
@@ -398,7 +400,111 @@ public class LocalDistServiceTest extends MockableTest {
DeliveryResults results = reply.getResultMap().get(tenantId);
DeliveryResult result = results.getResult(0);
- assertEquals(DeliveryResult.Code.NO_SUB, result.getCode());
+ assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
+ }
+
+ @Test
+ public void deliverToEmptyLocalRoutes() {
+ String tenantId = "tenant1";
+ String topic = "testTopic";
+ String topicFilter = "testTopic/#";
+ String channelId = "channel0";
+ MatchInfo matchInfo = MatchInfo.newBuilder()
+ .setMatcher(TopicUtil.from(topicFilter))
+ .setReceiverId("receiverId")
+ .build();
+ TopicMessagePack topicMessagePack =
TopicMessagePack.newBuilder().setTopic(topic).build();
+ DeliveryPackage deliveryPack = DeliveryPackage.newBuilder()
+
.addPack(DeliveryPack.newBuilder().setMessagePack(topicMessagePack).addMatchInfo(matchInfo).build())
+ .build();
+ DeliveryRequest request =
DeliveryRequest.newBuilder().putPackage(tenantId, deliveryPack).build();
+
+ ILocalTopicRouter.ILocalRoutes localRoutes =
mock(ILocalTopicRouter.ILocalRoutes.class);
+ when(localRoutes.localReceiverId()).thenReturn("receiverId");
+ when(localRoutes.routesInfo()).thenReturn(Map.of());
+ when(localTopicRouter.getTopicRoutes(anyString(), any())).thenReturn(
+ Optional.of(CompletableFuture.completedFuture(localRoutes)));
+
+ LocalDistService localDistService =
+ new LocalDistService(serverId, localSessionRegistry,
localTopicRouter, distClient, resourceThrottler);
+
+ CompletableFuture<DeliveryReply> future =
localDistService.dist(request);
+ DeliveryReply reply = future.join();
+
+ DeliveryResults results = reply.getResultMap().get(tenantId);
+ DeliveryResult result = results.getResult(0);
+ assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
+ }
+
+ @Test
+ public void deliverToNoLocalSession() {
+ String tenantId = "tenant1";
+ String topic = "testTopic";
+ String topicFilter = "testTopic/#";
+ String channelId = "channel0";
+ MatchInfo matchInfo = MatchInfo.newBuilder()
+ .setMatcher(TopicUtil.from(topicFilter))
+ .setReceiverId("receiverId")
+ .build();
+ TopicMessagePack topicMessagePack =
TopicMessagePack.newBuilder().setTopic(topic).build();
+ DeliveryPackage deliveryPack = DeliveryPackage.newBuilder()
+
.addPack(DeliveryPack.newBuilder().setMessagePack(topicMessagePack).addMatchInfo(matchInfo).build())
+ .build();
+ DeliveryRequest request =
DeliveryRequest.newBuilder().putPackage(tenantId, deliveryPack).build();
+
+ ILocalTopicRouter.ILocalRoutes localRoutes =
mock(ILocalTopicRouter.ILocalRoutes.class);
+ when(localRoutes.localReceiverId()).thenReturn("receiverId");
+ when(localRoutes.routesInfo()).thenReturn(Map.of(channelId, 1L));
+ when(localTopicRouter.getTopicRoutes(anyString(), any())).thenReturn(
+ Optional.of(CompletableFuture.completedFuture(localRoutes)));
+
+ when(localSessionRegistry.get(channelId)).thenReturn(null);
+
+ LocalDistService localDistService =
+ new LocalDistService(serverId, localSessionRegistry,
localTopicRouter, distClient, resourceThrottler);
+
+ CompletableFuture<DeliveryReply> future =
localDistService.dist(request);
+ DeliveryReply reply = future.join();
+
+ DeliveryResults results = reply.getResultMap().get(tenantId);
+ DeliveryResult result = results.getResult(0);
+ assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
+ }
+
+ @Test
+ public void deliverToNonTransientSession() {
+ String tenantId = "tenant1";
+ String topic = "testTopic";
+ String topicFilter = "testTopic/#";
+ String channelId = "channel0";
+ MatchInfo matchInfo = MatchInfo.newBuilder()
+ .setMatcher(TopicUtil.from(topicFilter))
+ .setReceiverId("receiverId")
+ .build();
+ TopicMessagePack topicMessagePack =
TopicMessagePack.newBuilder().setTopic(topic).build();
+ DeliveryPackage deliveryPack = DeliveryPackage.newBuilder()
+
.addPack(DeliveryPack.newBuilder().setMessagePack(topicMessagePack).addMatchInfo(matchInfo).build())
+ .build();
+ DeliveryRequest request =
DeliveryRequest.newBuilder().putPackage(tenantId, deliveryPack).build();
+
+ ILocalTopicRouter.ILocalRoutes localRoutes =
mock(ILocalTopicRouter.ILocalRoutes.class);
+ when(localRoutes.localReceiverId()).thenReturn("receiverId");
+ when(localRoutes.routesInfo()).thenReturn(Map.of(channelId, 1L));
+ when(localTopicRouter.getTopicRoutes(anyString(), any())).thenReturn(
+ Optional.of(CompletableFuture.completedFuture(localRoutes)));
+
+ IMQTTSession nonTransientSession = mock(IMQTTSession.class);
+
when(localSessionRegistry.get(channelId)).thenReturn(nonTransientSession);
+
+ LocalDistService localDistService =
+ new LocalDistService(serverId, localSessionRegistry,
localTopicRouter, distClient, resourceThrottler);
+
+ CompletableFuture<DeliveryReply> future =
localDistService.dist(request);
+ DeliveryReply reply = future.join();
+
+ DeliveryResults results = reply.getResultMap().get(tenantId);
+ DeliveryResult result = results.getResult(0);
+ assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
}
@Test
@@ -432,7 +538,7 @@ public class LocalDistServiceTest extends MockableTest {
DeliveryResults results = reply.getResultMap().get(tenantId);
DeliveryResult result = results.getResult(0);
- assertEquals(DeliveryResult.Code.OK, result.getCode());
+ assertEquals(result.getCode(), DeliveryResult.Code.OK);
}
@Test
@@ -467,7 +573,7 @@ public class LocalDistServiceTest extends MockableTest {
DeliveryResults results = reply.getResultMap().get(tenantId);
DeliveryResult result = results.getResult(0);
- assertEquals(DeliveryResult.Code.OK, result.getCode());
+ assertEquals(result.getCode(), DeliveryResult.Code.OK);
}
@Test