guohao commented on a change in pull request #9057:
URL: https://github.com/apache/dubbo/pull/9057#discussion_r732452838
##########
File path:
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
##########
@@ -63,85 +46,29 @@ public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise)
private void writeRequest(ChannelHandlerContext ctx, final Request req,
final ChannelPromise promise) {
DefaultFuture2.addTimeoutListener(req.getId(), ctx::close);
- final RpcInvocation inv = (RpcInvocation) req.getData();
- final URL url = inv.getInvoker().getUrl();
- ConsumerModel consumerModel = inv.getServiceModel() != null ?
(ConsumerModel) inv.getServiceModel() : (ConsumerModel) url.getServiceModel();
-
- MethodDescriptor methodDescriptor =
getTriMethodDescriptor(consumerModel, inv);
-
- ClassLoadUtil.switchContextLoader(consumerModel.getClassLoader());
- final AbstractClientStream stream =
AbstractClientStream.newClientStream(url, methodDescriptor.isUnary());
-
- String ssl = url.getParameter(CommonConstants.SSL_ENABLED_KEY);
- if (StringUtils.isNotEmpty(ssl)) {
-
ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).set(Boolean.parseBoolean(ssl));
- }
- // Compressor can not be set by dynamic config
- String compressorStr = ConfigurationUtils
- .getCachedDynamicProperty(inv.getModuleModel(), COMPRESSOR_KEY,
DEFAULT_COMPRESSOR);
-
- Compressor compressor =
Compressor.getCompressor(url.getOrDefaultFrameworkModel(), compressorStr);
- if (compressor != null) {
- stream.setCompressor(compressor);
- }
-
- stream.service(consumerModel)
- .connection(Connection.getConnectionFromChannel(ctx.channel()))
- .method(methodDescriptor)
- .methodName(methodDescriptor.getMethodName())
- .request(req)
- .serialize((String)
inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
- .subscribe(new ClientTransportObserver(ctx, stream, promise));
-
- if (methodDescriptor.isUnary()) {
- stream.asStreamObserver().onNext(inv);
- stream.asStreamObserver().onCompleted();
- } else {
- Response response = new Response(req.getId(), req.getVersion());
- AppResponse result;
- // the stream method params is fixed
- if (methodDescriptor.getRpcType() ==
MethodDescriptor.RpcType.BIDIRECTIONAL_STREAM
- || methodDescriptor.getRpcType() ==
MethodDescriptor.RpcType.CLIENT_STREAM) {
- StreamObserver<Object> obServer = (StreamObserver<Object>)
inv.getArguments()[0];
- obServer = attachCancelContext(obServer,
stream.getCancellationContext());
- stream.subscribe(obServer);
- result = new AppResponse(stream.asStreamObserver());
- } else {
- StreamObserver<Object> obServer = (StreamObserver<Object>)
inv.getArguments()[1];
- obServer = attachCancelContext(obServer,
stream.getCancellationContext());
- stream.subscribe(obServer);
- result = new AppResponse();
- stream.asStreamObserver().onNext(inv.getArguments()[0]);
- stream.asStreamObserver().onCompleted();
- }
- response.setResult(result);
- DefaultFuture2.received(stream.getConnection(), response);
- }
- }
-
- /**
- * Get the tri protocol special MethodDescriptor
- */
- private MethodDescriptor getTriMethodDescriptor(ConsumerModel
consumerModel, RpcInvocation inv) {
- List<MethodDescriptor> methodDescriptors =
consumerModel.getServiceModel().getMethods(inv.getMethodName());
- if (CollectionUtils.isEmpty(methodDescriptors)) {
- throw new IllegalStateException("methodDescriptors must not be
null method=" + inv.getMethodName());
- }
- for (MethodDescriptor methodDescriptor : methodDescriptors) {
- if (Arrays.equals(inv.getParameterTypes(),
methodDescriptor.getRealParameterClasses())) {
- return methodDescriptor;
- }
- }
- throw new IllegalStateException("methodDescriptors must not be null
method=" + inv.getMethodName());
- }
-
-
- public <T> StreamObserver<T> attachCancelContext(StreamObserver<T>
observer, CancellationContext context) {
- if (observer instanceof CancelableStreamObserver) {
- CancelableStreamObserver<T> streamObserver =
((CancelableStreamObserver<T>) observer);
- streamObserver.setCancellationContext(context);
- return streamObserver;
- }
- return observer;
+ Connection connection =
Connection.getConnectionFromChannel(ctx.channel());
+ final AbstractClientStream stream =
AbstractClientStream.newClientStream(req, connection);
+ final ClientTransportObserver clientTransportObserver = new
ClientTransportObserver(ctx, promise);
+ final Http2StreamChannelBootstrap streamChannelBootstrap = new
Http2StreamChannelBootstrap(ctx.channel());
+ streamChannelBootstrap.open()
+ .addListener(future -> {
+ if (future.isSuccess()) {
+ final Http2StreamChannel curChannel = (Http2StreamChannel)
future.get();
Review comment:
promise.trySuccess
##########
File path:
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
##########
@@ -20,62 +20,51 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2StreamChannel;
-import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
-import io.netty.util.AsciiString;
public class ClientTransportObserver implements TransportObserver {
- private final AsciiString SCHEME;
private final ChannelHandlerContext ctx;
- private final Http2StreamChannel streamChannel;
private final ChannelPromise promise;
- private boolean headerSent = false;
- private boolean endStreamSent = false;
- private boolean resetSent = false;
+ private Http2StreamChannel streamChannel;
+ private volatile int initialized = DEFAULT;
- public ClientTransportObserver(ChannelHandlerContext ctx,
AbstractClientStream stream, ChannelPromise promise) {
- this.ctx = ctx;
- this.promise = promise;
- Boolean ssl =
ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).get();
- if (ssl != null && ssl) {
- SCHEME = TripleConstant.HTTPS_SCHEME;
- } else {
- SCHEME = TripleConstant.HTTP_SCHEME;
- }
- final Http2StreamChannelBootstrap streamChannelBootstrap = new
Http2StreamChannelBootstrap(ctx.channel());
- streamChannel =
streamChannelBootstrap.open().syncUninterruptibly().getNow();
+ private static final int DEFAULT = 0;
+ private static final int SUCCESS = 1;
+ private static final int FAIL = 2;
+
+ public void setStreamChannel(Http2StreamChannel streamChannel) {
+ this.streamChannel = streamChannel;
+ initialized = SUCCESS;
+ }
+
+ public void initializedFailed() {
+ initialized = FAIL;
+ }
- final TripleHttp2ClientResponseHandler responseHandler = new
TripleHttp2ClientResponseHandler();
- streamChannel.pipeline().addLast(responseHandler)
- .addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
- .addLast(new TripleClientInboundHandler());
- streamChannel.attr(TripleConstant.CLIENT_STREAM_KEY).set(stream);
+ public ClientTransportObserver(ChannelHandlerContext ctx, ChannelPromise
promise) {
+ this.ctx = ctx;
+ this.promise = promise;
}
@Override
public void onMetadata(Metadata metadata, boolean endStream) {
- if (headerSent) {
- return;
+ while (initialized == DEFAULT) {
+ // wait channel initialized
}
- if (resetSent) {
+ //
Review comment:
An exception or a log message is needed here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]