This is an automated email from the ASF dual-hosted git repository.
popduke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git
The following commit(s) were added to refs/heads/main by this push:
new 6d1e6f63 Correct the matching key for the internal cache of ordered
shared subscription. (#143)
6d1e6f63 is described below
commit 6d1e6f6361b5238dfdcc873431f4bc59db4f0471
Author: Gu Jiawei <[email protected]>
AuthorDate: Mon Jun 16 09:40:25 2025 +0800
Correct the matching key for the internal cache of ordered shared
subscription. (#143)
---
.../bifromq/dist/worker/DeliverExecutorGroup.java | 9 +++---
.../apache/bifromq/dist/worker/DistQoS0Test.java | 34 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 5 deletions(-)
diff --git
a/bifromq-dist/bifromq-dist-worker/src/main/java/org/apache/bifromq/dist/worker/DeliverExecutorGroup.java
b/bifromq-dist/bifromq-dist-worker/src/main/java/org/apache/bifromq/dist/worker/DeliverExecutorGroup.java
index b0eb0341..e6fe2173 100644
---
a/bifromq-dist/bifromq-dist-worker/src/main/java/org/apache/bifromq/dist/worker/DeliverExecutorGroup.java
+++
b/bifromq-dist/bifromq-dist-worker/src/main/java/org/apache/bifromq/dist/worker/DeliverExecutorGroup.java
@@ -46,7 +46,6 @@ import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.base.Charsets;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
@@ -55,7 +54,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
class DeliverExecutorGroup implements IDeliverExecutorGroup {
- // OuterCacheKey: OrderedSharedMatchingKey(<tenantId>,
<escapedTopicFilter>)
+ // OuterCacheKey: OrderedSharedMatchingKey(<tenantId>, <mqttTopicFilter>)
// InnerCacheKey: ClientInfo(<tenantId>, <type>, <metadata>)
private final LoadingCache<OrderedSharedMatchingKey, Cache<ClientInfo,
NormalMatching>> orderedSharedMatching;
private final int inlineFanOutThreshold =
DistInlineFanOutThreshold.INSTANCE.get();
@@ -161,7 +160,7 @@ class DeliverExecutorGroup implements IDeliverExecutorGroup
{
log.debug("Refresh ordered shared sub routes: tenantId={},
sharedTopicFilter={}", tenantId, routeMatcher);
}
orderedSharedMatching.invalidate(
- new OrderedSharedMatchingKey(tenantId,
routeMatcher.getFilterLevelList()));
+ new OrderedSharedMatchingKey(tenantId,
routeMatcher.getMqttTopicFilter()));
}
private void prepareSend(Matching matching, TopicMessagePackHolder
msgPackHolder, boolean inline) {
@@ -180,7 +179,7 @@ class DeliverExecutorGroup implements IDeliverExecutorGroup
{
ClientInfo sender = publisherPack.getPublisher();
NormalMatching matchedInbox = orderedSharedMatching
.get(new
OrderedSharedMatchingKey(groupMatching.tenantId(),
- groupMatching.matcher.getFilterLevelList()))
+ groupMatching.matcher.getMqttTopicFilter()))
.get(sender, senderInfo -> {
RendezvousHash<ClientInfo, NormalMatching>
hash =
RendezvousHash.<ClientInfo,
NormalMatching>builder()
@@ -217,6 +216,6 @@ class DeliverExecutorGroup implements IDeliverExecutorGroup
{
fanoutExecutors[idx].submit(route, msgPackHolder, inline);
}
- private record OrderedSharedMatchingKey(String tenantId, List<String>
filterLevels) {
+ private record OrderedSharedMatchingKey(String tenantId, String
mqttTopicFilter) {
}
}
diff --git
a/bifromq-dist/bifromq-dist-worker/src/test/java/org/apache/bifromq/dist/worker/DistQoS0Test.java
b/bifromq-dist/bifromq-dist-worker/src/test/java/org/apache/bifromq/dist/worker/DistQoS0Test.java
index 74cc4e9d..4740133c 100644
---
a/bifromq-dist/bifromq-dist-worker/src/test/java/org/apache/bifromq/dist/worker/DistQoS0Test.java
+++
b/bifromq-dist/bifromq-dist-worker/src/test/java/org/apache/bifromq/dist/worker/DistQoS0Test.java
@@ -524,4 +524,38 @@ public class DistQoS0Test extends DistWorkerTest {
assertEquals(reply.getResultMap().get(tenantA).getFanoutMap().getOrDefault("TopicB",
0).intValue(), 0);
unmatch(tenantA, "TopicA/#", InboxService, "inbox1", "batch1");
}
+
+ @Test(groups = "integration")
+ public void testOrderedShareWithGroups() {
+ when(mqttBroker.open("batch1")).thenReturn(writer1);
+
+ match(tenantA, "$oshare/group1/#", MqttBroker, "inbox1", "batch1");
+ match(tenantA, "$oshare/group2/#", MqttBroker, "inbox1", "batch1");
+ Set<String> set = Set.of("$oshare/group1/#", "$oshare/group2/#");
+
+ await().until(() -> {
+ clearInvocations(writer1);
+ Set<String> topicFilterSet = new HashSet<>();
+ dist(tenantA, AT_MOST_ONCE, "/a/b/c", copyFromUtf8("Hello"),
"orderKey1");
+ try {
+ ArgumentCaptor<DeliveryRequest> captor =
ArgumentCaptor.forClass(DeliveryRequest.class);
+ verify(writer1,
timeout(1000).times(2)).deliver(captor.capture());
+ captor.getAllValues().forEach(req -> {
+ DeliveryPackage packs = req.getPackageMap().get(tenantA);
+ for (DeliveryPack pack : packs.getPackList()) {
+ for (MatchInfo matchInfo : pack.getMatchInfoList()) {
+
topicFilterSet.add(matchInfo.getMatcher().getMqttTopicFilter());
+ }
+ }
+ });
+ assertEquals(set, topicFilterSet);
+ return true;
+ } catch (Throwable e) {
+ return false;
+ }
+ });
+
+ unmatch(tenantA, "$oshare/group1/#", MqttBroker, "inbox1", "batch1");
+ unmatch(tenantA, "$oshare/group2/#", MqttBroker, "inbox1", "batch1");
+ }
}
\ No newline at end of file