This is an automated email from the ASF dual-hosted git repository.
popduke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git
The following commit(s) were added to refs/heads/main by this push:
new b7f17af8 Fixed the issue that cleanStart failed to cleanup previous
persistent session when SEI > 0. (#145)
b7f17af8 is described below
commit b7f17af817eb09c0f6b32236245d28437221d1f8
Author: zhangShunLin <[email protected]>
AuthorDate: Mon Jun 16 13:30:03 2025 +0800
Fixed the issue that cleanStart failed to cleanup previous persistent
session when SEI > 0. (#145)
Co-authored-by: Yonny(Yu) Hao <[email protected]>
---
.../inbox/server/scheduler/BatchDetachCall.java | 16 ++++++----
.../bifromq/mqtt/handler/MQTTConnectHandler.java | 1 +
.../mqtt/integration/v5/MQTTConnectTest.java | 37 ++++++++++++++++++++++
3 files changed, 47 insertions(+), 7 deletions(-)
diff --git
a/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/scheduler/BatchDetachCall.java
b/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/scheduler/BatchDetachCall.java
index b49da44b..aeeeb865 100644
---
a/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/scheduler/BatchDetachCall.java
+++
b/bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/scheduler/BatchDetachCall.java
@@ -21,6 +21,12 @@ package org.apache.bifromq.inbox.server.scheduler;
import static java.util.Collections.emptySet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bifromq.basekv.client.IMutationPipeline;
import org.apache.bifromq.basekv.client.exception.BadVersionException;
import org.apache.bifromq.basekv.client.exception.TryLaterException;
@@ -37,12 +43,6 @@ import
org.apache.bifromq.inbox.storage.proto.BatchDetachRequest;
import org.apache.bifromq.inbox.storage.proto.InboxServiceRWCoProcInput;
import org.apache.bifromq.inbox.storage.proto.InboxVersion;
import org.apache.bifromq.inbox.storage.proto.Replica;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import lombok.extern.slf4j.Slf4j;
@Slf4j
class BatchDetachCall extends BatchMutationCall<DetachRequest, DetachReply> {
@@ -69,10 +69,12 @@ class BatchDetachCall extends
BatchMutationCall<DetachRequest, DetachReply> {
BatchDetachRequest.Params.Builder paramsBuilder =
BatchDetachRequest.Params.newBuilder()
.setTenantId(request.getClient().getTenantId())
.setInboxId(request.getInboxId())
- .setVersion(request.getVersion())
.setExpirySeconds(request.getExpirySeconds())
.setDiscardLWT(request.getDiscardLWT())
.setNow(request.getNow());
+ if (request.hasVersion()) {
+ paramsBuilder.setVersion(request.getVersion());
+ }
reqBuilder.addParams(paramsBuilder.build());
}
diff --git
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java
index 7fba9567..49aba521 100644
---
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java
+++
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java
@@ -387,6 +387,7 @@ public abstract class MQTTConnectHandler extends
ChannelDuplexHandler {
if (requestClientId.isEmpty()) {
return CompletableFuture.completedFuture(ExpireResult.NOT_FOUND);
}
+ // detach and expire the latest version immediately
return inboxClient.detach(DetachRequest.newBuilder()
.setReqId(reqId)
.setInboxId(userSessionId)
diff --git
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/integration/v5/MQTTConnectTest.java
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/integration/v5/MQTTConnectTest.java
index dcabcdec..6169b19c 100644
---
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/integration/v5/MQTTConnectTest.java
+++
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/integration/v5/MQTTConnectTest.java
@@ -24,6 +24,8 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import org.apache.bifromq.mqtt.integration.MQTTTest;
import org.apache.bifromq.mqtt.integration.v5.client.MqttTestClient;
@@ -134,4 +136,39 @@ public class MQTTConnectTest extends MQTTTest {
IMqttToken token = client.connect(connOpts);
assertEquals(token.getResponseProperties().getSessionExpiryInterval(),
0L);
}
+
+ /**
+ * Verifies that a client can reconnect under the same client id with cleanStart=true and a non-zero session expiry interval.
+ */
+ @Test(groups = "integration")
+ public void reconnectTest() {
+ when(authProvider.auth(any(MQTT5AuthData.class)))
+
.thenReturn(CompletableFuture.completedFuture(MQTT5AuthResult.newBuilder()
+ .setSuccess(Success.newBuilder()
+ .setTenantId("tenant")
+ .setUserId("testUser")
+ .build()).build()));
+ when(authProvider.checkPermission(any(), any()))
+ .thenReturn(CompletableFuture.completedFuture(
+
CheckResult.newBuilder().setGranted(Granted.newBuilder().build()).build()));
+ when(settingProvider.provide(eq(Setting.ForceTransient),
eq("tenant"))).thenReturn(false);
+
+ MqttConnectionOptions connOpts = new MqttConnectionOptions();
+ connOpts.setCleanStart(true);
+ connOpts.setSessionExpiryInterval(1800L);
+ connOpts.setUserName("tenant/testUser");
+
+ MqttTestClient client1 = new MqttTestClient(BROKER_URI, "client_id");
+ IMqttToken token1 = client1.connect(connOpts);
+ assertTrue(token1.isComplete());
+ assertNull(token1.getException());
+ client1.disconnect();
+
+ // reconnect under the same client id with the same options (cleanStart=true, session expiry 1800s)
+ MqttTestClient client2 = new MqttTestClient(BROKER_URI, "client_id");
+ IMqttToken token2 = client2.connect(connOpts);
+ assertTrue(token2.isComplete());
+ assertNull(token2.getException());
+ }
+
}