This is an automated email from the ASF dual-hosted git repository.
popduke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git
The following commit(s) were added to refs/heads/main by this push:
new c4a2df1c Adjust the backpressure strategy for MQTT 3 and MQTT 5. (#178)
c4a2df1c is described below
commit c4a2df1c433d9d0e5dffc0908ed07635b011fcac
Author: yuanyang <[email protected]>
AuthorDate: Wed Sep 24 11:58:38 2025 +0800
Adjust the backpressure strategy for MQTT 3 and MQTT 5. (#178)
https://cwiki.apache.org/confluence/display/BIFROMQ/BIP-003-Backpressure+Handling+Strategy+in+MQTT+Broker
---
.../mqtt/handler/v3/MQTT3ProtocolHelper.java | 30 +++---
.../mqtt/handler/v5/MQTT5ProtocolHelper.java | 53 ++++++---
.../v3/MQTT3TransientSessionHandlerTest.java | 109 +++++++++----------
.../handler/v5/TransientSessionHandlerTest.java | 119 +++++++++++----------
4 files changed, 168 insertions(+), 143 deletions(-)
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ProtocolHelper.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ProtocolHelper.java
index 2aab5bc4..0fb746f2 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ProtocolHelper.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ProtocolHelper.java
@@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.bifromq.mqtt.handler.v3;
@@ -35,6 +35,7 @@ import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttReasonCodes;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
@@ -53,6 +54,7 @@ import org.apache.bifromq.mqtt.handler.record.SubTask;
import org.apache.bifromq.mqtt.handler.record.SubTasks;
import org.apache.bifromq.mqtt.spi.IUserPropsCustomizer;
import org.apache.bifromq.plugin.authprovider.type.CheckResult;
+import org.apache.bifromq.plugin.eventcollector.IEventCollector;
import org.apache.bifromq.plugin.eventcollector.OutOfTenantResource;
import
org.apache.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.BadPacket;
import
org.apache.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.ByServer;
@@ -196,9 +198,13 @@ public class MQTT3ProtocolHelper implements
IMQTTProtocolHelper {
@Override
public ProtocolResponse onSubBackPressured(MqttSubscribeMessage
subMessage) {
- return goAway((getLocal(ServerBusy.class)
- .reason("Too many subscribe")
- .clientInfo(clientInfo)));
+ return response(MqttMessageBuilders.subAck()
+ .packetId(subMessage.variableHeader().messageId())
+ .addGrantedQos(MqttQoS.FAILURE)
+ .build(),
+ getLocal(ServerBusy.class)
+ .reason("Too many subscribe")
+ .clientInfo(clientInfo));
}
@Override
@@ -253,9 +259,7 @@ public class MQTT3ProtocolHelper implements
IMQTTProtocolHelper {
@Override
public ProtocolResponse onUnsubBackPressured(MqttUnsubscribeMessage
unsubMessage) {
- return goAway((getLocal(ServerBusy.class)
- .reason("Too many unsubscribe")
- .clientInfo(clientInfo)));
+ return responseNothing(getLocal(ServerBusy.class).reason("Too many unsubscribe").clientInfo(clientInfo));
}
@Override
@@ -396,9 +400,7 @@ public class MQTT3ProtocolHelper implements
IMQTTProtocolHelper {
String reason = result.retainResult() ==
RetainReply.Result.BACK_PRESSURE_REJECTED
? "Too many retained qos0 publish"
: "Too many qos0 publish";
- return goAway(getLocal(ServerBusy.class)
- .reason(reason)
- .clientInfo(clientInfo));
+ return
responseNothing(getLocal(ServerBusy.class).reason(reason).clientInfo(clientInfo));
} else {
return responseNothing();
}
@@ -425,9 +427,7 @@ public class MQTT3ProtocolHelper implements
IMQTTProtocolHelper {
String reason = result.retainResult() ==
RetainReply.Result.BACK_PRESSURE_REJECTED
? "Too many retained qos1 publish"
: "Too many qos1 publish";
- return goAway(getLocal(ServerBusy.class)
- .reason(reason)
- .clientInfo(clientInfo));
+ return
responseNothing(getLocal(ServerBusy.class).reason(reason).clientInfo(clientInfo));
} else {
if (settings.debugMode) {
return response(MqttMessageBuilders.pubAck()
@@ -468,9 +468,7 @@ public class MQTT3ProtocolHelper implements
IMQTTProtocolHelper {
String reason = result.retainResult() ==
RetainReply.Result.BACK_PRESSURE_REJECTED
? "Too many retained qos2 publish"
: "Too many qos2 publish";
- return goAway(getLocal(ServerBusy.class)
- .reason(reason)
- .clientInfo(clientInfo));
+ return
responseNothing(getLocal(ServerBusy.class).reason(reason).clientInfo(clientInfo));
} else {
if (settings.debugMode) {
return response(MQTT3MessageBuilders.pubRec()
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ProtocolHelper.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ProtocolHelper.java
index e9e28f7d..d8485c9e 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ProtocolHelper.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ProtocolHelper.java
@@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.bifromq.mqtt.handler.v5;
@@ -23,6 +23,7 @@ import static
io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUS
import static org.apache.bifromq.mqtt.handler.record.ProtocolResponse.farewell;
import static
org.apache.bifromq.mqtt.handler.record.ProtocolResponse.farewellNow;
import static org.apache.bifromq.mqtt.handler.record.ProtocolResponse.response;
+import static
org.apache.bifromq.mqtt.handler.record.ProtocolResponse.responseNothing;
import static
org.apache.bifromq.mqtt.handler.v5.MQTT5MessageUtils.isUTF8Payload;
import static
org.apache.bifromq.mqtt.handler.v5.MQTT5MessageUtils.messageExpiryInterval;
import static
org.apache.bifromq.mqtt.handler.v5.MQTT5MessageUtils.receiveMaximum;
@@ -73,6 +74,7 @@ import org.apache.bifromq.mqtt.spi.IUserPropsCustomizer;
import org.apache.bifromq.mqtt.spi.UserProperty;
import org.apache.bifromq.plugin.authprovider.type.CheckResult;
import org.apache.bifromq.plugin.eventcollector.Event;
+import org.apache.bifromq.plugin.eventcollector.IEventCollector;
import org.apache.bifromq.plugin.eventcollector.OutOfTenantResource;
import
org.apache.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.BadPacket;
import
org.apache.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.ByServer;
@@ -295,9 +297,15 @@ public class MQTT5ProtocolHelper implements
IMQTTProtocolHelper {
@Override
public ProtocolResponse onSubBackPressured(MqttSubscribeMessage
subMessage) {
- return
farewell(MQTT5MessageBuilders.disconnect().reasonCode(MQTT5DisconnectReasonCode.ServerBusy)
+ return response(MQTT5MessageBuilders
+ .subAck()
+ .packetId(subMessage.variableHeader().messageId())
+ .reasonCodes(MQTT5SubAckReasonCode.ImplementationSpecificError)
.reasonString("Too many subscribe").build(),
- getLocal(ServerBusy.class).reason("Too many subscribe").clientInfo(clientInfo));
+ getLocal(ServerBusy.class)
+ .reason("Too many subscribe")
+ .clientInfo(clientInfo));
+
}
@Override
@@ -373,9 +381,15 @@ public class MQTT5ProtocolHelper implements
IMQTTProtocolHelper {
@Override
public ProtocolResponse onUnsubBackPressured(MqttUnsubscribeMessage
unsubMessage) {
- return
farewell(MQTT5MessageBuilders.disconnect().reasonCode(MQTT5DisconnectReasonCode.ServerBusy)
+ return response(MQTT5MessageBuilders
+ .unsubAck()
+ .packetId(unsubMessage.variableHeader().messageId())
+
.addReasonCode(MQTT5UnsubAckReasonCode.ImplementationSpecificError)
.reasonString("Too many unsubscribe").build(),
- getLocal(ServerBusy.class).reason("Too many unsubscribe").clientInfo(clientInfo));
+ getLocal(ServerBusy.class)
+ .reason("Too many unsubscribe")
+ .clientInfo(clientInfo));
+
}
@Override
@@ -719,9 +733,10 @@ public class MQTT5ProtocolHelper implements
IMQTTProtocolHelper {
String reason =
result.retainResult() ==
RetainReply.Result.BACK_PRESSURE_REJECTED ? "Too many retained qos0 publish" :
"Too many qos0 publish";
- return farewell(
-
MQTT5MessageBuilders.disconnect().reasonCode(MQTT5DisconnectReasonCode.ServerBusy).reasonString(reason)
- .userProps(userProps).build(),
getLocal(ServerBusy.class).reason(reason).clientInfo(clientInfo));
+ return ProtocolResponse.responseNothing(
+ getLocal(ServerBusy.class)
+ .reason(reason)
+ .clientInfo(clientInfo));
} else {
return ProtocolResponse.responseNothing();
}
@@ -763,9 +778,14 @@ public class MQTT5ProtocolHelper implements
IMQTTProtocolHelper {
String reason =
result.retainResult() ==
RetainReply.Result.BACK_PRESSURE_REJECTED ? "Too many retained qos1 publish" :
"Too many qos1 publish";
- return farewell(
-
MQTT5MessageBuilders.disconnect().reasonCode(MQTT5DisconnectReasonCode.ServerBusy).reasonString(reason)
- .build(),
getLocal(ServerBusy.class).reason(reason).clientInfo(clientInfo));
+ return response(MQTT5MessageBuilders.pubAck(requestProblemInfo)
+ .packetId(message.variableHeader().packetId())
+
.reasonCode(MQTT5PubAckReasonCode.ImplementationSpecificError)
+ .reasonString(reason)
+ .userProps(userProps).build(),
+ getLocal(ServerBusy.class)
+ .reason(reason)
+ .clientInfo(clientInfo));
}
int packetId = message.variableHeader().packetId();
Event<?>[] debugEvents;
@@ -836,9 +856,14 @@ public class MQTT5ProtocolHelper implements
IMQTTProtocolHelper {
String reason =
result.retainResult() ==
RetainReply.Result.BACK_PRESSURE_REJECTED ? "Too many retained qos2 publish" :
"Too many qos2 publish";
- return farewell(
-
MQTT5MessageBuilders.disconnect().reasonCode(MQTT5DisconnectReasonCode.ServerBusy).reasonString(reason)
- .build(),
getLocal(ServerBusy.class).reason(reason).clientInfo(clientInfo));
+ return response(MQTT5MessageBuilders.pubRec(requestProblemInfo)
+ .packetId(message.variableHeader().packetId())
+
.reasonCode(MQTT5PubRecReasonCode.ImplementationSpecificError)
+ .reasonString(reason)
+ .userProps(userProps).build(),
+ getLocal(ServerBusy.class)
+ .reason(reason)
+ .clientInfo(clientInfo));
}
int packetId = message.variableHeader().packetId();
Event<?>[] debugEvents;
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java
index 5cb0385d..bf108937 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java
@@ -20,6 +20,57 @@
package org.apache.bifromq.mqtt.handler.v3;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.mqtt.MqttDecoder;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttSubAckMessage;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
+import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
+import io.netty.handler.traffic.ChannelTrafficShapingHandler;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bifromq.basehlc.HLC;
+import org.apache.bifromq.dist.client.PubResult;
+import org.apache.bifromq.mqtt.handler.BaseSessionHandlerTest;
+import org.apache.bifromq.mqtt.handler.ChannelAttrs;
+import org.apache.bifromq.mqtt.handler.TenantSettings;
+import org.apache.bifromq.mqtt.session.IMQTTTransientSession;
+import org.apache.bifromq.mqtt.utils.MQTTMessageUtils;
+import org.apache.bifromq.plugin.authprovider.type.CheckResult;
+import org.apache.bifromq.plugin.authprovider.type.Granted;
+import org.apache.bifromq.plugin.authprovider.type.MQTTAction;
+import
org.apache.bifromq.plugin.eventcollector.mqttbroker.pushhandling.QoS1Confirmed;
+import
org.apache.bifromq.plugin.eventcollector.mqttbroker.pushhandling.QoS2Confirmed;
+import org.apache.bifromq.type.ClientInfo;
+import org.apache.bifromq.type.Message;
+import org.apache.bifromq.type.QoS;
+import org.apache.bifromq.type.TopicMessagePack;
+import org.mockito.ArgumentCaptor;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static io.netty.handler.codec.mqtt.MqttMessageType.PUBREL;
import static org.apache.bifromq.mqtt.handler.MQTTSessionIdUtil.userSessionId;
import static org.apache.bifromq.plugin.eventcollector.EventType.DISCARD;
import static
org.apache.bifromq.plugin.eventcollector.EventType.EXCEED_RECEIVING_LIMIT;
@@ -65,7 +116,6 @@ import static
org.apache.bifromq.type.MQTTClientInfoConstants.MQTT_CLIENT_ID_KEY
import static
org.apache.bifromq.type.MQTTClientInfoConstants.MQTT_PROTOCOL_VER_KEY;
import static org.apache.bifromq.type.MQTTClientInfoConstants.MQTT_TYPE_VALUE;
import static org.apache.bifromq.type.MQTTClientInfoConstants.MQTT_USER_ID_KEY;
-import static io.netty.handler.codec.mqtt.MqttMessageType.PUBREL;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -82,55 +132,6 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
-import org.apache.bifromq.basehlc.HLC;
-import org.apache.bifromq.dist.client.PubResult;
-import org.apache.bifromq.mqtt.handler.BaseSessionHandlerTest;
-import org.apache.bifromq.mqtt.handler.ChannelAttrs;
-import org.apache.bifromq.mqtt.handler.TenantSettings;
-import org.apache.bifromq.mqtt.session.IMQTTTransientSession;
-import org.apache.bifromq.mqtt.utils.MQTTMessageUtils;
-import org.apache.bifromq.plugin.authprovider.type.CheckResult;
-import org.apache.bifromq.plugin.authprovider.type.Granted;
-import org.apache.bifromq.plugin.authprovider.type.MQTTAction;
-import
org.apache.bifromq.plugin.eventcollector.mqttbroker.pushhandling.QoS1Confirmed;
-import
org.apache.bifromq.plugin.eventcollector.mqttbroker.pushhandling.QoS2Confirmed;
-import org.apache.bifromq.type.ClientInfo;
-import org.apache.bifromq.type.Message;
-import org.apache.bifromq.type.QoS;
-import org.apache.bifromq.type.TopicMessagePack;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.handler.codec.mqtt.MqttDecoder;
-import io.netty.handler.codec.mqtt.MqttMessage;
-import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
-import io.netty.handler.codec.mqtt.MqttMessageType;
-import io.netty.handler.codec.mqtt.MqttPublishMessage;
-import io.netty.handler.codec.mqtt.MqttSubAckMessage;
-import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
-import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
-import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
-import io.netty.handler.traffic.ChannelTrafficShapingHandler;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.mockito.ArgumentCaptor;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
@Slf4j
public class MQTT3TransientSessionHandlerTest extends BaseSessionHandlerTest {
@@ -416,8 +417,8 @@ public class MQTT3TransientSessionHandlerTest extends
BaseSessionHandlerTest {
channel.writeInbound(message);
channel.advanceTimeBy(6, TimeUnit.SECONDS);
channel.runScheduledPendingTasks();
- assertFalse(channel.isOpen());
- verifyEvent(MQTT_SESSION_START, QOS1_DIST_ERROR, SERVER_BUSY,
MQTT_SESSION_STOP);
+ assertTrue(channel.isOpen());
+ verifyEvent(MQTT_SESSION_START, QOS1_DIST_ERROR, SERVER_BUSY);
}
@Test
@@ -487,8 +488,8 @@ public class MQTT3TransientSessionHandlerTest extends
BaseSessionHandlerTest {
channel.writeInbound(message);
channel.advanceTimeBy(6, TimeUnit.SECONDS);
channel.runScheduledPendingTasks();
- assertFalse(channel.isOpen());
- verifyEvent(MQTT_SESSION_START, QOS2_DIST_ERROR, SERVER_BUSY,
MQTT_SESSION_STOP);
+ assertTrue(channel.isOpen());
+ verifyEvent(MQTT_SESSION_START, QOS2_DIST_ERROR, SERVER_BUSY);
}
@Test
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/TransientSessionHandlerTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/TransientSessionHandlerTest.java
index 24251915..2328b216 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/TransientSessionHandlerTest.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/TransientSessionHandlerTest.java
@@ -20,6 +20,62 @@
package org.apache.bifromq.mqtt.handler.v5;
+import com.google.protobuf.ByteString;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.mqtt.MqttDecoder;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageBuilders;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttProperties;
+import io.netty.handler.codec.mqtt.MqttPubReplyMessageVariableHeader;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
+import io.netty.handler.codec.mqtt.MqttSubAckMessage;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
+import io.netty.handler.codec.mqtt.MqttVersion;
+import io.netty.handler.traffic.ChannelTrafficShapingHandler;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bifromq.basehlc.HLC;
+import org.apache.bifromq.dist.client.PubResult;
+import org.apache.bifromq.mqtt.handler.BaseSessionHandlerTest;
+import org.apache.bifromq.mqtt.handler.ChannelAttrs;
+import org.apache.bifromq.mqtt.handler.TenantSettings;
+import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5DisconnectReasonCode;
+import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5PubAckReasonCode;
+import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5PubRecReasonCode;
+import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5SubAckReasonCode;
+import org.apache.bifromq.mqtt.session.IMQTTTransientSession;
+import org.apache.bifromq.mqtt.utils.MQTTMessageUtils;
+import org.apache.bifromq.plugin.authprovider.type.CheckResult;
+import org.apache.bifromq.plugin.authprovider.type.Granted;
+import org.apache.bifromq.plugin.authprovider.type.MQTTAction;
+import
org.apache.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.ExceedReceivingLimit;
+import org.apache.bifromq.type.ClientInfo;
+import org.apache.bifromq.type.MQTTClientInfoConstants;
+import org.apache.bifromq.type.Message;
+import org.apache.bifromq.type.QoS;
+import org.apache.bifromq.type.TopicMessagePack;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
import static io.netty.handler.codec.mqtt.MqttMessageType.DISCONNECT;
import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
import static io.netty.handler.codec.mqtt.MqttMessageType.PUBREC;
@@ -81,61 +137,6 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
-import com.google.protobuf.ByteString;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.handler.codec.mqtt.MqttDecoder;
-import io.netty.handler.codec.mqtt.MqttMessage;
-import io.netty.handler.codec.mqtt.MqttMessageBuilders;
-import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
-import io.netty.handler.codec.mqtt.MqttMessageType;
-import io.netty.handler.codec.mqtt.MqttProperties;
-import io.netty.handler.codec.mqtt.MqttPubReplyMessageVariableHeader;
-import io.netty.handler.codec.mqtt.MqttPublishMessage;
-import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
-import io.netty.handler.codec.mqtt.MqttSubAckMessage;
-import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
-import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
-import io.netty.handler.codec.mqtt.MqttVersion;
-import io.netty.handler.traffic.ChannelTrafficShapingHandler;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bifromq.basehlc.HLC;
-import org.apache.bifromq.dist.client.PubResult;
-import org.apache.bifromq.mqtt.handler.BaseSessionHandlerTest;
-import org.apache.bifromq.mqtt.handler.ChannelAttrs;
-import org.apache.bifromq.mqtt.handler.TenantSettings;
-import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5DisconnectReasonCode;
-import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5PubAckReasonCode;
-import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5PubRecReasonCode;
-import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5SubAckReasonCode;
-import org.apache.bifromq.mqtt.session.IMQTTTransientSession;
-import org.apache.bifromq.mqtt.utils.MQTTMessageUtils;
-import org.apache.bifromq.plugin.authprovider.type.CheckResult;
-import org.apache.bifromq.plugin.authprovider.type.Granted;
-import org.apache.bifromq.plugin.authprovider.type.MQTTAction;
-import
org.apache.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.ExceedReceivingLimit;
-import org.apache.bifromq.type.ClientInfo;
-import org.apache.bifromq.type.MQTTClientInfoConstants;
-import org.apache.bifromq.type.Message;
-import org.apache.bifromq.type.QoS;
-import org.apache.bifromq.type.TopicMessagePack;
-import org.mockito.ArgumentCaptor;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
@Slf4j
public class TransientSessionHandlerTest extends BaseSessionHandlerTest {
@@ -425,8 +426,8 @@ public class TransientSessionHandlerTest extends
BaseSessionHandlerTest {
channel.advanceTimeBy(6, TimeUnit.SECONDS);
channel.runScheduledPendingTasks();
channel.runPendingTasks();
- assertFalse(channel.isOpen());
- verifyEvent(MQTT_SESSION_START, QOS1_DIST_ERROR, SERVER_BUSY,
MQTT_SESSION_STOP);
+ assertTrue(channel.isOpen());
+ verifyEvent(MQTT_SESSION_START, QOS1_DIST_ERROR, SERVER_BUSY);
}
@Test
@@ -502,8 +503,8 @@ public class TransientSessionHandlerTest extends
BaseSessionHandlerTest {
channel.advanceTimeBy(6, TimeUnit.SECONDS);
channel.runScheduledPendingTasks();
channel.runPendingTasks();
- assertFalse(channel.isOpen());
- verifyEvent(MQTT_SESSION_START, QOS2_DIST_ERROR, SERVER_BUSY,
MQTT_SESSION_STOP);
+ assertTrue(channel.isOpen());
+ verifyEvent(MQTT_SESSION_START, QOS2_DIST_ERROR, SERVER_BUSY);
}
@Test