This is an automated email from the ASF dual-hosted git repository.

popduke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d1e6f63 Correct the matching key for the internal cache of ordered 
shared subscription. (#143)
6d1e6f63 is described below

commit 6d1e6f6361b5238dfdcc873431f4bc59db4f0471
Author: Gu Jiawei <[email protected]>
AuthorDate: Mon Jun 16 09:40:25 2025 +0800

    Correct the matching key for the internal cache of ordered shared 
subscription. (#143)
---
 .../bifromq/dist/worker/DeliverExecutorGroup.java  |  9 +++---
 .../apache/bifromq/dist/worker/DistQoS0Test.java   | 34 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 5 deletions(-)

diff --git 
a/bifromq-dist/bifromq-dist-worker/src/main/java/org/apache/bifromq/dist/worker/DeliverExecutorGroup.java
 
b/bifromq-dist/bifromq-dist-worker/src/main/java/org/apache/bifromq/dist/worker/DeliverExecutorGroup.java
index b0eb0341..e6fe2173 100644
--- 
a/bifromq-dist/bifromq-dist-worker/src/main/java/org/apache/bifromq/dist/worker/DeliverExecutorGroup.java
+++ 
b/bifromq-dist/bifromq-dist-worker/src/main/java/org/apache/bifromq/dist/worker/DeliverExecutorGroup.java
@@ -46,7 +46,6 @@ import com.github.benmanes.caffeine.cache.RemovalListener;
 import com.github.benmanes.caffeine.cache.Scheduler;
 import com.google.common.base.Charsets;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
@@ -55,7 +54,7 @@ import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 class DeliverExecutorGroup implements IDeliverExecutorGroup {
-    // OuterCacheKey: OrderedSharedMatchingKey(<tenantId>, 
<escapedTopicFilter>)
+    // OuterCacheKey: OrderedSharedMatchingKey(<tenantId>, <mqttTopicFilter>)
     // InnerCacheKey: ClientInfo(<tenantId>, <type>, <metadata>)
     private final LoadingCache<OrderedSharedMatchingKey, Cache<ClientInfo, 
NormalMatching>> orderedSharedMatching;
     private final int inlineFanOutThreshold = 
DistInlineFanOutThreshold.INSTANCE.get();
@@ -161,7 +160,7 @@ class DeliverExecutorGroup implements IDeliverExecutorGroup 
{
             log.debug("Refresh ordered shared sub routes: tenantId={}, 
sharedTopicFilter={}", tenantId, routeMatcher);
         }
         orderedSharedMatching.invalidate(
-            new OrderedSharedMatchingKey(tenantId, 
routeMatcher.getFilterLevelList()));
+            new OrderedSharedMatchingKey(tenantId, 
routeMatcher.getMqttTopicFilter()));
     }
 
     private void prepareSend(Matching matching, TopicMessagePackHolder 
msgPackHolder, boolean inline) {
@@ -180,7 +179,7 @@ class DeliverExecutorGroup implements IDeliverExecutorGroup 
{
                         ClientInfo sender = publisherPack.getPublisher();
                         NormalMatching matchedInbox = orderedSharedMatching
                             .get(new 
OrderedSharedMatchingKey(groupMatching.tenantId(),
-                                groupMatching.matcher.getFilterLevelList()))
+                                groupMatching.matcher.getMqttTopicFilter()))
                             .get(sender, senderInfo -> {
                                 RendezvousHash<ClientInfo, NormalMatching> 
hash =
                                     RendezvousHash.<ClientInfo, 
NormalMatching>builder()
@@ -217,6 +216,6 @@ class DeliverExecutorGroup implements IDeliverExecutorGroup 
{
         fanoutExecutors[idx].submit(route, msgPackHolder, inline);
     }
 
-    private record OrderedSharedMatchingKey(String tenantId, List<String> 
filterLevels) {
+    private record OrderedSharedMatchingKey(String tenantId, String 
mqttTopicFilter) {
     }
 }
diff --git 
a/bifromq-dist/bifromq-dist-worker/src/test/java/org/apache/bifromq/dist/worker/DistQoS0Test.java
 
b/bifromq-dist/bifromq-dist-worker/src/test/java/org/apache/bifromq/dist/worker/DistQoS0Test.java
index 74cc4e9d..4740133c 100644
--- 
a/bifromq-dist/bifromq-dist-worker/src/test/java/org/apache/bifromq/dist/worker/DistQoS0Test.java
+++ 
b/bifromq-dist/bifromq-dist-worker/src/test/java/org/apache/bifromq/dist/worker/DistQoS0Test.java
@@ -524,4 +524,38 @@ public class DistQoS0Test extends DistWorkerTest {
         
assertEquals(reply.getResultMap().get(tenantA).getFanoutMap().getOrDefault("TopicB",
 0).intValue(), 0);
         unmatch(tenantA, "TopicA/#", InboxService, "inbox1", "batch1");
     }
+
+    @Test(groups = "integration")
+    public void testOrderedShareWithGroups() {
+        when(mqttBroker.open("batch1")).thenReturn(writer1);
+
+        match(tenantA, "$oshare/group1/#", MqttBroker, "inbox1", "batch1");
+        match(tenantA, "$oshare/group2/#", MqttBroker, "inbox1", "batch1");
+        Set<String> set = Set.of("$oshare/group1/#", "$oshare/group2/#");
+
+        await().until(() -> {
+            clearInvocations(writer1);
+            Set<String> topicFilterSet = new HashSet<>();
+            dist(tenantA, AT_MOST_ONCE, "/a/b/c", copyFromUtf8("Hello"), 
"orderKey1");
+            try {
+                ArgumentCaptor<DeliveryRequest> captor = 
ArgumentCaptor.forClass(DeliveryRequest.class);
+                verify(writer1, 
timeout(1000).times(2)).deliver(captor.capture());
+                captor.getAllValues().forEach(req -> {
+                    DeliveryPackage packs = req.getPackageMap().get(tenantA);
+                    for (DeliveryPack pack : packs.getPackList()) {
+                        for (MatchInfo matchInfo : pack.getMatchInfoList()) {
+                            
topicFilterSet.add(matchInfo.getMatcher().getMqttTopicFilter());
+                        }
+                    }
+                });
+                assertEquals(set, topicFilterSet);
+                return true;
+            } catch (Throwable e) {
+                return false;
+            }
+        });
+
+        unmatch(tenantA, "$oshare/group1/#", MqttBroker, "inbox1", "batch1");
+        unmatch(tenantA, "$oshare/group2/#", MqttBroker, "inbox1", "batch1");
+    }
 }
\ No newline at end of file

Reply via email to