This is an automated email from the ASF dual-hosted git repository.
shunzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git
The following commit(s) were added to refs/heads/main by this push:
new ff8242f5 Check existence of inbox before detaching for CleanStart=true
(#155)
ff8242f5 is described below
commit ff8242f56ee26b82fa980d9e4dc4fe4ed7bf3b10
Author: Yonny(Yu) Hao <[email protected]>
AuthorDate: Thu Jul 17 14:55:59 2025 +0800
Check existence of inbox before detaching for CleanStart=true (#155)
Co-authored-by: haoyu <[email protected]>
---
.../bifromq/mqtt/handler/MQTTConnectHandler.java | 115 +++++++++++----------
.../bifromq/mqtt/handler/v3/BaseMQTTTest.java | 2 +-
.../mqtt/handler/v3/MQTT3ConnectHandlerTest.java | 32 +++---
.../bifromq/mqtt/handler/v3/MQTTConnectTest.java | 16 ++-
.../mqtt/handler/v3/MQTTDisconnectTest.java | 12 ++-
.../bifromq/mqtt/handler/v3/MQTTKickTest.java | 5 +-
.../mqtt/handler/v3/MQTTWillMessageTest.java | 9 +-
.../bifromq/mqtt/handler/v5/EnhancedAuthTest.java | 66 +++++++-----
.../mqtt/handler/v5/MQTT5ConnectHandlerTest.java | 3 +
9 files changed, 149 insertions(+), 111 deletions(-)
diff --git
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java
index 49aba521..704a272f 100644
---
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java
+++
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java
@@ -14,12 +14,11 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.bifromq.mqtt.handler;
-import static org.apache.bifromq.base.util.CompletableFutureUtil.unwrap;
import static org.apache.bifromq.metrics.TenantMetric.MqttIngressBytes;
import static org.apache.bifromq.mqtt.handler.MQTTSessionIdUtil.userSessionId;
import static org.apache.bifromq.mqtt.handler.condition.ORCondition.or;
@@ -47,15 +46,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bifromq.base.util.AsyncRetry;
import org.apache.bifromq.base.util.FutureTracker;
-import org.apache.bifromq.base.util.exception.RetryTimeoutException;
import org.apache.bifromq.basehlc.HLC;
import org.apache.bifromq.inbox.client.IInboxClient;
import org.apache.bifromq.inbox.rpc.proto.AttachRequest;
import org.apache.bifromq.inbox.rpc.proto.DetachReply;
import org.apache.bifromq.inbox.rpc.proto.DetachRequest;
-import org.apache.bifromq.inbox.rpc.proto.ExistReply;
import org.apache.bifromq.inbox.rpc.proto.ExistRequest;
import org.apache.bifromq.inbox.storage.proto.InboxVersion;
import org.apache.bifromq.inbox.storage.proto.LWT;
@@ -231,30 +227,11 @@ public abstract class MQTTConnectHandler extends
ChannelDuplexHandler {
if (sessionExpiryInterval == 0) {
// try to attach to previous session and reset
its SEI to 0
// or set up a new transient session
- return AsyncRetry.exec(() ->
inboxClient.exist(ExistRequest.newBuilder()
- .setReqId(reqId)
-
.setTenantId(clientInfo.getTenantId())
- .setInboxId(userSessionId)
- .build()),
- (reply, t) -> {
- if (reply != null) {
- return reply.getCode() ==
ExistReply.Code.TRY_LATER;
- }
- return false;
- }, sessionCtx.retryTimeoutNanos / 5,
sessionCtx.retryTimeoutNanos)
- .exceptionally(unwrap(e -> {
- if (e instanceof
RetryTimeoutException) {
- return ExistReply.newBuilder()
- .setReqId(reqId)
-
.setCode(ExistReply.Code.TRY_LATER)
- .build();
- }
- log.debug("Failed to get inbox", e);
- return ExistReply.newBuilder()
- .setReqId(reqId)
- .setCode(ExistReply.Code.ERROR)
- .build();
- }))
+ return
inboxClient.exist(ExistRequest.newBuilder()
+ .setReqId(reqId)
+ .setTenantId(clientInfo.getTenantId())
+ .setInboxId(userSessionId)
+ .build())
.thenAcceptAsync(getReply -> {
switch (getReply.getCode()) {
case EXIST -> {
@@ -385,47 +362,75 @@ public abstract class MQTTConnectHandler extends
ChannelDuplexHandler {
String userSessionId,
ClientInfo clientInfo)
{
if (requestClientId.isEmpty()) {
+ // if server generated client id, no need to expire
return CompletableFuture.completedFuture(ExpireResult.NOT_FOUND);
}
- // detach and expire the latest version immediately
- return inboxClient.detach(DetachRequest.newBuilder()
+ // check if the inbox exists which is a more lightweight operation
than detach
+ return inboxClient.exist(ExistRequest.newBuilder()
.setReqId(reqId)
+ .setTenantId(clientInfo.getTenantId())
.setInboxId(userSessionId)
- .setExpirySeconds(0)
- .setDiscardLWT(true)
- .setClient(clientInfo)
- .setNow(HLC.INST.getPhysical())
.build())
- .exceptionally(e -> {
- log.debug("Failed to expire inbox", e);
- return DetachReply.newBuilder()
- .setReqId(reqId)
- .setCode(DetachReply.Code.ERROR)
- .build();
- })
- .thenApplyAsync(reply -> {
- switch (reply.getCode()) {
- case OK -> {
- return ExpireResult.OK;
- }
+ .thenComposeAsync(existReply -> {
+ switch (existReply.getCode()) {
case NO_INBOX -> {
- return ExpireResult.NOT_FOUND;
+ return
CompletableFuture.completedFuture(ExpireResult.NOT_FOUND);
+ }
+ case EXIST -> {
+ // detach and expire the latest version immediately
+ return inboxClient.detach(DetachRequest.newBuilder()
+ .setReqId(reqId)
+ .setInboxId(userSessionId)
+ .setExpirySeconds(0)
+ .setDiscardLWT(true)
+ .setClient(clientInfo)
+ .setNow(HLC.INST.getPhysical())
+ .build())
+ .exceptionally(e -> {
+ log.debug("Failed to expire inbox", e);
+ return DetachReply.newBuilder()
+ .setReqId(reqId)
+ .setCode(DetachReply.Code.ERROR)
+ .build();
+ })
+ .thenApplyAsync(reply -> {
+ switch (reply.getCode()) {
+ case OK -> {
+ return ExpireResult.OK;
+ }
+ case NO_INBOX -> {
+ return ExpireResult.NOT_FOUND;
+ }
+ case TRY_LATER -> {
+ handleGoAway(
+ onInboxCallRetry(clientInfo,
"Inbox service call[expire] needs retry"));
+ return ExpireResult.ERROR;
+ }
+ case BACK_PRESSURE_REJECTED -> {
+
handleGoAway(onInboxCallBusy(clientInfo, "Inbox service call[expire] busy"));
+ return ExpireResult.ERROR;
+ }
+ default -> {
+
handleGoAway(onInboxCallError(clientInfo, "Inbox service call[expire] error"));
+ return ExpireResult.ERROR;
+ }
+ }
+ }, ctx.executor());
}
case TRY_LATER -> {
- handleGoAway(onInboxCallRetry(clientInfo, "Inbox
service call[expire] needs retry"));
- return ExpireResult.ERROR;
+ handleGoAway(onInboxCallError(clientInfo, "Inbox
service call[exist] needs retry"));
+ return
CompletableFuture.completedFuture(ExpireResult.ERROR);
}
case BACK_PRESSURE_REJECTED -> {
- handleGoAway(onInboxCallBusy(clientInfo, "Inbox
service call[expire] busy"));
- return ExpireResult.ERROR;
+ handleGoAway(onInboxCallError(clientInfo, "Inbox
service call[exist] needs busy"));
+ return
CompletableFuture.completedFuture(ExpireResult.ERROR);
}
default -> {
- handleGoAway(onInboxCallError(clientInfo, "Inbox
service call[expire] error"));
- return ExpireResult.ERROR;
+ handleGoAway(onInboxCallError(clientInfo, "Inbox
service call[exist] error"));
+ return
CompletableFuture.completedFuture(ExpireResult.ERROR);
}
}
}, ctx.executor());
-
}
protected abstract GoAway sanityCheck(MqttConnectMessage message);
diff --git
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/BaseMQTTTest.java
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/BaseMQTTTest.java
index 98cdc822..ecc2021c 100644
---
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/BaseMQTTTest.java
+++
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/BaseMQTTTest.java
@@ -425,7 +425,7 @@ public abstract class BaseMQTTTest {
protected void setupTransientSession() {
mockAuthPass();
mockSessionReg();
- mockInboxDetach(DetachReply.Code.NO_INBOX);
+ mockInboxExist(false);
MqttConnectMessage connectMessage =
MQTTMessageUtils.mqttConnectMessage(true);
channel.writeInbound(connectMessage);
channel.runPendingTasks();
diff --git
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ConnectHandlerTest.java
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ConnectHandlerTest.java
index 9a618f0c..787c74b2 100644
---
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ConnectHandlerTest.java
+++
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ConnectHandlerTest.java
@@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.bifromq.mqtt.handler.v3;
@@ -30,9 +30,22 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.mqtt.MqttConnAckMessage;
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
+import io.netty.handler.codec.mqtt.MqttMessageBuilders;
+import io.netty.handler.codec.mqtt.MqttVersion;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.bifromq.inbox.client.IInboxClient;
import org.apache.bifromq.inbox.rpc.proto.AttachReply;
import org.apache.bifromq.inbox.rpc.proto.DetachReply;
+import org.apache.bifromq.inbox.rpc.proto.ExistReply;
import org.apache.bifromq.mqtt.MockableTest;
import org.apache.bifromq.mqtt.handler.ChannelAttrs;
import org.apache.bifromq.mqtt.handler.v5.MQTT5MessageUtils;
@@ -49,22 +62,10 @@ import org.apache.bifromq.plugin.clientbalancer.Redirection;
import org.apache.bifromq.plugin.eventcollector.EventType;
import org.apache.bifromq.plugin.eventcollector.IEventCollector;
import
org.apache.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.Redirect;
+import org.apache.bifromq.plugin.resourcethrottler.IResourceThrottler;
import org.apache.bifromq.plugin.settingprovider.ISettingProvider;
import org.apache.bifromq.plugin.settingprovider.Setting;
import org.apache.bifromq.type.ClientInfo;
-import org.apache.bifromq.plugin.resourcethrottler.IResourceThrottler;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.handler.codec.mqtt.MqttConnAckMessage;
-import io.netty.handler.codec.mqtt.MqttConnectMessage;
-import io.netty.handler.codec.mqtt.MqttMessageBuilders;
-import io.netty.handler.codec.mqtt.MqttVersion;
-import java.net.InetSocketAddress;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import org.mockito.Mock;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -229,6 +230,9 @@ public class MQTT3ConnectHandlerTest extends MockableTest {
when(authProvider.checkPermission(any(ClientInfo.class),
argThat(MQTTAction::hasConn))).thenReturn(
CompletableFuture.completedFuture(
CheckResult.newBuilder().setGranted(Granted.getDefaultInstance()).build()));
+
when(inboxClient.exist(any())).thenReturn(CompletableFuture.completedFuture(ExistReply.newBuilder()
+ .setCode(ExistReply.Code.EXIST)
+ .build()));
when(inboxClient.detach(any())).thenReturn(CompletableFuture.completedFuture(DetachReply.newBuilder()
.setCode(DetachReply.Code.BACK_PRESSURE_REJECTED)
.build()));
diff --git
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTConnectTest.java
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTConnectTest.java
index 30cac18d..038bc11d 100644
---
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTConnectTest.java
+++
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTConnectTest.java
@@ -59,7 +59,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
public void transientSessionWithoutInbox() {
mockAuthPass();
mockSessionReg();
- mockInboxDetach(DetachReply.Code.NO_INBOX);
+ mockInboxExist(false);
MqttConnectMessage connectMessage =
MQTTMessageUtils.mqttConnectMessage(true);
channel.writeInbound(connectMessage);
channel.runPendingTasks();
@@ -72,6 +72,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
public void transientSessionWithInbox() {
mockAuthPass();
mockSessionReg();
+ mockInboxExist(true);
mockInboxDetach(DetachReply.Code.OK);
MqttConnectMessage connectMessage =
MQTTMessageUtils.mqttConnectMessage(true);
channel.writeInbound(connectMessage);
@@ -86,6 +87,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
// clear failed
mockAuthPass();
mockSessionReg();
+ mockInboxExist(true);
mockInboxDetach(DetachReply.Code.ERROR);
MqttConnectMessage connectMessage =
MQTTMessageUtils.mqttConnectMessage(true);
channel.writeInbound(connectMessage);
@@ -161,6 +163,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
String attrVal = "attrVal";
mockAuthPass(attrKey, attrVal);
mockSessionReg();
+ mockInboxExist(true);
mockInboxDetach(DetachReply.Code.OK);
MqttConnectMessage connectMessage =
MQTTMessageUtils.mqttConnectMessage(true);
@@ -183,6 +186,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
String attrVal = "attrVal";
mockAuthPass(attrKey, attrVal);
mockSessionReg();
+ mockInboxExist(true);
mockInboxDetach(DetachReply.Code.OK);
MqttConnectMessage connectMessage =
MQTTMessageUtils.mqttConnectMessage(true);
@@ -191,7 +195,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
channel.runPendingTasks();
MqttConnAckMessage ackMessage = channel.readOutbound();
// verifications
- Assert.assertEquals(CONNECTION_ACCEPTED,
ackMessage.variableHeader().connectReturnCode());
+ Assert.assertEquals(ackMessage.variableHeader().connectReturnCode(),
CONNECTION_ACCEPTED);
ArgumentCaptor<ClientConnected> eventArgumentCaptor =
ArgumentCaptor.forClass(ClientConnected.class);
verify(eventCollector).report(eventArgumentCaptor.capture());
ClientConnected clientConnected = eventArgumentCaptor.getValue();
@@ -208,7 +212,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
channel.runPendingTasks();
MqttConnAckMessage ackMessage = channel.readOutbound();
// verifications
- Assert.assertEquals(CONNECTION_REFUSED_NOT_AUTHORIZED,
ackMessage.variableHeader().connectReturnCode());
+ Assert.assertEquals(ackMessage.variableHeader().connectReturnCode(),
CONNECTION_REFUSED_NOT_AUTHORIZED);
verifyEvent(EventType.NOT_AUTHORIZED_CLIENT);
}
@@ -221,8 +225,8 @@ public class MQTTConnectTest extends BaseMQTTTest {
channel.advanceTimeBy(disconnectDelay, TimeUnit.MILLISECONDS);
channel.runPendingTasks();
MqttConnAckMessage ackMessage = channel.readOutbound();
- Assert.assertEquals(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD,
- ackMessage.variableHeader().connectReturnCode());
+ Assert.assertEquals(ackMessage.variableHeader().connectReturnCode(),
+ CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
verifyEvent(EventType.UNAUTHENTICATED_CLIENT);
}
@@ -245,6 +249,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
public void validWillTopic() {
mockAuthPass();
mockSessionReg();
+ mockInboxExist(true);
mockInboxDetach(DetachReply.Code.OK);
MqttConnectMessage connectMessage =
MQTTMessageUtils.qoSWillMqttConnectMessage(1, true);
channel.writeInbound(connectMessage);
@@ -258,6 +263,7 @@ public class MQTTConnectTest extends BaseMQTTTest {
public void pingAndPingResp() {
mockAuthPass();
mockSessionReg();
+ mockInboxExist(true);
mockInboxDetach(DetachReply.Code.OK);
MqttConnectMessage connectMessage =
MQTTMessageUtils.qoSWillMqttConnectMessage(1, true);
channel.writeInbound(connectMessage);
diff --git
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTDisconnectTest.java
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTDisconnectTest.java
index a8bb915c..6cf7db34 100644
---
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTDisconnectTest.java
+++
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTDisconnectTest.java
@@ -50,6 +50,7 @@ public class MQTTDisconnectTest extends BaseMQTTTest {
public void transientSession() {
mockAuthPass();
mockSessionReg();
+ mockInboxExist(true);
mockInboxDetach(DetachReply.Code.OK);
MqttConnectMessage connectMessage =
MQTTMessageUtils.mqttConnectMessage(true);
channel.writeInbound(connectMessage);
@@ -86,7 +87,7 @@ public class MQTTDisconnectTest extends BaseMQTTTest {
public void idle() {
mockAuthPass();
mockSessionReg();
- mockInboxDetach(DetachReply.Code.NO_INBOX);
+ mockInboxExist(false);
MqttConnectMessage connectMessage =
MQTTMessageUtils.mqttConnectMessage(true, "testClientId", 60);
channel.writeInbound(connectMessage);
channel.runPendingTasks();
@@ -102,7 +103,7 @@ public class MQTTDisconnectTest extends BaseMQTTTest {
public void noKeepAlive() {
mockAuthPass();
mockSessionReg();
- mockInboxDetach(DetachReply.Code.NO_INBOX);
+ mockInboxExist(false);
// keepalive = 0
MqttConnectMessage connectMessage =
MQTTMessageUtils.mqttConnectMessage(true, "abc", 0);
@@ -121,7 +122,7 @@ public class MQTTDisconnectTest extends BaseMQTTTest {
public void enforceMinKeepAlive() {
mockAuthPass();
mockSessionReg();
- mockInboxDetach(DetachReply.Code.NO_INBOX);
+ mockInboxExist(false);
// keepalive too short, least is 60s
MqttConnectMessage connectMessage =
MQTTMessageUtils.mqttConnectMessage(true, "abc", 1);
@@ -139,7 +140,7 @@ public class MQTTDisconnectTest extends BaseMQTTTest {
public void connectTwice() {
mockAuthPass();
mockSessionReg();
- mockInboxDetach(DetachReply.Code.NO_INBOX);
+ mockInboxExist(false);
MqttConnectMessage connectMessage =
MQTTMessageUtils.mqttConnectMessage(true);
channel.writeInbound(connectMessage);
channel.runPendingTasks();
@@ -156,6 +157,7 @@ public class MQTTDisconnectTest extends BaseMQTTTest {
public void disconnectByServer() {
mockAuthPass();
mockSessionReg();
+ mockInboxExist(true);
mockInboxDetach(DetachReply.Code.NO_INBOX);
MqttConnectMessage connectMessage =
MQTTMessageUtils.mqttConnectMessage(true);
channel.writeInbound(connectMessage);
@@ -171,7 +173,7 @@ public class MQTTDisconnectTest extends BaseMQTTTest {
public void badPacketAfterConnected() {
mockAuthPass();
mockSessionReg();
- mockInboxDetach(DetachReply.Code.NO_INBOX);
+ mockInboxExist(false);
MqttConnectMessage connectMessage =
MQTTMessageUtils.mqttConnectMessage(true);
channel.writeInbound(connectMessage);
channel.runPendingTasks();
diff --git
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTKickTest.java
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTKickTest.java
index c4222639..7ceb61f3 100644
---
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTKickTest.java
+++
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTKickTest.java
@@ -27,7 +27,6 @@ import static
io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEP
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
-import org.apache.bifromq.inbox.rpc.proto.DetachReply;
import org.apache.bifromq.mqtt.utils.MQTTMessageUtils;
import org.apache.bifromq.sessiondict.rpc.proto.ServerRedirection;
import org.apache.bifromq.type.ClientInfo;
@@ -40,11 +39,11 @@ public class MQTTKickTest extends BaseMQTTTest {
public void testKick() {
mockAuthPass();
mockSessionReg();
- mockInboxDetach(DetachReply.Code.NO_INBOX);
+ mockInboxExist(false);
MqttConnectMessage connectMessage =
MQTTMessageUtils.mqttConnectMessage(true);
channel.writeInbound(connectMessage);
MqttConnAckMessage ackMessage = channel.readOutbound();
- Assert.assertEquals(CONNECTION_ACCEPTED,
ackMessage.variableHeader().connectReturnCode());
+ Assert.assertEquals(ackMessage.variableHeader().connectReturnCode(),
CONNECTION_ACCEPTED);
// kick
diff --git
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTWillMessageTest.java
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTWillMessageTest.java
index 50249e1a..941711e2 100644
---
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTWillMessageTest.java
+++
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTWillMessageTest.java
@@ -14,12 +14,13 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.bifromq.mqtt.handler.v3;
+import static
io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
import static
org.apache.bifromq.plugin.eventcollector.EventType.CLIENT_CONNECTED;
import static org.apache.bifromq.plugin.eventcollector.EventType.IDLE;
import static org.apache.bifromq.plugin.eventcollector.EventType.KICKED;
@@ -34,7 +35,6 @@ import static
org.apache.bifromq.plugin.eventcollector.EventType.WILL_DIST_ERROR
import static org.apache.bifromq.retain.rpc.proto.RetainReply.Result.CLEARED;
import static org.apache.bifromq.retain.rpc.proto.RetainReply.Result.ERROR;
import static org.apache.bifromq.retain.rpc.proto.RetainReply.Result.RETAINED;
-import static
io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
@@ -42,13 +42,12 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
-import org.apache.bifromq.plugin.eventcollector.EventType;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bifromq.inbox.rpc.proto.DetachReply;
import org.apache.bifromq.mqtt.utils.MQTTMessageUtils;
+import org.apache.bifromq.plugin.eventcollector.EventType;
import org.apache.bifromq.sessiondict.rpc.proto.ServerRedirection;
import org.apache.bifromq.type.ClientInfo;
import org.testng.Assert;
@@ -189,7 +188,7 @@ public class MQTTWillMessageTest extends BaseMQTTTest {
protected void setupTransientSessionWithLWT(boolean willRetain) {
mockAuthPass();
mockSessionReg();
- mockInboxDetach(DetachReply.Code.NO_INBOX);
+ mockInboxExist(false);
MqttConnectMessage connectMessage;
if (!willRetain) {
connectMessage = MQTTMessageUtils.qoSWillMqttConnectMessage(1,
true);
diff --git
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/EnhancedAuthTest.java
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/EnhancedAuthTest.java
index a73143df..60e0c5bf 100644
---
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/EnhancedAuthTest.java
+++
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/EnhancedAuthTest.java
@@ -29,20 +29,6 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-import org.apache.bifromq.plugin.authprovider.IAuthProvider;
-import org.apache.bifromq.plugin.authprovider.type.CheckResult;
-import org.apache.bifromq.plugin.authprovider.type.Continue;
-import org.apache.bifromq.plugin.authprovider.type.Failed;
-import org.apache.bifromq.plugin.authprovider.type.Granted;
-import org.apache.bifromq.plugin.authprovider.type.MQTT5ExtendedAuthData;
-import org.apache.bifromq.plugin.authprovider.type.MQTT5ExtendedAuthResult;
-import org.apache.bifromq.plugin.authprovider.type.MQTTAction;
-import org.apache.bifromq.plugin.authprovider.type.Success;
-import org.apache.bifromq.plugin.clientbalancer.IClientBalancer;
-import org.apache.bifromq.plugin.eventcollector.IEventCollector;
-import org.apache.bifromq.plugin.settingprovider.ISettingProvider;
-import org.apache.bifromq.plugin.settingprovider.Setting;
-import org.apache.bifromq.plugin.resourcethrottler.IResourceThrottler;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.ByteString;
import io.netty.channel.Channel;
@@ -66,6 +52,7 @@ import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bifromq.inbox.client.IInboxClient;
import org.apache.bifromq.inbox.rpc.proto.DetachReply;
+import org.apache.bifromq.inbox.rpc.proto.ExistReply;
import org.apache.bifromq.mqtt.MockableTest;
import org.apache.bifromq.mqtt.handler.ChannelAttrs;
import org.apache.bifromq.mqtt.handler.ConditionalRejectHandler;
@@ -77,6 +64,20 @@ import
org.apache.bifromq.mqtt.handler.v5.reason.MQTT5AuthReasonCode;
import org.apache.bifromq.mqtt.service.ILocalSessionRegistry;
import org.apache.bifromq.mqtt.service.LocalSessionRegistry;
import org.apache.bifromq.mqtt.session.MQTTSessionContext;
+import org.apache.bifromq.plugin.authprovider.IAuthProvider;
+import org.apache.bifromq.plugin.authprovider.type.CheckResult;
+import org.apache.bifromq.plugin.authprovider.type.Continue;
+import org.apache.bifromq.plugin.authprovider.type.Failed;
+import org.apache.bifromq.plugin.authprovider.type.Granted;
+import org.apache.bifromq.plugin.authprovider.type.MQTT5ExtendedAuthData;
+import org.apache.bifromq.plugin.authprovider.type.MQTT5ExtendedAuthResult;
+import org.apache.bifromq.plugin.authprovider.type.MQTTAction;
+import org.apache.bifromq.plugin.authprovider.type.Success;
+import org.apache.bifromq.plugin.clientbalancer.IClientBalancer;
+import org.apache.bifromq.plugin.eventcollector.IEventCollector;
+import org.apache.bifromq.plugin.resourcethrottler.IResourceThrottler;
+import org.apache.bifromq.plugin.settingprovider.ISettingProvider;
+import org.apache.bifromq.plugin.settingprovider.Setting;
import org.apache.bifromq.sessiondict.client.ISessionDictClient;
import org.apache.bifromq.type.ClientInfo;
import org.apache.bifromq.type.StringPair;
@@ -159,12 +160,21 @@ public class EnhancedAuthTest extends MockableTest {
@Test
public void testAuthSuccess() {
Mockito.reset(authProvider);
+
when(inboxClient.exist(any())).thenReturn(CompletableFuture.completedFuture(ExistReply.newBuilder()
+ .setCode(ExistReply.Code.NO_INBOX)
+ .build()));
when(authProvider.extendedAuth(any(MQTT5ExtendedAuthData.class))).thenReturn(CompletableFuture.completedFuture(
- MQTT5ExtendedAuthResult.newBuilder().setSuccess(
-
Success.newBuilder().setAuthData(ByteString.copyFromUtf8("hello")).setUserProps(
- UserProperties.newBuilder()
-
.addUserProperties(StringPair.newBuilder().setKey("key").setValue("val").build()).build())
- .build()).build()));
+ MQTT5ExtendedAuthResult.newBuilder()
+ .setSuccess(Success.newBuilder()
+ .setAuthData(ByteString.copyFromUtf8("hello"))
+ .setUserProps(UserProperties.newBuilder()
+ .addUserProperties(StringPair.newBuilder()
+ .setKey("key")
+ .setValue("val")
+ .build())
+ .build())
+ .build())
+ .build()));
when(authProvider.checkPermission(any(ClientInfo.class),
argThat(MQTTAction::hasConn))).thenReturn(
CompletableFuture.completedFuture(
CheckResult.newBuilder().setGranted(Granted.getDefaultInstance()).build()));
@@ -193,6 +203,9 @@ public class EnhancedAuthTest extends MockableTest {
@Test
public void testAuthSuccess2() {
+
when(inboxClient.exist(any())).thenReturn(CompletableFuture.completedFuture(ExistReply.newBuilder()
+ .setCode(ExistReply.Code.NO_INBOX)
+ .build()));
MqttConnectMessage connect =
MqttMessageBuilders.connect().clientId("client")
.protocolVersion(MqttVersion.MQTT_5).cleanSession(true).properties(
MQTT5MessageUtils.mqttProps().addAuthMethod("authMethod")
@@ -207,7 +220,8 @@ public class EnhancedAuthTest extends MockableTest {
.build()).build()));
channel.writeInbound(connect);
MqttMessage authMessage = channel.readOutbound();
- MqttProperties properties =
((MqttReasonCodeAndPropertiesVariableHeader)
authMessage.variableHeader()).properties();
+ MqttProperties properties =
+ ((MqttReasonCodeAndPropertiesVariableHeader)
authMessage.variableHeader()).properties();
String authMethod = authMethod(properties).orElseThrow();
ByteString authData =
MQTT5MessageUtils.authData(properties).orElseThrow();
assertEquals(authMethod, "authMethod");
@@ -245,7 +259,8 @@ public class EnhancedAuthTest extends MockableTest {
.setContinue(Continue.newBuilder().setAuthData(ByteString.copyFromUtf8(challenge)).build()).build()));
channel.writeInbound(connect);
MqttMessage authMessage = channel.readOutbound();
- MqttProperties properties =
((MqttReasonCodeAndPropertiesVariableHeader)
authMessage.variableHeader()).properties();
+ MqttProperties properties =
+ ((MqttReasonCodeAndPropertiesVariableHeader)
authMessage.variableHeader()).properties();
String authMethod = authMethod(properties).orElseThrow();
ByteString authData =
MQTT5MessageUtils.authData(properties).orElseThrow();
assertEquals(authMethod, "authMethod");
@@ -298,6 +313,9 @@ public class EnhancedAuthTest extends MockableTest {
@Test
public void testReAuth() {
Mockito.reset(authProvider);
+
when(inboxClient.exist(any())).thenReturn(CompletableFuture.completedFuture(ExistReply.newBuilder()
+ .setCode(ExistReply.Code.NO_INBOX)
+ .build()));
when(authProvider.extendedAuth(any(MQTT5ExtendedAuthData.class))).thenReturn(CompletableFuture.completedFuture(
MQTT5ExtendedAuthResult.newBuilder().setSuccess(
Success.newBuilder().setAuthData(ByteString.copyFromUtf8("hello")).setUserProps(
@@ -319,11 +337,13 @@ public class EnhancedAuthTest extends MockableTest {
MQTT5MessageUtils.mqttProps().addAuthMethod("authMethod")
.addAuthData(ByteString.copyFrom("reAuthData",
StandardCharsets.UTF_8)).build()).build());
MqttMessage reAuthMessage = channel.readOutbound();
- MqttReasonCodeAndPropertiesVariableHeader variableHeader =
((MqttReasonCodeAndPropertiesVariableHeader) reAuthMessage.variableHeader());
+ MqttReasonCodeAndPropertiesVariableHeader variableHeader =
+ ((MqttReasonCodeAndPropertiesVariableHeader)
reAuthMessage.variableHeader());
MQTT5AuthReasonCode reasonCode =
MQTT5AuthReasonCode.valueOf(variableHeader.reasonCode());
assertEquals(reasonCode, MQTT5AuthReasonCode.Success);
- MqttProperties mqttProperties =
((MqttReasonCodeAndPropertiesVariableHeader)
reAuthMessage.variableHeader()).properties();
+ MqttProperties mqttProperties =
+ ((MqttReasonCodeAndPropertiesVariableHeader)
reAuthMessage.variableHeader()).properties();
Optional<String> authMethodOpt = authMethod(mqttProperties);
Optional<ByteString> authDataOpt =
MQTT5MessageUtils.authData(mqttProperties);
assertTrue(authMethodOpt.isPresent());
diff --git
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ConnectHandlerTest.java
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ConnectHandlerTest.java
index 04c98c25..465366da 100644
---
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ConnectHandlerTest.java
+++
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ConnectHandlerTest.java
@@ -371,6 +371,9 @@ public class MQTT5ConnectHandlerTest extends MockableTest {
when(authProvider.checkPermission(any(ClientInfo.class),
argThat(MQTTAction::hasConn))).thenReturn(
CompletableFuture.completedFuture(
CheckResult.newBuilder().setGranted(Granted.getDefaultInstance()).build()));
+
when(inboxClient.exist(any())).thenReturn(CompletableFuture.completedFuture(ExistReply.newBuilder()
+ .setCode(ExistReply.Code.EXIST)
+ .build()));
when(inboxClient.detach(any())).thenReturn(CompletableFuture.completedFuture(DetachReply.newBuilder()
.setCode(DetachReply.Code.BACK_PRESSURE_REJECTED)
.build()));