This is an automated email from the ASF dual-hosted git repository.

popduke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git


The following commit(s) were added to refs/heads/main by this push:
     new c4a2df1c Adjust the backpressure strategy for MQTT 3 and MQTT 5. (#178)
c4a2df1c is described below

commit c4a2df1c433d9d0e5dffc0908ed07635b011fcac
Author: yuanyang <[email protected]>
AuthorDate: Wed Sep 24 11:58:38 2025 +0800

    Adjust the backpressure strategy for MQTT 3 and MQTT 5. (#178)
    
    https://cwiki.apache.org/confluence/display/BIFROMQ/BIP-003-Backpressure+Handling+Strategy+in+MQTT+Broker
---
 .../mqtt/handler/v3/MQTT3ProtocolHelper.java       |  30 +++---
 .../mqtt/handler/v5/MQTT5ProtocolHelper.java       |  53 ++++++---
 .../v3/MQTT3TransientSessionHandlerTest.java       | 109 +++++++++----------
 .../handler/v5/TransientSessionHandlerTest.java    | 119 +++++++++++----------
 4 files changed, 168 insertions(+), 143 deletions(-)

diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ProtocolHelper.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ProtocolHelper.java
index 2aab5bc4..0fb746f2 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ProtocolHelper.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3ProtocolHelper.java
@@ -14,7 +14,7 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.    
+ * under the License.
  */
 
 package org.apache.bifromq.mqtt.handler.v3;
@@ -35,6 +35,7 @@ import io.netty.handler.codec.mqtt.MqttMessageBuilders;
 import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
 import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttReasonCodes;
 import io.netty.handler.codec.mqtt.MqttSubAckMessage;
 import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
@@ -53,6 +54,7 @@ import org.apache.bifromq.mqtt.handler.record.SubTask;
 import org.apache.bifromq.mqtt.handler.record.SubTasks;
 import org.apache.bifromq.mqtt.spi.IUserPropsCustomizer;
 import org.apache.bifromq.plugin.authprovider.type.CheckResult;
+import org.apache.bifromq.plugin.eventcollector.IEventCollector;
 import org.apache.bifromq.plugin.eventcollector.OutOfTenantResource;
 import 
org.apache.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.BadPacket;
 import 
org.apache.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.ByServer;
@@ -196,9 +198,13 @@ public class MQTT3ProtocolHelper implements 
IMQTTProtocolHelper {
 
     @Override
     public ProtocolResponse onSubBackPressured(MqttSubscribeMessage 
subMessage) {
-        return goAway((getLocal(ServerBusy.class)
-            .reason("Too many subscribe")
-            .clientInfo(clientInfo)));
+        return response(MqttMessageBuilders.subAck()
+                .packetId(subMessage.variableHeader().messageId())
+                .addGrantedQos(MqttQoS.FAILURE)
+                .build(),
+            getLocal(ServerBusy.class)
+                .reason("Too many subscribe")
+                .clientInfo(clientInfo));
     }
 
     @Override
@@ -253,9 +259,7 @@ public class MQTT3ProtocolHelper implements 
IMQTTProtocolHelper {
 
     @Override
     public ProtocolResponse onUnsubBackPressured(MqttUnsubscribeMessage 
unsubMessage) {
-        return goAway((getLocal(ServerBusy.class)
-            .reason("Too many unsubscribe")
-            .clientInfo(clientInfo)));
+        return responseNothing(getLocal(ServerBusy.class).reason("Too many unsubscribe").clientInfo(clientInfo));
     }
 
     @Override
@@ -396,9 +400,7 @@ public class MQTT3ProtocolHelper implements 
IMQTTProtocolHelper {
             String reason = result.retainResult() == 
RetainReply.Result.BACK_PRESSURE_REJECTED
                 ? "Too many retained qos0 publish"
                 : "Too many qos0 publish";
-            return goAway(getLocal(ServerBusy.class)
-                .reason(reason)
-                .clientInfo(clientInfo));
+            return responseNothing(getLocal(ServerBusy.class).reason(reason).clientInfo(clientInfo));
         } else {
             return responseNothing();
         }
@@ -425,9 +427,7 @@ public class MQTT3ProtocolHelper implements 
IMQTTProtocolHelper {
             String reason = result.retainResult() == 
RetainReply.Result.BACK_PRESSURE_REJECTED
                 ? "Too many retained qos1 publish"
                 : "Too many qos1 publish";
-            return goAway(getLocal(ServerBusy.class)
-                .reason(reason)
-                .clientInfo(clientInfo));
+            return responseNothing(getLocal(ServerBusy.class).reason(reason).clientInfo(clientInfo));
         } else {
             if (settings.debugMode) {
                 return response(MqttMessageBuilders.pubAck()
@@ -468,9 +468,7 @@ public class MQTT3ProtocolHelper implements 
IMQTTProtocolHelper {
             String reason = result.retainResult() == 
RetainReply.Result.BACK_PRESSURE_REJECTED
                 ? "Too many retained qos2 publish"
                 : "Too many qos2 publish";
-            return goAway(getLocal(ServerBusy.class)
-                .reason(reason)
-                .clientInfo(clientInfo));
+            return responseNothing(getLocal(ServerBusy.class).reason(reason).clientInfo(clientInfo));
         } else {
             if (settings.debugMode) {
                 return response(MQTT3MessageBuilders.pubRec()
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ProtocolHelper.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ProtocolHelper.java
index e9e28f7d..d8485c9e 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ProtocolHelper.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5ProtocolHelper.java
@@ -14,7 +14,7 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.    
+ * under the License.
  */
 
 package org.apache.bifromq.mqtt.handler.v5;
@@ -23,6 +23,7 @@ import static 
io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUS
 import static org.apache.bifromq.mqtt.handler.record.ProtocolResponse.farewell;
 import static 
org.apache.bifromq.mqtt.handler.record.ProtocolResponse.farewellNow;
 import static org.apache.bifromq.mqtt.handler.record.ProtocolResponse.response;
+import static 
org.apache.bifromq.mqtt.handler.record.ProtocolResponse.responseNothing;
 import static 
org.apache.bifromq.mqtt.handler.v5.MQTT5MessageUtils.isUTF8Payload;
 import static 
org.apache.bifromq.mqtt.handler.v5.MQTT5MessageUtils.messageExpiryInterval;
 import static 
org.apache.bifromq.mqtt.handler.v5.MQTT5MessageUtils.receiveMaximum;
@@ -73,6 +74,7 @@ import org.apache.bifromq.mqtt.spi.IUserPropsCustomizer;
 import org.apache.bifromq.mqtt.spi.UserProperty;
 import org.apache.bifromq.plugin.authprovider.type.CheckResult;
 import org.apache.bifromq.plugin.eventcollector.Event;
+import org.apache.bifromq.plugin.eventcollector.IEventCollector;
 import org.apache.bifromq.plugin.eventcollector.OutOfTenantResource;
 import 
org.apache.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.BadPacket;
 import 
org.apache.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.ByServer;
@@ -295,9 +297,15 @@ public class MQTT5ProtocolHelper implements 
IMQTTProtocolHelper {
 
     @Override
     public ProtocolResponse onSubBackPressured(MqttSubscribeMessage 
subMessage) {
-        return 
farewell(MQTT5MessageBuilders.disconnect().reasonCode(MQTT5DisconnectReasonCode.ServerBusy)
+        return response(MQTT5MessageBuilders
+                .subAck()
+                .packetId(subMessage.variableHeader().messageId())
+                .reasonCodes(MQTT5SubAckReasonCode.ImplementationSpecificError)
                 .reasonString("Too many subscribe").build(),
-            getLocal(ServerBusy.class).reason("Too many 
subscribe").clientInfo(clientInfo));
+            getLocal(ServerBusy.class)
+                .reason("Too many subscribe")
+                .clientInfo(clientInfo));
+
     }
 
     @Override
@@ -373,9 +381,15 @@ public class MQTT5ProtocolHelper implements 
IMQTTProtocolHelper {
 
     @Override
     public ProtocolResponse onUnsubBackPressured(MqttUnsubscribeMessage 
unsubMessage) {
-        return 
farewell(MQTT5MessageBuilders.disconnect().reasonCode(MQTT5DisconnectReasonCode.ServerBusy)
+        return response(MQTT5MessageBuilders
+                .unsubAck()
+                .packetId(unsubMessage.variableHeader().messageId())
+                .addReasonCode(MQTT5UnsubAckReasonCode.ImplementationSpecificError)
                 .reasonString("Too many unsubscribe").build(),
-            getLocal(ServerBusy.class).reason("Too many 
unsubscribe").clientInfo(clientInfo));
+            getLocal(ServerBusy.class)
+                .reason("Too many unsubscribe")
+                .clientInfo(clientInfo));
+
     }
 
     @Override
@@ -719,9 +733,10 @@ public class MQTT5ProtocolHelper implements 
IMQTTProtocolHelper {
             String reason =
                 result.retainResult() == 
RetainReply.Result.BACK_PRESSURE_REJECTED ? "Too many retained qos0 publish" :
                     "Too many qos0 publish";
-            return farewell(
-                
MQTT5MessageBuilders.disconnect().reasonCode(MQTT5DisconnectReasonCode.ServerBusy).reasonString(reason)
-                    .userProps(userProps).build(), 
getLocal(ServerBusy.class).reason(reason).clientInfo(clientInfo));
+            return ProtocolResponse.responseNothing(
+                getLocal(ServerBusy.class)
+                    .reason(reason)
+                    .clientInfo(clientInfo));
         } else {
             return ProtocolResponse.responseNothing();
         }
@@ -763,9 +778,14 @@ public class MQTT5ProtocolHelper implements 
IMQTTProtocolHelper {
             String reason =
                 result.retainResult() == 
RetainReply.Result.BACK_PRESSURE_REJECTED ? "Too many retained qos1 publish" :
                     "Too many qos1 publish";
-            return farewell(
-                
MQTT5MessageBuilders.disconnect().reasonCode(MQTT5DisconnectReasonCode.ServerBusy).reasonString(reason)
-                    .build(), 
getLocal(ServerBusy.class).reason(reason).clientInfo(clientInfo));
+            return response(MQTT5MessageBuilders.pubAck(requestProblemInfo)
+                    .packetId(message.variableHeader().packetId())
+                    .reasonCode(MQTT5PubAckReasonCode.ImplementationSpecificError)
+                    .reasonString(reason)
+                    .userProps(userProps).build(),
+                getLocal(ServerBusy.class)
+                    .reason(reason)
+                    .clientInfo(clientInfo));
         }
         int packetId = message.variableHeader().packetId();
         Event<?>[] debugEvents;
@@ -836,9 +856,14 @@ public class MQTT5ProtocolHelper implements 
IMQTTProtocolHelper {
             String reason =
                 result.retainResult() == 
RetainReply.Result.BACK_PRESSURE_REJECTED ? "Too many retained qos2 publish" :
                     "Too many qos2 publish";
-            return farewell(
-                
MQTT5MessageBuilders.disconnect().reasonCode(MQTT5DisconnectReasonCode.ServerBusy).reasonString(reason)
-                    .build(), 
getLocal(ServerBusy.class).reason(reason).clientInfo(clientInfo));
+            return response(MQTT5MessageBuilders.pubRec(requestProblemInfo)
+                    .packetId(message.variableHeader().packetId())
+                    .reasonCode(MQTT5PubRecReasonCode.ImplementationSpecificError)
+                    .reasonString(reason)
+                    .userProps(userProps).build(),
+                getLocal(ServerBusy.class)
+                    .reason(reason)
+                    .clientInfo(clientInfo));
         }
         int packetId = message.variableHeader().packetId();
         Event<?>[] debugEvents;
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java
index 5cb0385d..bf108937 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java
@@ -20,6 +20,57 @@
 package org.apache.bifromq.mqtt.handler.v3;
 
 
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.mqtt.MqttDecoder;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttSubAckMessage;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
+import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
+import io.netty.handler.traffic.ChannelTrafficShapingHandler;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bifromq.basehlc.HLC;
+import org.apache.bifromq.dist.client.PubResult;
+import org.apache.bifromq.mqtt.handler.BaseSessionHandlerTest;
+import org.apache.bifromq.mqtt.handler.ChannelAttrs;
+import org.apache.bifromq.mqtt.handler.TenantSettings;
+import org.apache.bifromq.mqtt.session.IMQTTTransientSession;
+import org.apache.bifromq.mqtt.utils.MQTTMessageUtils;
+import org.apache.bifromq.plugin.authprovider.type.CheckResult;
+import org.apache.bifromq.plugin.authprovider.type.Granted;
+import org.apache.bifromq.plugin.authprovider.type.MQTTAction;
+import 
org.apache.bifromq.plugin.eventcollector.mqttbroker.pushhandling.QoS1Confirmed;
+import 
org.apache.bifromq.plugin.eventcollector.mqttbroker.pushhandling.QoS2Confirmed;
+import org.apache.bifromq.type.ClientInfo;
+import org.apache.bifromq.type.Message;
+import org.apache.bifromq.type.QoS;
+import org.apache.bifromq.type.TopicMessagePack;
+import org.mockito.ArgumentCaptor;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static io.netty.handler.codec.mqtt.MqttMessageType.PUBREL;
 import static org.apache.bifromq.mqtt.handler.MQTTSessionIdUtil.userSessionId;
 import static org.apache.bifromq.plugin.eventcollector.EventType.DISCARD;
 import static 
org.apache.bifromq.plugin.eventcollector.EventType.EXCEED_RECEIVING_LIMIT;
@@ -65,7 +116,6 @@ import static 
org.apache.bifromq.type.MQTTClientInfoConstants.MQTT_CLIENT_ID_KEY
 import static 
org.apache.bifromq.type.MQTTClientInfoConstants.MQTT_PROTOCOL_VER_KEY;
 import static org.apache.bifromq.type.MQTTClientInfoConstants.MQTT_TYPE_VALUE;
 import static org.apache.bifromq.type.MQTTClientInfoConstants.MQTT_USER_ID_KEY;
-import static io.netty.handler.codec.mqtt.MqttMessageType.PUBREL;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -82,55 +132,6 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
-import org.apache.bifromq.basehlc.HLC;
-import org.apache.bifromq.dist.client.PubResult;
-import org.apache.bifromq.mqtt.handler.BaseSessionHandlerTest;
-import org.apache.bifromq.mqtt.handler.ChannelAttrs;
-import org.apache.bifromq.mqtt.handler.TenantSettings;
-import org.apache.bifromq.mqtt.session.IMQTTTransientSession;
-import org.apache.bifromq.mqtt.utils.MQTTMessageUtils;
-import org.apache.bifromq.plugin.authprovider.type.CheckResult;
-import org.apache.bifromq.plugin.authprovider.type.Granted;
-import org.apache.bifromq.plugin.authprovider.type.MQTTAction;
-import 
org.apache.bifromq.plugin.eventcollector.mqttbroker.pushhandling.QoS1Confirmed;
-import 
org.apache.bifromq.plugin.eventcollector.mqttbroker.pushhandling.QoS2Confirmed;
-import org.apache.bifromq.type.ClientInfo;
-import org.apache.bifromq.type.Message;
-import org.apache.bifromq.type.QoS;
-import org.apache.bifromq.type.TopicMessagePack;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.handler.codec.mqtt.MqttDecoder;
-import io.netty.handler.codec.mqtt.MqttMessage;
-import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
-import io.netty.handler.codec.mqtt.MqttMessageType;
-import io.netty.handler.codec.mqtt.MqttPublishMessage;
-import io.netty.handler.codec.mqtt.MqttSubAckMessage;
-import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
-import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
-import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
-import io.netty.handler.traffic.ChannelTrafficShapingHandler;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.mockito.ArgumentCaptor;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
 @Slf4j
 public class MQTT3TransientSessionHandlerTest extends BaseSessionHandlerTest {
 
@@ -416,8 +417,8 @@ public class MQTT3TransientSessionHandlerTest extends 
BaseSessionHandlerTest {
         channel.writeInbound(message);
         channel.advanceTimeBy(6, TimeUnit.SECONDS);
         channel.runScheduledPendingTasks();
-        assertFalse(channel.isOpen());
-        verifyEvent(MQTT_SESSION_START, QOS1_DIST_ERROR, SERVER_BUSY, 
MQTT_SESSION_STOP);
+        assertTrue(channel.isOpen());
+        verifyEvent(MQTT_SESSION_START, QOS1_DIST_ERROR, SERVER_BUSY);
     }
 
     @Test
@@ -487,8 +488,8 @@ public class MQTT3TransientSessionHandlerTest extends 
BaseSessionHandlerTest {
         channel.writeInbound(message);
         channel.advanceTimeBy(6, TimeUnit.SECONDS);
         channel.runScheduledPendingTasks();
-        assertFalse(channel.isOpen());
-        verifyEvent(MQTT_SESSION_START, QOS2_DIST_ERROR, SERVER_BUSY, 
MQTT_SESSION_STOP);
+        assertTrue(channel.isOpen());
+        verifyEvent(MQTT_SESSION_START, QOS2_DIST_ERROR, SERVER_BUSY);
     }
 
     @Test
diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/TransientSessionHandlerTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/TransientSessionHandlerTest.java
index 24251915..2328b216 100644
--- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/TransientSessionHandlerTest.java
+++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v5/TransientSessionHandlerTest.java
@@ -20,6 +20,62 @@
 package org.apache.bifromq.mqtt.handler.v5;
 
 
+import com.google.protobuf.ByteString;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.mqtt.MqttDecoder;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageBuilders;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttProperties;
+import io.netty.handler.codec.mqtt.MqttPubReplyMessageVariableHeader;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
+import io.netty.handler.codec.mqtt.MqttSubAckMessage;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
+import io.netty.handler.codec.mqtt.MqttVersion;
+import io.netty.handler.traffic.ChannelTrafficShapingHandler;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bifromq.basehlc.HLC;
+import org.apache.bifromq.dist.client.PubResult;
+import org.apache.bifromq.mqtt.handler.BaseSessionHandlerTest;
+import org.apache.bifromq.mqtt.handler.ChannelAttrs;
+import org.apache.bifromq.mqtt.handler.TenantSettings;
+import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5DisconnectReasonCode;
+import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5PubAckReasonCode;
+import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5PubRecReasonCode;
+import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5SubAckReasonCode;
+import org.apache.bifromq.mqtt.session.IMQTTTransientSession;
+import org.apache.bifromq.mqtt.utils.MQTTMessageUtils;
+import org.apache.bifromq.plugin.authprovider.type.CheckResult;
+import org.apache.bifromq.plugin.authprovider.type.Granted;
+import org.apache.bifromq.plugin.authprovider.type.MQTTAction;
+import 
org.apache.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.ExceedReceivingLimit;
+import org.apache.bifromq.type.ClientInfo;
+import org.apache.bifromq.type.MQTTClientInfoConstants;
+import org.apache.bifromq.type.Message;
+import org.apache.bifromq.type.QoS;
+import org.apache.bifromq.type.TopicMessagePack;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
 import static io.netty.handler.codec.mqtt.MqttMessageType.DISCONNECT;
 import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
 import static io.netty.handler.codec.mqtt.MqttMessageType.PUBREC;
@@ -81,61 +137,6 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
-import com.google.protobuf.ByteString;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.handler.codec.mqtt.MqttDecoder;
-import io.netty.handler.codec.mqtt.MqttMessage;
-import io.netty.handler.codec.mqtt.MqttMessageBuilders;
-import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
-import io.netty.handler.codec.mqtt.MqttMessageType;
-import io.netty.handler.codec.mqtt.MqttProperties;
-import io.netty.handler.codec.mqtt.MqttPubReplyMessageVariableHeader;
-import io.netty.handler.codec.mqtt.MqttPublishMessage;
-import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
-import io.netty.handler.codec.mqtt.MqttSubAckMessage;
-import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
-import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
-import io.netty.handler.codec.mqtt.MqttVersion;
-import io.netty.handler.traffic.ChannelTrafficShapingHandler;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bifromq.basehlc.HLC;
-import org.apache.bifromq.dist.client.PubResult;
-import org.apache.bifromq.mqtt.handler.BaseSessionHandlerTest;
-import org.apache.bifromq.mqtt.handler.ChannelAttrs;
-import org.apache.bifromq.mqtt.handler.TenantSettings;
-import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5DisconnectReasonCode;
-import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5PubAckReasonCode;
-import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5PubRecReasonCode;
-import org.apache.bifromq.mqtt.handler.v5.reason.MQTT5SubAckReasonCode;
-import org.apache.bifromq.mqtt.session.IMQTTTransientSession;
-import org.apache.bifromq.mqtt.utils.MQTTMessageUtils;
-import org.apache.bifromq.plugin.authprovider.type.CheckResult;
-import org.apache.bifromq.plugin.authprovider.type.Granted;
-import org.apache.bifromq.plugin.authprovider.type.MQTTAction;
-import 
org.apache.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.ExceedReceivingLimit;
-import org.apache.bifromq.type.ClientInfo;
-import org.apache.bifromq.type.MQTTClientInfoConstants;
-import org.apache.bifromq.type.Message;
-import org.apache.bifromq.type.QoS;
-import org.apache.bifromq.type.TopicMessagePack;
-import org.mockito.ArgumentCaptor;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
 @Slf4j
 public class TransientSessionHandlerTest extends BaseSessionHandlerTest {
 
@@ -425,8 +426,8 @@ public class TransientSessionHandlerTest extends 
BaseSessionHandlerTest {
         channel.advanceTimeBy(6, TimeUnit.SECONDS);
         channel.runScheduledPendingTasks();
         channel.runPendingTasks();
-        assertFalse(channel.isOpen());
-        verifyEvent(MQTT_SESSION_START, QOS1_DIST_ERROR, SERVER_BUSY, 
MQTT_SESSION_STOP);
+        assertTrue(channel.isOpen());
+        verifyEvent(MQTT_SESSION_START, QOS1_DIST_ERROR, SERVER_BUSY);
     }
 
     @Test
@@ -502,8 +503,8 @@ public class TransientSessionHandlerTest extends 
BaseSessionHandlerTest {
         channel.advanceTimeBy(6, TimeUnit.SECONDS);
         channel.runScheduledPendingTasks();
         channel.runPendingTasks();
-        assertFalse(channel.isOpen());
-        verifyEvent(MQTT_SESSION_START, QOS2_DIST_ERROR, SERVER_BUSY, 
MQTT_SESSION_STOP);
+        assertTrue(channel.isOpen());
+        verifyEvent(MQTT_SESSION_START, QOS2_DIST_ERROR, SERVER_BUSY);
     }
 
     @Test

Reply via email to