This is an automated email from the ASF dual-hosted git repository.

shunzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git


The following commit(s) were added to refs/heads/main by this push:
     new ff8242f5 Check existence of inbox before detaching for CleanStart=true (#155)
ff8242f5 is described below

commit ff8242f56ee26b82fa980d9e4dc4fe4ed7bf3b10
Author: Yonny(Yu) Hao <[email protected]>
AuthorDate: Thu Jul 17 14:55:59 2025 +0800

    Check existence of inbox before detaching for CleanStart=true (#155)
    
    Co-authored-by: haoyu <[email protected]>
---
 .../bifromq/mqtt/handler/MQTTConnectHandler.java   | 115 +++++++++++----------
 .../bifromq/mqtt/handler/v3/BaseMQTTTest.java      |   2 +-
 .../mqtt/handler/v3/MQTT3ConnectHandlerTest.java   |  32 +++---
 .../bifromq/mqtt/handler/v3/MQTTConnectTest.java   |  16 ++-
 .../mqtt/handler/v3/MQTTDisconnectTest.java        |  12 ++-
 .../bifromq/mqtt/handler/v3/MQTTKickTest.java      |   5 +-
 .../mqtt/handler/v3/MQTTWillMessageTest.java       |   9 +-
 .../bifromq/mqtt/handler/v5/EnhancedAuthTest.java  |  66 +++++++-----
 .../mqtt/handler/v5/MQTT5ConnectHandlerTest.java   |   3 +
 9 files changed, 149 insertions(+), 111 deletions(-)

diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java
index 49aba521..704a272f 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java
@@ -14,12 +14,11 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.    
+ * under the License.
  */
 
 package org.apache.bifromq.mqtt.handler;
 
-import static org.apache.bifromq.base.util.CompletableFutureUtil.unwrap;
 import static org.apache.bifromq.metrics.TenantMetric.MqttIngressBytes;
 import static org.apache.bifromq.mqtt.handler.MQTTSessionIdUtil.userSessionId;
 import static org.apache.bifromq.mqtt.handler.condition.ORCondition.or;
@@ -47,15 +46,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bifromq.base.util.AsyncRetry;
 import org.apache.bifromq.base.util.FutureTracker;
-import org.apache.bifromq.base.util.exception.RetryTimeoutException;
 import org.apache.bifromq.basehlc.HLC;
 import org.apache.bifromq.inbox.client.IInboxClient;
 import org.apache.bifromq.inbox.rpc.proto.AttachRequest;
 import org.apache.bifromq.inbox.rpc.proto.DetachReply;
 import org.apache.bifromq.inbox.rpc.proto.DetachRequest;
-import org.apache.bifromq.inbox.rpc.proto.ExistReply;
 import org.apache.bifromq.inbox.rpc.proto.ExistRequest;
 import org.apache.bifromq.inbox.storage.proto.InboxVersion;
 import org.apache.bifromq.inbox.storage.proto.LWT;
@@ -231,30 +227,11 @@ public abstract class MQTTConnectHandler extends 
ChannelDuplexHandler {
                             if (sessionExpiryInterval == 0) {
                                 // try to attach to previous session and reset its SEI to 0
                                 // or set up a new transient session
-                                return AsyncRetry.exec(() -> 
inboxClient.exist(ExistRequest.newBuilder()
-                                            .setReqId(reqId)
-                                            
.setTenantId(clientInfo.getTenantId())
-                                            .setInboxId(userSessionId)
-                                            .build()),
-                                        (reply, t) -> {
-                                            if (reply != null) {
-                                                return reply.getCode() == 
ExistReply.Code.TRY_LATER;
-                                            }
-                                            return false;
-                                        }, sessionCtx.retryTimeoutNanos / 5, 
sessionCtx.retryTimeoutNanos)
-                                    .exceptionally(unwrap(e -> {
-                                        if (e instanceof 
RetryTimeoutException) {
-                                            return ExistReply.newBuilder()
-                                                .setReqId(reqId)
-                                                
.setCode(ExistReply.Code.TRY_LATER)
-                                                .build();
-                                        }
-                                        log.debug("Failed to get inbox", e);
-                                        return ExistReply.newBuilder()
-                                            .setReqId(reqId)
-                                            .setCode(ExistReply.Code.ERROR)
-                                            .build();
-                                    }))
+                                return 
inboxClient.exist(ExistRequest.newBuilder()
+                                        .setReqId(reqId)
+                                        .setTenantId(clientInfo.getTenantId())
+                                        .setInboxId(userSessionId)
+                                        .build())
                                     .thenAcceptAsync(getReply -> {
                                         switch (getReply.getCode()) {
                                             case EXIST -> {
@@ -385,47 +362,75 @@ public abstract class MQTTConnectHandler extends 
ChannelDuplexHandler {
                                                         String userSessionId,
                                                         ClientInfo clientInfo) 
{
         if (requestClientId.isEmpty()) {
+            // if server generated client id, no need to expire
             return CompletableFuture.completedFuture(ExpireResult.NOT_FOUND);
         }
-        // detach and expire the latest version immediately
-        return inboxClient.detach(DetachRequest.newBuilder()
+        // check if the inbox exists which is a more lightweight operation than detach
+        return inboxClient.exist(ExistRequest.newBuilder()
                 .setReqId(reqId)
+                .setTenantId(clientInfo.getTenantId())
                 .setInboxId(userSessionId)
-                .setExpirySeconds(0)
-                .setDiscardLWT(true)
-                .setClient(clientInfo)
-                .setNow(HLC.INST.getPhysical())
                 .build())
-            .exceptionally(e -> {
-                log.debug("Failed to expire inbox", e);
-                return DetachReply.newBuilder()
-                    .setReqId(reqId)
-                    .setCode(DetachReply.Code.ERROR)
-                    .build();
-            })
-            .thenApplyAsync(reply -> {
-                switch (reply.getCode()) {
-                    case OK -> {
-                        return ExpireResult.OK;
-                    }
+            .thenComposeAsync(existReply -> {
+                switch (existReply.getCode()) {
                     case NO_INBOX -> {
-                        return ExpireResult.NOT_FOUND;
+                        return 
CompletableFuture.completedFuture(ExpireResult.NOT_FOUND);
+                    }
+                    case EXIST -> {
+                        // detach and expire the latest version immediately
+                        return inboxClient.detach(DetachRequest.newBuilder()
+                                .setReqId(reqId)
+                                .setInboxId(userSessionId)
+                                .setExpirySeconds(0)
+                                .setDiscardLWT(true)
+                                .setClient(clientInfo)
+                                .setNow(HLC.INST.getPhysical())
+                                .build())
+                            .exceptionally(e -> {
+                                log.debug("Failed to expire inbox", e);
+                                return DetachReply.newBuilder()
+                                    .setReqId(reqId)
+                                    .setCode(DetachReply.Code.ERROR)
+                                    .build();
+                            })
+                            .thenApplyAsync(reply -> {
+                                switch (reply.getCode()) {
+                                    case OK -> {
+                                        return ExpireResult.OK;
+                                    }
+                                    case NO_INBOX -> {
+                                        return ExpireResult.NOT_FOUND;
+                                    }
+                                    case TRY_LATER -> {
+                                        handleGoAway(
+                                            onInboxCallRetry(clientInfo, 
"Inbox service call[expire] needs retry"));
+                                        return ExpireResult.ERROR;
+                                    }
+                                    case BACK_PRESSURE_REJECTED -> {
+                                        
handleGoAway(onInboxCallBusy(clientInfo, "Inbox service call[expire] busy"));
+                                        return ExpireResult.ERROR;
+                                    }
+                                    default -> {
+                                        
handleGoAway(onInboxCallError(clientInfo, "Inbox service call[expire] error"));
+                                        return ExpireResult.ERROR;
+                                    }
+                                }
+                            }, ctx.executor());
                     }
                     case TRY_LATER -> {
-                        handleGoAway(onInboxCallRetry(clientInfo, "Inbox 
service call[expire] needs retry"));
-                        return ExpireResult.ERROR;
+                        handleGoAway(onInboxCallError(clientInfo, "Inbox 
service call[exist] needs retry"));
+                        return 
CompletableFuture.completedFuture(ExpireResult.ERROR);
                     }
                     case BACK_PRESSURE_REJECTED -> {
-                        handleGoAway(onInboxCallBusy(clientInfo, "Inbox 
service call[expire] busy"));
-                        return ExpireResult.ERROR;
+                        handleGoAway(onInboxCallError(clientInfo, "Inbox 
service call[exist] needs busy"));
+                        return 
CompletableFuture.completedFuture(ExpireResult.ERROR);
                     }
                     default -> {
-                        handleGoAway(onInboxCallError(clientInfo, "Inbox 
service call[expire] error"));
-                        return ExpireResult.ERROR;
+                        handleGoAway(onInboxCallError(clientInfo, "Inbox 
service call[exist] error"));
+                        return 
CompletableFuture.completedFuture(ExpireResult.ERROR);
                     }
                 }
             }, ctx.executor());
-
     }
 
     protected abstract GoAway sanityCheck(MqttConnectMessage message);
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/BaseMQTTTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/BaseMQTTTest.java
index 98cdc822..ecc2021c 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/BaseMQTTTest.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/BaseMQTTTest.java
@@ -425,7 +425,7 @@ public abstract class BaseMQTTTest {
     protected void setupTransientSession() {
         mockAuthPass();
         mockSessionReg();
-        mockInboxDetach(DetachReply.Code.NO_INBOX);
+        mockInboxExist(false);
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.mqttConnectMessage(true);
         channel.writeInbound(connectMessage);
         channel.runPendingTasks();
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ConnectHandlerTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ConnectHandlerTest.java
index 9a618f0c..787c74b2 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ConnectHandlerTest.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ConnectHandlerTest.java
@@ -14,7 +14,7 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.    
+ * under the License.
  */
 
 package org.apache.bifromq.mqtt.handler.v3;
@@ -30,9 +30,22 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.mqtt.MqttConnAckMessage;
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
+import io.netty.handler.codec.mqtt.MqttMessageBuilders;
+import io.netty.handler.codec.mqtt.MqttVersion;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.bifromq.inbox.client.IInboxClient;
 import org.apache.bifromq.inbox.rpc.proto.AttachReply;
 import org.apache.bifromq.inbox.rpc.proto.DetachReply;
+import org.apache.bifromq.inbox.rpc.proto.ExistReply;
 import org.apache.bifromq.mqtt.MockableTest;
 import org.apache.bifromq.mqtt.handler.ChannelAttrs;
 import org.apache.bifromq.mqtt.handler.v5.MQTT5MessageUtils;
@@ -49,22 +62,10 @@ import org.apache.bifromq.plugin.clientbalancer.Redirection;
 import org.apache.bifromq.plugin.eventcollector.EventType;
 import org.apache.bifromq.plugin.eventcollector.IEventCollector;
 import 
org.apache.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.Redirect;
+import org.apache.bifromq.plugin.resourcethrottler.IResourceThrottler;
 import org.apache.bifromq.plugin.settingprovider.ISettingProvider;
 import org.apache.bifromq.plugin.settingprovider.Setting;
 import org.apache.bifromq.type.ClientInfo;
-import org.apache.bifromq.plugin.resourcethrottler.IResourceThrottler;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.handler.codec.mqtt.MqttConnAckMessage;
-import io.netty.handler.codec.mqtt.MqttConnectMessage;
-import io.netty.handler.codec.mqtt.MqttMessageBuilders;
-import io.netty.handler.codec.mqtt.MqttVersion;
-import java.net.InetSocketAddress;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import org.mockito.Mock;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -229,6 +230,9 @@ public class MQTT3ConnectHandlerTest extends MockableTest {
         when(authProvider.checkPermission(any(ClientInfo.class), 
argThat(MQTTAction::hasConn))).thenReturn(
             CompletableFuture.completedFuture(
                 
CheckResult.newBuilder().setGranted(Granted.getDefaultInstance()).build()));
+        
when(inboxClient.exist(any())).thenReturn(CompletableFuture.completedFuture(ExistReply.newBuilder()
+            .setCode(ExistReply.Code.EXIST)
+            .build()));
         
when(inboxClient.detach(any())).thenReturn(CompletableFuture.completedFuture(DetachReply.newBuilder()
             .setCode(DetachReply.Code.BACK_PRESSURE_REJECTED)
             .build()));
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTConnectTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTConnectTest.java
index 30cac18d..038bc11d 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTConnectTest.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTConnectTest.java
@@ -59,7 +59,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
     public void transientSessionWithoutInbox() {
         mockAuthPass();
         mockSessionReg();
-        mockInboxDetach(DetachReply.Code.NO_INBOX);
+        mockInboxExist(false);
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.mqttConnectMessage(true);
         channel.writeInbound(connectMessage);
         channel.runPendingTasks();
@@ -72,6 +72,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
     public void transientSessionWithInbox() {
         mockAuthPass();
         mockSessionReg();
+        mockInboxExist(true);
         mockInboxDetach(DetachReply.Code.OK);
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.mqttConnectMessage(true);
         channel.writeInbound(connectMessage);
@@ -86,6 +87,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
         // clear failed
         mockAuthPass();
         mockSessionReg();
+        mockInboxExist(true);
         mockInboxDetach(DetachReply.Code.ERROR);
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.mqttConnectMessage(true);
         channel.writeInbound(connectMessage);
@@ -161,6 +163,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
         String attrVal = "attrVal";
         mockAuthPass(attrKey, attrVal);
         mockSessionReg();
+        mockInboxExist(true);
         mockInboxDetach(DetachReply.Code.OK);
 
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.mqttConnectMessage(true);
@@ -183,6 +186,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
         String attrVal = "attrVal";
         mockAuthPass(attrKey, attrVal);
         mockSessionReg();
+        mockInboxExist(true);
         mockInboxDetach(DetachReply.Code.OK);
 
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.mqttConnectMessage(true);
@@ -191,7 +195,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
         channel.runPendingTasks();
         MqttConnAckMessage ackMessage = channel.readOutbound();
         // verifications
-        Assert.assertEquals(CONNECTION_ACCEPTED, 
ackMessage.variableHeader().connectReturnCode());
+        Assert.assertEquals(ackMessage.variableHeader().connectReturnCode(), 
CONNECTION_ACCEPTED);
         ArgumentCaptor<ClientConnected> eventArgumentCaptor = 
ArgumentCaptor.forClass(ClientConnected.class);
         verify(eventCollector).report(eventArgumentCaptor.capture());
         ClientConnected clientConnected = eventArgumentCaptor.getValue();
@@ -208,7 +212,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
         channel.runPendingTasks();
         MqttConnAckMessage ackMessage = channel.readOutbound();
         // verifications
-        Assert.assertEquals(CONNECTION_REFUSED_NOT_AUTHORIZED, 
ackMessage.variableHeader().connectReturnCode());
+        Assert.assertEquals(ackMessage.variableHeader().connectReturnCode(), 
CONNECTION_REFUSED_NOT_AUTHORIZED);
         verifyEvent(EventType.NOT_AUTHORIZED_CLIENT);
     }
 
@@ -221,8 +225,8 @@ public class MQTTConnectTest extends BaseMQTTTest {
         channel.advanceTimeBy(disconnectDelay, TimeUnit.MILLISECONDS);
         channel.runPendingTasks();
         MqttConnAckMessage ackMessage = channel.readOutbound();
-        Assert.assertEquals(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD,
-            ackMessage.variableHeader().connectReturnCode());
+        Assert.assertEquals(ackMessage.variableHeader().connectReturnCode(),
+                CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
         verifyEvent(EventType.UNAUTHENTICATED_CLIENT);
     }
 
@@ -245,6 +249,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
     public void validWillTopic() {
         mockAuthPass();
         mockSessionReg();
+        mockInboxExist(true);
         mockInboxDetach(DetachReply.Code.OK);
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.qoSWillMqttConnectMessage(1, true);
         channel.writeInbound(connectMessage);
@@ -258,6 +263,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
     public void pingAndPingResp() {
         mockAuthPass();
         mockSessionReg();
+        mockInboxExist(true);
         mockInboxDetach(DetachReply.Code.OK);
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.qoSWillMqttConnectMessage(1, true);
         channel.writeInbound(connectMessage);
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTDisconnectTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTDisconnectTest.java
index a8bb915c..6cf7db34 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTDisconnectTest.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTDisconnectTest.java
@@ -50,6 +50,7 @@ public class MQTTDisconnectTest extends BaseMQTTTest {
     public void transientSession() {
         mockAuthPass();
         mockSessionReg();
+        mockInboxExist(true);
         mockInboxDetach(DetachReply.Code.OK);
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.mqttConnectMessage(true);
         channel.writeInbound(connectMessage);
@@ -86,7 +87,7 @@ public class MQTTDisconnectTest extends BaseMQTTTest {
     public void idle() {
         mockAuthPass();
         mockSessionReg();
-        mockInboxDetach(DetachReply.Code.NO_INBOX);
+        mockInboxExist(false);
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.mqttConnectMessage(true, "testClientId", 60);
         channel.writeInbound(connectMessage);
         channel.runPendingTasks();
@@ -102,7 +103,7 @@ public class MQTTDisconnectTest extends BaseMQTTTest {
     public void noKeepAlive() {
         mockAuthPass();
         mockSessionReg();
-        mockInboxDetach(DetachReply.Code.NO_INBOX);
+        mockInboxExist(false);
 
         // keepalive = 0
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.mqttConnectMessage(true, "abc", 0);
@@ -121,7 +122,7 @@ public class MQTTDisconnectTest extends BaseMQTTTest {
     public void enforceMinKeepAlive() {
         mockAuthPass();
         mockSessionReg();
-        mockInboxDetach(DetachReply.Code.NO_INBOX);
+        mockInboxExist(false);
 
         // keepalive too short, least is 60s
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.mqttConnectMessage(true, "abc", 1);
@@ -139,7 +140,7 @@ public class MQTTDisconnectTest extends BaseMQTTTest {
     public void connectTwice() {
         mockAuthPass();
         mockSessionReg();
-        mockInboxDetach(DetachReply.Code.NO_INBOX);
+        mockInboxExist(false);
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.mqttConnectMessage(true);
         channel.writeInbound(connectMessage);
         channel.runPendingTasks();
@@ -156,6 +157,7 @@ public class MQTTDisconnectTest extends BaseMQTTTest {
     public void disconnectByServer() {
         mockAuthPass();
         mockSessionReg();
+        mockInboxExist(true);
         mockInboxDetach(DetachReply.Code.NO_INBOX);
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.mqttConnectMessage(true);
         channel.writeInbound(connectMessage);
@@ -171,7 +173,7 @@ public class MQTTDisconnectTest extends BaseMQTTTest {
     public void badPacketAfterConnected() {
         mockAuthPass();
         mockSessionReg();
-        mockInboxDetach(DetachReply.Code.NO_INBOX);
+        mockInboxExist(false);
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.mqttConnectMessage(true);
         channel.writeInbound(connectMessage);
         channel.runPendingTasks();
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTKickTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTKickTest.java
index c4222639..7ceb61f3 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTKickTest.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTKickTest.java
@@ -27,7 +27,6 @@ import static 
io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEP
 
 import io.netty.handler.codec.mqtt.MqttConnAckMessage;
 import io.netty.handler.codec.mqtt.MqttConnectMessage;
-import org.apache.bifromq.inbox.rpc.proto.DetachReply;
 import org.apache.bifromq.mqtt.utils.MQTTMessageUtils;
 import org.apache.bifromq.sessiondict.rpc.proto.ServerRedirection;
 import org.apache.bifromq.type.ClientInfo;
@@ -40,11 +39,11 @@ public class MQTTKickTest extends BaseMQTTTest {
     public void testKick() {
         mockAuthPass();
         mockSessionReg();
-        mockInboxDetach(DetachReply.Code.NO_INBOX);
+        mockInboxExist(false);
         MqttConnectMessage connectMessage = 
MQTTMessageUtils.mqttConnectMessage(true);
         channel.writeInbound(connectMessage);
         MqttConnAckMessage ackMessage = channel.readOutbound();
-        Assert.assertEquals(CONNECTION_ACCEPTED, 
ackMessage.variableHeader().connectReturnCode());
+        Assert.assertEquals(ackMessage.variableHeader().connectReturnCode(), 
CONNECTION_ACCEPTED);
 
         // kick
 
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTWillMessageTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTWillMessageTest.java
index 50249e1a..941711e2 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTWillMessageTest.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTWillMessageTest.java
@@ -14,12 +14,13 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.    
+ * under the License.
  */
 
 package org.apache.bifromq.mqtt.handler.v3;
 
 
+import static 
io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
 import static 
org.apache.bifromq.plugin.eventcollector.EventType.CLIENT_CONNECTED;
 import static org.apache.bifromq.plugin.eventcollector.EventType.IDLE;
 import static org.apache.bifromq.plugin.eventcollector.EventType.KICKED;
@@ -34,7 +35,6 @@ import static 
org.apache.bifromq.plugin.eventcollector.EventType.WILL_DIST_ERROR
 import static org.apache.bifromq.retain.rpc.proto.RetainReply.Result.CLEARED;
 import static org.apache.bifromq.retain.rpc.proto.RetainReply.Result.ERROR;
 import static org.apache.bifromq.retain.rpc.proto.RetainReply.Result.RETAINED;
-import static 
io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -42,13 +42,12 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 
-import org.apache.bifromq.plugin.eventcollector.EventType;
 import io.netty.handler.codec.mqtt.MqttConnAckMessage;
 import io.netty.handler.codec.mqtt.MqttConnectMessage;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bifromq.inbox.rpc.proto.DetachReply;
 import org.apache.bifromq.mqtt.utils.MQTTMessageUtils;
+import org.apache.bifromq.plugin.eventcollector.EventType;
 import org.apache.bifromq.sessiondict.rpc.proto.ServerRedirection;
 import org.apache.bifromq.type.ClientInfo;
 import org.testng.Assert;
@@ -189,7 +188,7 @@ public class MQTTWillMessageTest extends BaseMQTTTest {
     protected void setupTransientSessionWithLWT(boolean willRetain) {
         mockAuthPass();
         mockSessionReg();
-        mockInboxDetach(DetachReply.Code.NO_INBOX);
+        mockInboxExist(false);
         MqttConnectMessage connectMessage;
         if (!willRetain) {
             connectMessage = MQTTMessageUtils.qoSWillMqttConnectMessage(1, 
true);
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/EnhancedAuthTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/EnhancedAuthTest.java
index a73143df..60e0c5bf 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/EnhancedAuthTest.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/EnhancedAuthTest.java
@@ -29,20 +29,6 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
-import org.apache.bifromq.plugin.authprovider.IAuthProvider;
-import org.apache.bifromq.plugin.authprovider.type.CheckResult;
-import org.apache.bifromq.plugin.authprovider.type.Continue;
-import org.apache.bifromq.plugin.authprovider.type.Failed;
-import org.apache.bifromq.plugin.authprovider.type.Granted;
-import org.apache.bifromq.plugin.authprovider.type.MQTT5ExtendedAuthData;
-import org.apache.bifromq.plugin.authprovider.type.MQTT5ExtendedAuthResult;
-import org.apache.bifromq.plugin.authprovider.type.MQTTAction;
-import org.apache.bifromq.plugin.authprovider.type.Success;
-import org.apache.bifromq.plugin.clientbalancer.IClientBalancer;
-import org.apache.bifromq.plugin.eventcollector.IEventCollector;
-import org.apache.bifromq.plugin.settingprovider.ISettingProvider;
-import org.apache.bifromq.plugin.settingprovider.Setting;
-import org.apache.bifromq.plugin.resourcethrottler.IResourceThrottler;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.protobuf.ByteString;
 import io.netty.channel.Channel;
@@ -66,6 +52,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bifromq.inbox.client.IInboxClient;
 import org.apache.bifromq.inbox.rpc.proto.DetachReply;
+import org.apache.bifromq.inbox.rpc.proto.ExistReply;
 import org.apache.bifromq.mqtt.MockableTest;
 import org.apache.bifromq.mqtt.handler.ChannelAttrs;
 import org.apache.bifromq.mqtt.handler.ConditionalRejectHandler;
@@ -77,6 +64,20 @@ import 
org.apache.bifromq.mqtt.handler.v5.reason.MQTT5AuthReasonCode;
 import org.apache.bifromq.mqtt.service.ILocalSessionRegistry;
 import org.apache.bifromq.mqtt.service.LocalSessionRegistry;
 import org.apache.bifromq.mqtt.session.MQTTSessionContext;
+import org.apache.bifromq.plugin.authprovider.IAuthProvider;
+import org.apache.bifromq.plugin.authprovider.type.CheckResult;
+import org.apache.bifromq.plugin.authprovider.type.Continue;
+import org.apache.bifromq.plugin.authprovider.type.Failed;
+import org.apache.bifromq.plugin.authprovider.type.Granted;
+import org.apache.bifromq.plugin.authprovider.type.MQTT5ExtendedAuthData;
+import org.apache.bifromq.plugin.authprovider.type.MQTT5ExtendedAuthResult;
+import org.apache.bifromq.plugin.authprovider.type.MQTTAction;
+import org.apache.bifromq.plugin.authprovider.type.Success;
+import org.apache.bifromq.plugin.clientbalancer.IClientBalancer;
+import org.apache.bifromq.plugin.eventcollector.IEventCollector;
+import org.apache.bifromq.plugin.resourcethrottler.IResourceThrottler;
+import org.apache.bifromq.plugin.settingprovider.ISettingProvider;
+import org.apache.bifromq.plugin.settingprovider.Setting;
 import org.apache.bifromq.sessiondict.client.ISessionDictClient;
 import org.apache.bifromq.type.ClientInfo;
 import org.apache.bifromq.type.StringPair;
@@ -159,12 +160,21 @@ public class EnhancedAuthTest extends MockableTest {
     @Test
     public void testAuthSuccess() {
         Mockito.reset(authProvider);
+        
when(inboxClient.exist(any())).thenReturn(CompletableFuture.completedFuture(ExistReply.newBuilder()
+            .setCode(ExistReply.Code.NO_INBOX)
+            .build()));
         
when(authProvider.extendedAuth(any(MQTT5ExtendedAuthData.class))).thenReturn(CompletableFuture.completedFuture(
-            MQTT5ExtendedAuthResult.newBuilder().setSuccess(
-                
Success.newBuilder().setAuthData(ByteString.copyFromUtf8("hello")).setUserProps(
-                        UserProperties.newBuilder()
-                            
.addUserProperties(StringPair.newBuilder().setKey("key").setValue("val").build()).build())
-                    .build()).build()));
+            MQTT5ExtendedAuthResult.newBuilder()
+                .setSuccess(Success.newBuilder()
+                    .setAuthData(ByteString.copyFromUtf8("hello"))
+                    .setUserProps(UserProperties.newBuilder()
+                        .addUserProperties(StringPair.newBuilder()
+                            .setKey("key")
+                            .setValue("val")
+                            .build())
+                        .build())
+                    .build())
+                .build()));
         when(authProvider.checkPermission(any(ClientInfo.class), 
argThat(MQTTAction::hasConn))).thenReturn(
             CompletableFuture.completedFuture(
                 
CheckResult.newBuilder().setGranted(Granted.getDefaultInstance()).build()));
@@ -193,6 +203,9 @@ public class EnhancedAuthTest extends MockableTest {
 
     @Test
     public void testAuthSuccess2() {
+        
when(inboxClient.exist(any())).thenReturn(CompletableFuture.completedFuture(ExistReply.newBuilder()
+            .setCode(ExistReply.Code.NO_INBOX)
+            .build()));
         MqttConnectMessage connect = 
MqttMessageBuilders.connect().clientId("client")
             .protocolVersion(MqttVersion.MQTT_5).cleanSession(true).properties(
                 MQTT5MessageUtils.mqttProps().addAuthMethod("authMethod")
@@ -207,7 +220,8 @@ public class EnhancedAuthTest extends MockableTest {
                     .build()).build()));
         channel.writeInbound(connect);
         MqttMessage authMessage = channel.readOutbound();
-        MqttProperties properties = 
((MqttReasonCodeAndPropertiesVariableHeader) 
authMessage.variableHeader()).properties();
+        MqttProperties properties =
+            ((MqttReasonCodeAndPropertiesVariableHeader) 
authMessage.variableHeader()).properties();
         String authMethod = authMethod(properties).orElseThrow();
         ByteString authData = 
MQTT5MessageUtils.authData(properties).orElseThrow();
         assertEquals(authMethod, "authMethod");
@@ -245,7 +259,8 @@ public class EnhancedAuthTest extends MockableTest {
                 
.setContinue(Continue.newBuilder().setAuthData(ByteString.copyFromUtf8(challenge)).build()).build()));
         channel.writeInbound(connect);
         MqttMessage authMessage = channel.readOutbound();
-        MqttProperties properties = 
((MqttReasonCodeAndPropertiesVariableHeader) 
authMessage.variableHeader()).properties();
+        MqttProperties properties =
+            ((MqttReasonCodeAndPropertiesVariableHeader) 
authMessage.variableHeader()).properties();
         String authMethod = authMethod(properties).orElseThrow();
         ByteString authData = 
MQTT5MessageUtils.authData(properties).orElseThrow();
         assertEquals(authMethod, "authMethod");
@@ -298,6 +313,9 @@ public class EnhancedAuthTest extends MockableTest {
     @Test
     public void testReAuth() {
         Mockito.reset(authProvider);
+        
when(inboxClient.exist(any())).thenReturn(CompletableFuture.completedFuture(ExistReply.newBuilder()
+            .setCode(ExistReply.Code.NO_INBOX)
+            .build()));
         
when(authProvider.extendedAuth(any(MQTT5ExtendedAuthData.class))).thenReturn(CompletableFuture.completedFuture(
             MQTT5ExtendedAuthResult.newBuilder().setSuccess(
                 
Success.newBuilder().setAuthData(ByteString.copyFromUtf8("hello")).setUserProps(
@@ -319,11 +337,13 @@ public class EnhancedAuthTest extends MockableTest {
             MQTT5MessageUtils.mqttProps().addAuthMethod("authMethod")
                 .addAuthData(ByteString.copyFrom("reAuthData", 
StandardCharsets.UTF_8)).build()).build());
         MqttMessage reAuthMessage = channel.readOutbound();
-        MqttReasonCodeAndPropertiesVariableHeader variableHeader = 
((MqttReasonCodeAndPropertiesVariableHeader) reAuthMessage.variableHeader());
+        MqttReasonCodeAndPropertiesVariableHeader variableHeader =
+            ((MqttReasonCodeAndPropertiesVariableHeader) 
reAuthMessage.variableHeader());
         MQTT5AuthReasonCode reasonCode = 
MQTT5AuthReasonCode.valueOf(variableHeader.reasonCode());
         assertEquals(reasonCode, MQTT5AuthReasonCode.Success);
 
-        MqttProperties mqttProperties = 
((MqttReasonCodeAndPropertiesVariableHeader) 
reAuthMessage.variableHeader()).properties();
+        MqttProperties mqttProperties =
+            ((MqttReasonCodeAndPropertiesVariableHeader) 
reAuthMessage.variableHeader()).properties();
         Optional<String> authMethodOpt = authMethod(mqttProperties);
         Optional<ByteString> authDataOpt = 
MQTT5MessageUtils.authData(mqttProperties);
         assertTrue(authMethodOpt.isPresent());
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ConnectHandlerTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ConnectHandlerTest.java
index 04c98c25..465366da 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ConnectHandlerTest.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ConnectHandlerTest.java
@@ -371,6 +371,9 @@ public class MQTT5ConnectHandlerTest extends MockableTest {
         when(authProvider.checkPermission(any(ClientInfo.class), 
argThat(MQTTAction::hasConn))).thenReturn(
             CompletableFuture.completedFuture(
                 
CheckResult.newBuilder().setGranted(Granted.getDefaultInstance()).build()));
+        
when(inboxClient.exist(any())).thenReturn(CompletableFuture.completedFuture(ExistReply.newBuilder()
+                .setCode(ExistReply.Code.EXIST)
+                .build()));
         
when(inboxClient.detach(any())).thenReturn(CompletableFuture.completedFuture(DetachReply.newBuilder()
             .setCode(DetachReply.Code.BACK_PRESSURE_REJECTED)
             .build()));


Reply via email to