This is an automated email from the ASF dual-hosted git repository.
zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 2daf878fc5 opt triple remote flow controller (#15983)
2daf878fc5 is described below
commit 2daf878fc5f470db0e3699596c67d4bed996578c
Author: earthchen <[email protected]>
AuthorDate: Fri Jan 16 16:53:32 2026 +0800
opt triple remote flow controller (#15983)
* opt triple remote flow controller
* opt triple remote flow controller
---------
Co-authored-by: heliang <[email protected]>
---
.../rpc/protocol/tri/TripleHttp2Protocol.java | 7 ++-
.../transport/TripleHttp2LocalFlowController.java | 5 ++
.../transport/TripleHttp2RemoteFlowController.java | 73 ++++++++++++++++++++++
3 files changed, 83 insertions(+), 2 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 918bb82024..ff599369db 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -43,6 +43,7 @@ import
org.apache.dubbo.rpc.protocol.tri.h12.http1.DefaultHttp11ServerTransportL
import
org.apache.dubbo.rpc.protocol.tri.h12.http2.GenericHttp2ServerTransportListenerFactory;
import org.apache.dubbo.rpc.protocol.tri.transport.TripleGoAwayHandler;
import
org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2LocalFlowController;
+import
org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2RemoteFlowController;
import
org.apache.dubbo.rpc.protocol.tri.transport.TripleServerConnectionHandler;
import org.apache.dubbo.rpc.protocol.tri.transport.TripleTailHandler;
import
org.apache.dubbo.rpc.protocol.tri.websocket.DefaultWebSocketServerTransportListenerFactory;
@@ -127,7 +128,8 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
private Http2Connection createHttp2ClientConnection(TripleConfig
tripleConfig) {
Http2Connection connection = new DefaultHttp2Connection(false);
float windowUpdateRatio = tripleConfig.getWindowUpdateRatioOrDefault();
- connection.local().flowController(new
TripleHttp2LocalFlowController(connection, windowUpdateRatio));
+
connection.local().flowController(TripleHttp2LocalFlowController.newController(connection,
windowUpdateRatio));
+
connection.remote().flowController(TripleHttp2RemoteFlowController.newController(connection));
return connection;
}
@@ -264,7 +266,8 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
private Http2Connection createHttp2ServerConnection(TripleConfig
tripleConfig) {
Http2Connection connection = new DefaultHttp2Connection(true);
float windowUpdateRatio = tripleConfig.getWindowUpdateRatioOrDefault();
- connection.local().flowController(new
TripleHttp2LocalFlowController(connection, windowUpdateRatio));
+
connection.local().flowController(TripleHttp2LocalFlowController.newController(connection,
windowUpdateRatio));
+
connection.remote().flowController(TripleHttp2RemoteFlowController.newController(connection));
return connection;
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2LocalFlowController.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2LocalFlowController.java
index aaea399401..1e54f9dbfc 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2LocalFlowController.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2LocalFlowController.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.protocol.tri.transport;
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
/**
* Custom HTTP/2 local flow controller for Triple protocol with manual flow
control.
@@ -62,4 +63,8 @@ public class TripleHttp2LocalFlowController extends
DefaultHttp2LocalFlowControl
public TripleHttp2LocalFlowController(Http2Connection connection, float
windowUpdateRatio) {
super(connection, windowUpdateRatio, true);
}
+
+ public static Http2LocalFlowController newController(Http2Connection
connection, float windowUpdateRatio) {
+ return new TripleHttp2LocalFlowController(connection,
windowUpdateRatio);
+ }
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2RemoteFlowController.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2RemoteFlowController.java
new file mode 100644
index 0000000000..55c8ab6755
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2RemoteFlowController.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.transport;
+
+import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2RemoteFlowController;
+import io.netty.handler.codec.http2.StreamByteDistributor;
+import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
+
+/**
+ * Triple-specific implementation of {@link Http2RemoteFlowController}.
+ *
+ * <p>This class extends the {@link DefaultHttp2RemoteFlowController} to
provide
+ * flow control management for Triple (Dubbo's gRPC-compatible protocol)
connections.
+ * It coordinates the distribution of outbound flow-controlled bytes across all
+ * active streams within a connection.
+ *
+ * <p>The controller utilizes a {@link WeightedFairQueueByteDistributor} to
ensure
+ * that bandwidth is allocated based on stream weights and priorities while
+ * maintaining fairness to prevent stream starvation.
+ *
+ * @see DefaultHttp2RemoteFlowController
+ * @see WeightedFairQueueByteDistributor
+ */
+public class TripleHttp2RemoteFlowController extends
DefaultHttp2RemoteFlowController {
+
+ /**
+ * Constructs a new TripleHttp2RemoteFlowController.
+ *
+ * @param connection the {@link Http2Connection} to be managed.
+ * @param streamByteDistributor the distributor responsible for
determining how
+ * available bytes are allocated among streams.
+ */
+ public TripleHttp2RemoteFlowController(Http2Connection connection,
StreamByteDistributor streamByteDistributor) {
+ super(connection, streamByteDistributor);
+ }
+
+ /**
+ * Factory method to create a pre-configured flow controller optimized for
Triple performance.
+ *
+ * <p>Configuration details:
+ * <ul>
+ * <li>Uses {@link WeightedFairQueueByteDistributor} for weighted-fair
resource allocation.</li>
+ * <li>Sets {@code allocationQuantum} to 16KB. This setting reduces
scheduling overhead
+ * by ensuring each stream is allocated a meaningful chunk of data before
switching
+ * contexts, which significantly improves throughput in high-load
scenarios.</li>
+ * </ul>
+ *
+ * @param connection the {@link Http2Connection} for which the controller
will be created.
+ * @return a fully initialized {@link Http2RemoteFlowController} instance.
+ */
+ public static Http2RemoteFlowController newController(Http2Connection
connection) {
+ WeightedFairQueueByteDistributor dist = new
WeightedFairQueueByteDistributor(connection);
+ // Optimization: 16KB quantum size to balance fairness and
high-throughput performance.
+ dist.allocationQuantum(16 * 1024);
+ return new TripleHttp2RemoteFlowController(connection, dist);
+ }
+}