This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 20a4444fbe Fix flow controller trigger application init (#11112)
20a4444fbe is described below
commit 20a4444fbe601d3c6022102aa2b69291976a7c84
Author: Albumen Kevin <[email protected]>
AuthorDate: Sun Dec 11 09:26:09 2022 +0800
Fix flow controller trigger application init (#11112)
---
.../protocol/tri/TriHttp2RemoteFlowController.java | 56 ++++++++++++----------
.../rpc/protocol/tri/TripleHttp2Protocol.java | 4 +-
2 files changed, 33 insertions(+), 27 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2RemoteFlowController.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2RemoteFlowController.java
index 3cbb08bf09..b29439fe3e 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2RemoteFlowController.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2RemoteFlowController.java
@@ -16,30 +16,33 @@
package org.apache.dubbo.rpc.protocol.tri;
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2ConnectionAdapter;
+import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2RemoteFlowController;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.handler.codec.http2.Http2StreamVisitor;
+import io.netty.handler.codec.http2.StreamByteDistributor;
+import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
+
import java.util.ArrayDeque;
import java.util.Deque;
-import io.netty.handler.codec.http2.Http2Error;
+
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
-import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
-import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
-import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
-import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
-import io.netty.handler.codec.http2.StreamByteDistributor;
-import io.netty.handler.codec.http2.Http2StreamVisitor;
-import io.netty.handler.codec.http2.Http2Exception;
-import io.netty.handler.codec.http2.Http2ConnectionAdapter;
-import io.netty.handler.codec.http2.Http2Connection;
-import io.netty.handler.codec.http2.Http2RemoteFlowController;
-import io.netty.handler.codec.http2.Http2Stream;
-import org.apache.dubbo.common.config.Configuration;
-import org.apache.dubbo.common.config.ConfigurationUtils;
-import org.apache.dubbo.rpc.model.ApplicationModel;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
@@ -60,30 +63,33 @@ public class TriHttp2RemoteFlowController implements
Http2RemoteFlowController {
private final Http2Connection.PropertyKey stateKey;
private final StreamByteDistributor streamByteDistributor;
private final FlowState connectionState;
- private Configuration config = ConfigurationUtils.getGlobalConfiguration(
- ApplicationModel.defaultModel());
- private int initialWindowSize =
config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, DEFAULT_WINDOW_SIZE);
+ private final Configuration config;
+ private int initialWindowSize;
private WritabilityMonitor monitor;
private ChannelHandlerContext ctx;
- public TriHttp2RemoteFlowController(Http2Connection connection) {
- this(connection, (Listener) null);
+ public TriHttp2RemoteFlowController(Http2Connection connection,
ApplicationModel applicationModel) {
+ this(connection, (Listener) null, applicationModel);
}
public TriHttp2RemoteFlowController(Http2Connection connection,
- StreamByteDistributor
streamByteDistributor) {
- this(connection, streamByteDistributor, null);
+ StreamByteDistributor
streamByteDistributor,
+ ApplicationModel applicationModel) {
+ this(connection, streamByteDistributor, null, applicationModel);
}
- public TriHttp2RemoteFlowController(Http2Connection connection, final
Listener listener) {
- this(connection, new WeightedFairQueueByteDistributor(connection),
listener);
+ public TriHttp2RemoteFlowController(Http2Connection connection, final
Listener listener, ApplicationModel applicationModel) {
+ this(connection, new WeightedFairQueueByteDistributor(connection),
listener, applicationModel);
}
public TriHttp2RemoteFlowController(Http2Connection connection,
StreamByteDistributor
streamByteDistributor,
- final Listener listener) {
+ final Listener listener,
+ ApplicationModel applicationModel) {
this.connection = checkNotNull(connection, "connection");
this.streamByteDistributor = checkNotNull(streamByteDistributor,
"streamWriteDistributor");
+ this.config =
ConfigurationUtils.getGlobalConfiguration(applicationModel);
+ this.initialWindowSize =
config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, DEFAULT_WINDOW_SIZE);
// Add a flow state for the connection.
stateKey = connection.newKey();
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 9441af3dcf..10af245b2c 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -127,7 +127,7 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
.frameLogger(SERVER_LOGGER)
.build();
ExecutorSupport executorSupport =
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()).getExecutorSupport(url);
- codec.connection().remote().flowController(new
TriHttp2RemoteFlowController(codec.connection()));
+ codec.connection().remote().flowController(new
TriHttp2RemoteFlowController(codec.connection(),
url.getOrDefaultApplicationModel()));
codec.connection().local().flowController().frameWriter(codec.encoder().frameWriter());
TripleWriteQueue writeQueue = new TripleWriteQueue();
final Http2MultiplexHandler handler = new Http2MultiplexHandler(
@@ -166,7 +166,7 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
DEFAULT_MAX_HEADER_LIST_SIZE)))
.frameLogger(CLIENT_LOGGER)
.build();
- codec.connection().remote().flowController(new
TriHttp2RemoteFlowController(codec.connection()));
+ codec.connection().remote().flowController(new
TriHttp2RemoteFlowController(codec.connection(),
url.getOrDefaultApplicationModel()));
codec.connection().local().flowController().frameWriter(codec.encoder().frameWriter());
final Http2MultiplexHandler handler = new Http2MultiplexHandler(
new TripleClientHandler(frameworkModel));