guohao commented on a change in pull request #9057:
URL: https://github.com/apache/dubbo/pull/9057#discussion_r732373208
##########
File path:
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
##########
@@ -20,62 +20,35 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2StreamChannel;
-import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
-import io.netty.util.AsciiString;
public class ClientTransportObserver implements TransportObserver {
- private final AsciiString SCHEME;
private final ChannelHandlerContext ctx;
- private final Http2StreamChannel streamChannel;
private final ChannelPromise promise;
- private boolean headerSent = false;
- private boolean endStreamSent = false;
- private boolean resetSent = false;
+ private volatile Http2StreamChannel streamChannel;
+ public void setStreamChannel(Http2StreamChannel streamChannel) {
+ this.streamChannel = streamChannel;
+ }
- public ClientTransportObserver(ChannelHandlerContext ctx,
AbstractClientStream stream, ChannelPromise promise) {
+ public ClientTransportObserver(ChannelHandlerContext ctx, ChannelPromise
promise) {
this.ctx = ctx;
this.promise = promise;
- Boolean ssl =
ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).get();
- if (ssl != null && ssl) {
- SCHEME = TripleConstant.HTTPS_SCHEME;
- } else {
- SCHEME = TripleConstant.HTTP_SCHEME;
- }
-
- final Http2StreamChannelBootstrap streamChannelBootstrap = new
Http2StreamChannelBootstrap(ctx.channel());
- streamChannel =
streamChannelBootstrap.open().syncUninterruptibly().getNow();
-
- final TripleHttp2ClientResponseHandler responseHandler = new
TripleHttp2ClientResponseHandler();
- streamChannel.pipeline().addLast(responseHandler)
- .addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
- .addLast(new TripleClientInboundHandler());
- streamChannel.attr(TripleConstant.CLIENT_STREAM_KEY).set(stream);
}
@Override
public void onMetadata(Metadata metadata, boolean endStream) {
- if (headerSent) {
- return;
+ while (streamChannel == null) {
+ // wait channel initialized
Review comment:
streamChannel may be null if `open` failed, and will be blocked here
forever
##########
File path:
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
##########
@@ -18,34 +18,58 @@
package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.api.Connection;
+import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ConsumerModel;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.rpc.model.ServiceModel;
import org.apache.dubbo.triple.TripleWrapper;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.util.AsciiString;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
+import static org.apache.dubbo.rpc.Constants.COMPRESSOR_KEY;
+import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+
public abstract class AbstractClientStream extends AbstractStream implements
Stream {
+
+ private final AsciiString scheme;
private ConsumerModel consumerModel;
private Connection connection;
+ private RpcInvocation rpcInvocation;
+ private long requestId;
protected AbstractClientStream(URL url) {
super(url);
+ this.scheme = getSchemeFromUrl(url);
+ // for client cancel,send rst frame to server
+ this.getCancellationContext().addListener(context -> {
+ Throwable throwable =
this.getCancellationContext().getCancellationCause();
+ if (LOGGER.isWarnEnabled()) {
+ LOGGER.warn("Cancel by local throwable is ", throwable);
Review comment:
```suggestion
LOGGER.warn("Triple request to {service+method} was canceled
by local exception ", throwable);
```
##########
File path:
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
##########
@@ -55,38 +79,107 @@ public static ClientStream stream(URL url) {
return new ClientStream(url);
}
- public static AbstractClientStream newClientStream(URL url, boolean unary)
{
- AbstractClientStream stream = unary ? unary(url) : stream(url);
- final CancellationContext cancellationContext =
stream.getCancellationContext();
- // for client cancel,send rst frame to server
- cancellationContext.addListener(context -> {
- if (LOGGER.isWarnEnabled()) {
- Throwable throwable =
cancellationContext.getCancellationCause();
- LOGGER.warn("Cancel by local throwable is ", throwable);
- }
- stream.asTransportObserver().onReset(Http2Error.CANCEL);
- });
+ public static AbstractClientStream newClientStream(Request req, Connection
connection) {
+ final RpcInvocation inv = (RpcInvocation) req.getData();
+ final URL url = inv.getInvoker().getUrl();
+ ConsumerModel consumerModel = inv.getServiceModel() != null ?
(ConsumerModel) inv.getServiceModel() : (ConsumerModel) url.getServiceModel();
+ MethodDescriptor methodDescriptor =
getTriMethodDescriptor(consumerModel, inv);
+ ClassLoadUtil.switchContextLoader(consumerModel.getClassLoader());
+ AbstractClientStream stream = methodDescriptor.isUnary() ? unary(url)
: stream(url);
+ Compressor compressor = getCompressor(url, consumerModel);
+ stream.request(req)
+ .service(consumerModel)
+ .connection(connection)
+ .serialize((String)
inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
+ .method(methodDescriptor)
+ .setCompressor(compressor)
+ ;
Review comment:
fix format
##########
File path:
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
##########
@@ -20,62 +20,35 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2StreamChannel;
-import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
-import io.netty.util.AsciiString;
public class ClientTransportObserver implements TransportObserver {
- private final AsciiString SCHEME;
private final ChannelHandlerContext ctx;
- private final Http2StreamChannel streamChannel;
private final ChannelPromise promise;
- private boolean headerSent = false;
- private boolean endStreamSent = false;
- private boolean resetSent = false;
+ private volatile Http2StreamChannel streamChannel;
+ public void setStreamChannel(Http2StreamChannel streamChannel) {
+ this.streamChannel = streamChannel;
+ }
- public ClientTransportObserver(ChannelHandlerContext ctx,
AbstractClientStream stream, ChannelPromise promise) {
+ public ClientTransportObserver(ChannelHandlerContext ctx, ChannelPromise
promise) {
this.ctx = ctx;
this.promise = promise;
- Boolean ssl =
ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).get();
- if (ssl != null && ssl) {
- SCHEME = TripleConstant.HTTPS_SCHEME;
- } else {
- SCHEME = TripleConstant.HTTP_SCHEME;
- }
-
- final Http2StreamChannelBootstrap streamChannelBootstrap = new
Http2StreamChannelBootstrap(ctx.channel());
- streamChannel =
streamChannelBootstrap.open().syncUninterruptibly().getNow();
-
- final TripleHttp2ClientResponseHandler responseHandler = new
TripleHttp2ClientResponseHandler();
- streamChannel.pipeline().addLast(responseHandler)
- .addLast(new GrpcDataDecoder(Integer.MAX_VALUE, true))
- .addLast(new TripleClientInboundHandler());
- streamChannel.attr(TripleConstant.CLIENT_STREAM_KEY).set(stream);
}
@Override
public void onMetadata(Metadata metadata, boolean endStream) {
- if (headerSent) {
- return;
+ while (streamChannel == null) {
+ // wait channel initialized
Review comment:
How about using async send after channel created
##########
File path:
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
##########
@@ -55,38 +79,107 @@ public static ClientStream stream(URL url) {
return new ClientStream(url);
}
- public static AbstractClientStream newClientStream(URL url, boolean unary)
{
- AbstractClientStream stream = unary ? unary(url) : stream(url);
- final CancellationContext cancellationContext =
stream.getCancellationContext();
- // for client cancel,send rst frame to server
- cancellationContext.addListener(context -> {
- if (LOGGER.isWarnEnabled()) {
- Throwable throwable =
cancellationContext.getCancellationCause();
- LOGGER.warn("Cancel by local throwable is ", throwable);
- }
- stream.asTransportObserver().onReset(Http2Error.CANCEL);
- });
+ public static AbstractClientStream newClientStream(Request req, Connection
connection) {
+ final RpcInvocation inv = (RpcInvocation) req.getData();
+ final URL url = inv.getInvoker().getUrl();
+ ConsumerModel consumerModel = inv.getServiceModel() != null ?
(ConsumerModel) inv.getServiceModel() : (ConsumerModel) url.getServiceModel();
+ MethodDescriptor methodDescriptor =
getTriMethodDescriptor(consumerModel, inv);
+ ClassLoadUtil.switchContextLoader(consumerModel.getClassLoader());
+ AbstractClientStream stream = methodDescriptor.isUnary() ? unary(url)
: stream(url);
+ Compressor compressor = getCompressor(url, consumerModel);
+ stream.request(req)
+ .service(consumerModel)
+ .connection(connection)
+ .serialize((String)
inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
+ .method(methodDescriptor)
+ .setCompressor(compressor)
+ ;
return stream;
}
- public AbstractClientStream service(ConsumerModel model) {
- this.consumerModel = model;
- return this;
+ protected void startCall() {
+ try {
+ doOnStartCall();
+ } catch (Throwable throwable) {
+ cancel(throwable);
+ DefaultFuture2.getFuture(getRequestId()).cancel();
+ }
}
- public ConsumerModel getConsumerModel() {
- return consumerModel;
+ protected abstract void doOnStartCall();
+
+ @Override
+ protected StreamObserver<Object> createStreamObserver() {
+ return new ClientStreamObserverImpl(getCancellationContext());
}
- public AbstractClientStream connection(Connection connection) {
- this.connection = connection;
- return this;
+ protected class ClientStreamObserverImpl extends
CancelableStreamObserver<Object> implements ClientStreamObserver<Object> {
+
+ public ClientStreamObserverImpl(CancellationContext
cancellationContext) {
+ super(cancellationContext);
+ }
+
+ @Override
+ public void onNext(Object data) {
+ if (getState().allowSendMeta()) {
+ getState().setMetaSend();
+ final Metadata metadata =
createRequestMeta(getRpcInvocation());
+ getTransportSubscriber().onMetadata(metadata, false);
+ }
+ if (getState().allowSendData()) {
+ final byte[] bytes = encodeRequest(data);
+ getTransportSubscriber().onData(bytes, false);
+ }
+ }
+
+ /**
+ * Handle all exceptions in the request process, other procedures
directly throw
+ * <p>
+ * other procedures is {@link ClientStreamObserver#onNext(Object)} and
{@link ClientStreamObserver#onCompleted()}
+ */
+ @Override
+ public void onError(Throwable throwable) {
+ if (getState().allowSendEndStream()) {
+ getState().setEndStreamSend();
+ GrpcStatus status = GrpcStatus.getStatus(throwable);
+ transportError(status, null, getState().allowSendMeta());
+ } else {
+ if (LOGGER.isErrorEnabled()) {
+ LOGGER.error("client request error ", throwable);
Review comment:
Add service and method descriprtion to log
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]