This is an automated email from the ASF dual-hosted git repository.

popduke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git


The following commit(s) were added to refs/heads/main by this push:
     new b7f17af8 Fixed the issue that cleanStart failed to cleanup previous 
persistent session when SEI > 0. (#145)
b7f17af8 is described below

commit b7f17af817eb09c0f6b32236245d28437221d1f8
Author: zhangShunLin <[email protected]>
AuthorDate: Mon Jun 16 13:30:03 2025 +0800

    Fixed the issue that cleanStart failed to cleanup previous persistent 
session when SEI > 0. (#145)
    
    Co-authored-by: Yonny(Yu) Hao <[email protected]>
---
 .../inbox/server/scheduler/BatchDetachCall.java    | 16 ++++++----
 .../bifromq/mqtt/handler/MQTTConnectHandler.java   |  1 +
 .../mqtt/integration/v5/MQTTConnectTest.java       | 37 ++++++++++++++++++++++
 3 files changed, 47 insertions(+), 7 deletions(-)

diff --git 
a/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/scheduler/BatchDetachCall.java
 
b/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/scheduler/BatchDetachCall.java
index b49da44b..aeeeb865 100644
--- 
a/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/scheduler/BatchDetachCall.java
+++ 
b/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/scheduler/BatchDetachCall.java
@@ -21,6 +21,12 @@ package org.apache.bifromq.inbox.server.scheduler;
 
 import static java.util.Collections.emptySet;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bifromq.basekv.client.IMutationPipeline;
 import org.apache.bifromq.basekv.client.exception.BadVersionException;
 import org.apache.bifromq.basekv.client.exception.TryLaterException;
@@ -37,12 +43,6 @@ import 
org.apache.bifromq.inbox.storage.proto.BatchDetachRequest;
 import org.apache.bifromq.inbox.storage.proto.InboxServiceRWCoProcInput;
 import org.apache.bifromq.inbox.storage.proto.InboxVersion;
 import org.apache.bifromq.inbox.storage.proto.Replica;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 class BatchDetachCall extends BatchMutationCall<DetachRequest, DetachReply> {
@@ -69,10 +69,12 @@ class BatchDetachCall extends 
BatchMutationCall<DetachRequest, DetachReply> {
             BatchDetachRequest.Params.Builder paramsBuilder = 
BatchDetachRequest.Params.newBuilder()
                 .setTenantId(request.getClient().getTenantId())
                 .setInboxId(request.getInboxId())
-                .setVersion(request.getVersion())
                 .setExpirySeconds(request.getExpirySeconds())
                 .setDiscardLWT(request.getDiscardLWT())
                 .setNow(request.getNow());
+            if (request.hasVersion()) {
+                paramsBuilder.setVersion(request.getVersion());
+            }
             reqBuilder.addParams(paramsBuilder.build());
         }
 
diff --git 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java
 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java
index 7fba9567..49aba521 100644
--- 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java
+++ 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java
@@ -387,6 +387,7 @@ public abstract class MQTTConnectHandler extends 
ChannelDuplexHandler {
         if (requestClientId.isEmpty()) {
             return CompletableFuture.completedFuture(ExpireResult.NOT_FOUND);
         }
+        // detach and expire the latest version immediately
         return inboxClient.detach(DetachRequest.newBuilder()
                 .setReqId(reqId)
                 .setInboxId(userSessionId)
diff --git 
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/integration/v5/MQTTConnectTest.java
 
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/integration/v5/MQTTConnectTest.java
index dcabcdec..6169b19c 100644
--- 
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/integration/v5/MQTTConnectTest.java
+++ 
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/integration/v5/MQTTConnectTest.java
@@ -24,6 +24,8 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
 import org.apache.bifromq.mqtt.integration.MQTTTest;
 import org.apache.bifromq.mqtt.integration.v5.client.MqttTestClient;
@@ -134,4 +136,39 @@ public class MQTTConnectTest extends MQTTTest {
         IMqttToken token = client.connect(connOpts);
         assertEquals(token.getResponseProperties().getSessionExpiryInterval(), 
0L);
     }
+
+    /**
+     *  Test whether it can reconnect when cleanStart=true but the session 
expiration interval is not 0, when reconnecting.
+     */
+    @Test(groups = "integration")
+    public void reconnectTest() {
+        when(authProvider.auth(any(MQTT5AuthData.class)))
+            
.thenReturn(CompletableFuture.completedFuture(MQTT5AuthResult.newBuilder()
+                .setSuccess(Success.newBuilder()
+                    .setTenantId("tenant")
+                    .setUserId("testUser")
+                    .build()).build()));
+        when(authProvider.checkPermission(any(), any()))
+            .thenReturn(CompletableFuture.completedFuture(
+                
CheckResult.newBuilder().setGranted(Granted.newBuilder().build()).build()));
+        when(settingProvider.provide(eq(Setting.ForceTransient), 
eq("tenant"))).thenReturn(false);
+
+        MqttConnectionOptions connOpts = new MqttConnectionOptions();
+        connOpts.setCleanStart(true);
+        connOpts.setSessionExpiryInterval(1800L);
+        connOpts.setUserName("tenant/testUser");
+
+        MqttTestClient client1 = new MqttTestClient(BROKER_URI, "client_id");
+        IMqttToken token1 = client1.connect(connOpts);
+        assertTrue(token1.isComplete());
+        assertNull(token1.getException());
+        client1.disconnect();
+
+        // reconnect
+        MqttTestClient client2 = new MqttTestClient(BROKER_URI, "client_id");
+        IMqttToken token2 = client2.connect(connOpts);
+        assertTrue(token2.isComplete());
+        assertNull(token2.getException());
+    }
+
 }

Reply via email to