This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 46af145730 refactor(triple): Clean up some unused classes to
facilitate reference analysis (#13619)
46af145730 is described below
commit 46af145730aa34417e3fcb74592f17a3190748f6
Author: Sean Yang <[email protected]>
AuthorDate: Fri Jan 19 14:46:30 2024 +0800
refactor(triple): Clean up some unused classes to facilitate reference
analysis (#13619)
---
.../dubbo/reactive/CreateObserverAdapter.java | 8 +-
.../TripleReflectionTypeDescriberRegistrar.java | 2 -
.../rpc/protocol/tri/call/AbstractServerCall.java | 438 ------------------
.../tri/call/AbstractServerCallListener.java | 102 -----
.../tri/call/BiStreamServerCallListener.java | 61 ---
.../tri/call/ReflectionAbstractServerCall.java | 214 ---------
.../dubbo/rpc/protocol/tri/call/ServerCall.java | 75 ---
.../tri/call/ServerStreamServerCallListener.java | 51 ---
.../protocol/tri/call/StubAbstractServerCall.java | 66 ---
.../protocol/tri/call/UnaryServerCallListener.java | 84 ----
.../tri/h12/AbstractServerCallListener.java | 3 +-
.../h12/grpc/GrpcHttp2ServerTransportListener.java | 4 +-
.../tri/observer/ServerCallToObserverAdapter.java | 136 ------
.../rpc/protocol/tri/stream/ServerStream.java | 64 ---
.../protocol/tri/stream/TripleServerStream.java | 505 ---------------------
.../transport/TripleHttp2FrameServerHandler.java | 115 -----
.../tri/call/ReflectionServerCallTest.java | 92 ----
.../rpc/protocol/tri/call/StubServerCallTest.java | 68 ---
18 files changed, 7 insertions(+), 2081 deletions(-)
diff --git a/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java
index 99f44c7cc5..3f303b5f3f 100644
--- a/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java
+++ b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java
@@ -16,7 +16,7 @@
*/
package org.apache.dubbo.reactive;
-import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
+import org.apache.dubbo.rpc.protocol.tri.ServerStreamObserver;
import java.util.concurrent.atomic.AtomicInteger;
@@ -28,7 +28,7 @@ import static org.mockito.Mockito.doAnswer;
public class CreateObserverAdapter {
- private ServerCallToObserverAdapter<String> responseObserver;
+ private ServerStreamObserver<String> responseObserver;
private AtomicInteger nextCounter;
private AtomicInteger completeCounter;
private AtomicInteger errorCounter;
@@ -39,7 +39,7 @@ public class CreateObserverAdapter {
completeCounter = new AtomicInteger();
errorCounter = new AtomicInteger();
- responseObserver = Mockito.mock(ServerCallToObserverAdapter.class);
+ responseObserver = Mockito.mock(ServerStreamObserver.class);
doAnswer(o ->
nextCounter.incrementAndGet()).when(responseObserver).onNext(anyString());
doAnswer(o ->
completeCounter.incrementAndGet()).when(responseObserver).onCompleted();
doAnswer(o ->
errorCounter.incrementAndGet()).when(responseObserver).onError(any(Throwable.class));
@@ -57,7 +57,7 @@ public class CreateObserverAdapter {
return errorCounter;
}
- public ServerCallToObserverAdapter<String> getResponseObserver() {
+ public ServerStreamObserver<String> getResponseObserver() {
return this.responseObserver;
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/aot/TripleReflectionTypeDescriberRegistrar.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/aot/TripleReflectionTypeDescriberRegistrar.java
index 0ac720184b..a0ab8198d4 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/aot/TripleReflectionTypeDescriberRegistrar.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/aot/TripleReflectionTypeDescriberRegistrar.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.aot.api.TypeDescriber;
import
org.apache.dubbo.rpc.protocol.tri.transport.TripleCommandOutBoundHandler;
import org.apache.dubbo.rpc.protocol.tri.transport.TripleGoAwayHandler;
import
org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2ClientResponseHandler;
-import org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2FrameServerHandler;
import
org.apache.dubbo.rpc.protocol.tri.transport.TripleServerConnectionHandler;
import org.apache.dubbo.rpc.protocol.tri.transport.TripleTailHandler;
@@ -35,7 +34,6 @@ public class TripleReflectionTypeDescriberRegistrar implements ReflectionTypeDes
@Override
public List<TypeDescriber> getTypeDescribers() {
List<TypeDescriber> typeDescribers = new ArrayList<>();
- typeDescribers.add(buildTypeDescriberWithPublicMethod(TripleHttp2FrameServerHandler.class));
typeDescribers.add(buildTypeDescriberWithPublicMethod(TripleCommandOutBoundHandler.class));
typeDescribers.add(buildTypeDescriberWithPublicMethod(TripleTailHandler.class));
typeDescribers.add(buildTypeDescriberWithPublicMethod(TripleServerConnectionHandler.class));
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java
deleted file mode 100644
index fb8f516367..0000000000
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.RpcContext;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.MethodDescriptor;
-import org.apache.dubbo.rpc.model.PackableMethod;
-import org.apache.dubbo.rpc.model.ServiceDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.ClassLoadUtil;
-import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
-import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
-import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
-import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
-import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
-import org.apache.dubbo.rpc.protocol.tri.stream.ServerStream;
-import org.apache.dubbo.rpc.protocol.tri.stream.StreamUtils;
-
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.util.concurrent.Future;
-
-import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_CREATE_STREAM_TRIPLE;
-import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_PARSE;
-import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST;
-import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_SERIALIZE_TRIPLE;
-
-public abstract class AbstractServerCall implements ServerCall,
ServerStream.Listener {
-
- public static final String REMOTE_ADDRESS_KEY = "tri.remote.address";
- private static final ErrorTypeAwareLogger LOGGER =
LoggerFactory.getErrorTypeAwareLogger(AbstractServerCall.class);
-
- public final Invoker<?> invoker;
- public final FrameworkModel frameworkModel;
- public final ServerStream stream;
- public final Executor executor;
- public final String methodName;
- public final String serviceName;
- public final ServiceDescriptor serviceDescriptor;
- private final String acceptEncoding;
- public boolean autoRequestN = true;
- public Long timeout;
- ServerCall.Listener listener;
- private Compressor compressor;
- private boolean headerSent;
- private boolean closed;
- CancellationContext cancellationContext;
- protected MethodDescriptor methodDescriptor;
- protected PackableMethod packableMethod;
- protected Map<String, Object> requestMetadata;
-
- private Integer exceptionCode =
CommonConstants.TRI_EXCEPTION_CODE_NOT_EXISTS;
-
- public Integer getExceptionCode() {
- return exceptionCode;
- }
-
- public void setExceptionCode(Integer exceptionCode) {
- this.exceptionCode = exceptionCode;
- }
-
- private boolean isNeedReturnException = false;
-
- public boolean isNeedReturnException() {
- return isNeedReturnException;
- }
-
- public void setNeedReturnException(boolean needReturnException) {
- isNeedReturnException = needReturnException;
- }
-
- AbstractServerCall(
- Invoker<?> invoker,
- ServerStream stream,
- FrameworkModel frameworkModel,
- ServiceDescriptor serviceDescriptor,
- String acceptEncoding,
- String serviceName,
- String methodName,
- Executor executor) {
- Objects.requireNonNull(serviceDescriptor, "No service descriptor found
for " + invoker.getUrl());
- this.invoker = invoker;
- // is already serialized in the stream, so we don't need to serialize
it again.
- this.executor = executor;
- this.frameworkModel = frameworkModel;
- this.serviceDescriptor = serviceDescriptor;
- this.serviceName = serviceName;
- this.methodName = methodName;
- this.stream = stream;
- this.acceptEncoding = acceptEncoding;
- }
-
- // stream listener start
- @Override
- public void onHeader(Map<String, Object> requestMetadata) {
- this.requestMetadata = requestMetadata;
- if (serviceDescriptor == null) {
- responseErr(TriRpcStatus.UNIMPLEMENTED.withDescription("Service
not found:" + serviceName));
- return;
- }
- startCall();
- }
-
- protected void startCall() {
- RpcInvocation invocation = buildInvocation(methodDescriptor);
- listener = startInternalCall(invocation, methodDescriptor, invoker);
- }
-
- @Override
- public final void request(int numMessages) {
- stream.request(numMessages);
- }
-
- @Override
- public final void sendMessage(Object message) {
- if (closed) {
- throw new IllegalStateException("Stream has already canceled");
- }
- // is already in executor
- doSendMessage(message);
- }
-
- private void doSendMessage(Object message) {
- if (closed) {
- return;
- }
- if (!headerSent) {
- sendHeader();
- }
- final byte[] data;
- try {
- data = packableMethod.packResponse(message);
- } catch (Exception e) {
- close(
- TriRpcStatus.INTERNAL
- .withDescription("Serialize response failed")
- .withCause(e),
- null);
- LOGGER.error(
- PROTOCOL_FAILED_SERIALIZE_TRIPLE,
- "",
- "",
- String.format("Serialize triple response failed,
service=%s method=%s", serviceName, methodName),
- e);
- return;
- }
- if (data == null) {
- close(TriRpcStatus.INTERNAL.withDescription("Missing response"),
null);
- return;
- }
- Future<?> future;
- if (compressor != null) {
- int compressedFlag =
Identity.MESSAGE_ENCODING.equals(compressor.getMessageEncoding()) ? 0 : 1;
- final byte[] compressed = compressor.compress(data);
- future = stream.sendMessage(compressed, compressedFlag);
- } else {
- future = stream.sendMessage(data, 0);
- }
- future.addListener(f -> {
- if (!f.isSuccess()) {
- cancelDual(TriRpcStatus.CANCELLED
- .withDescription("Send message failed")
- .withCause(f.cause()));
- }
- });
- }
-
- @Override
- public final void onComplete() {
- if (listener == null) {
- // It will enter here when there is an error in the header
- return;
- }
- // Both 'onError' and 'onComplete' are termination operators.
- // The stream will be closed when 'onError' was called, and
'onComplete' is not allowed to be called again.
- if (isClosed()) {
- return;
- }
- listener.onComplete();
- }
-
- @Override
- public final void onMessage(byte[] message, boolean isReturnTriException) {
- ClassLoader tccl = Thread.currentThread().getContextClassLoader();
- try {
- Object instance = parseSingleMessage(message);
- listener.onMessage(instance, message.length);
- } catch (Exception e) {
- final TriRpcStatus status =
- TriRpcStatus.UNKNOWN.withDescription("Server
error").withCause(e);
- close(status, null);
- LOGGER.error(
- PROTOCOL_FAILED_REQUEST,
- "",
- "",
- "Process request failed. service=" + serviceName + "
method=" + methodName,
- e);
- } finally {
- ClassLoadUtil.switchContextLoader(tccl);
- }
- }
-
- protected abstract Object parseSingleMessage(byte[] data) throws Exception;
-
- @Override
- public final void onCancelByRemote(TriRpcStatus status) {
- closed = true;
- if (listener == null) {
- return;
- }
- cancellationContext.cancel(status.cause);
- listener.onCancel(status);
- }
- // stream listener end
-
- public final boolean isClosed() {
- return closed;
- }
-
- /**
- * Build the RpcInvocation with metadata and execute headerFilter
- *
- * @return RpcInvocation
- */
- protected RpcInvocation buildInvocation(MethodDescriptor methodDescriptor)
{
- final URL url = invoker.getUrl();
- RpcInvocation inv = new RpcInvocation(
- url.getServiceModel(),
- methodDescriptor.getMethodName(),
- serviceDescriptor.getInterfaceName(),
- url.getProtocolServiceKey(),
- methodDescriptor.getParameterClasses(),
- new Object[0]);
- inv.setTargetServiceUniqueName(url.getServiceKey());
- inv.setReturnTypes(methodDescriptor.getReturnTypes());
- inv.setObjectAttachments(StreamUtils.toAttachments(requestMetadata));
- inv.put(REMOTE_ADDRESS_KEY, stream.remoteAddress());
- // handle timeout
- String timeout = (String)
requestMetadata.get(TripleHeaderEnum.TIMEOUT.getHeader());
- try {
- if (Objects.nonNull(timeout)) {
- this.timeout = parseTimeoutToMills(timeout);
- }
- } catch (Throwable t) {
- LOGGER.warn(
- PROTOCOL_FAILED_PARSE,
- "",
- "",
- String.format(
- "Failed to parse request timeout set from:%s,
service=%s " + "method=%s",
- timeout, serviceDescriptor.getInterfaceName(),
methodName));
- }
- if (null !=
requestMetadata.get(TripleHeaderEnum.CONSUMER_APP_NAME_KEY.getHeader())) {
- inv.put(
- TripleHeaderEnum.CONSUMER_APP_NAME_KEY,
-
requestMetadata.get(TripleHeaderEnum.CONSUMER_APP_NAME_KEY.getHeader()));
- }
- return inv;
- }
-
- private void sendHeader() {
- if (closed) {
- return;
- }
- if (headerSent) {
- throw new IllegalStateException("Header has already sent");
- }
- headerSent = true;
- DefaultHttp2Headers headers = new DefaultHttp2Headers();
- headers.status(HttpResponseStatus.OK.codeAsText());
- headers.set(HttpHeaderNames.CONTENT_TYPE,
TripleConstant.CONTENT_PROTO);
- if (acceptEncoding != null) {
- headers.set(HttpHeaderNames.ACCEPT_ENCODING, acceptEncoding);
- }
- if (compressor != null) {
- headers.set(TripleHeaderEnum.GRPC_ENCODING.getHeader(),
compressor.getMessageEncoding());
- }
- if
(!exceptionCode.equals(CommonConstants.TRI_EXCEPTION_CODE_NOT_EXISTS)) {
- headers.set(TripleHeaderEnum.TRI_EXCEPTION_CODE.getHeader(),
String.valueOf(exceptionCode));
- }
- // send header failed will reset stream and close request observer
cause no more data will be sent
- stream.sendHeader(headers).addListener(f -> {
- if (!f.isSuccess()) {
- cancelDual(TriRpcStatus.INTERNAL.withCause(f.cause()));
- }
- });
- }
-
- private void cancelDual(TriRpcStatus status) {
- closed = true;
- listener.onCancel(status);
- cancellationContext.cancel(status.asException());
- }
-
- public void cancelByLocal(Throwable throwable) {
- if (closed) {
- return;
- }
- closed = true;
- cancellationContext.cancel(throwable);
- stream.cancelByLocal(TriRpcStatus.CANCELLED.withCause(throwable));
- }
-
- public void setCompression(String compression) {
- if (headerSent) {
- throw new IllegalStateException("Can not set compression after
header sent");
- }
- this.compressor = Compressor.getCompressor(frameworkModel,
compression);
- }
-
- public void disableAutoRequestN() {
- autoRequestN = false;
- }
-
- public boolean isAutoRequestN() {
- return autoRequestN;
- }
-
- public void close(TriRpcStatus status, Map<String, Object> attachments) {
- doClose(status, attachments);
- }
-
- private void doClose(TriRpcStatus status, Map<String, Object> attachments)
{
- if (closed) {
- return;
- }
- closed = true;
- stream.complete(status, attachments, isNeedReturnException,
exceptionCode);
- }
-
- protected Long parseTimeoutToMills(String timeoutVal) {
- if (StringUtils.isEmpty(timeoutVal) ||
StringUtils.isContains(timeoutVal, "null")) {
- return null;
- }
- long value = Long.parseLong(timeoutVal.substring(0,
timeoutVal.length() - 1));
- char unit = timeoutVal.charAt(timeoutVal.length() - 1);
- switch (unit) {
- case 'n':
- return TimeUnit.NANOSECONDS.toMillis(value);
- case 'u':
- return TimeUnit.MICROSECONDS.toMillis(value);
- case 'm':
- return value;
- case 'S':
- return TimeUnit.SECONDS.toMillis(value);
- case 'M':
- return TimeUnit.MINUTES.toMillis(value);
- case 'H':
- return TimeUnit.HOURS.toMillis(value);
- default:
- // invalid timeout config
- return null;
- }
- }
-
- /**
- * Error in create stream, unsupported config or triple protocol error.
- *
- * @param status response status
- */
- protected void responseErr(TriRpcStatus status) {
- if (closed) {
- return;
- }
- closed = true;
- stream.complete(status, null, false,
CommonConstants.TRI_EXCEPTION_CODE_NOT_EXISTS);
- LOGGER.error(
- PROTOCOL_FAILED_REQUEST,
- "",
- "",
- "Triple request error: service=" + serviceName + " method" +
methodName,
- status.asException());
- }
-
- protected ServerCall.Listener startInternalCall(
- RpcInvocation invocation, MethodDescriptor methodDescriptor,
Invoker<?> invoker) {
- this.cancellationContext = RpcContext.getCancellationContext();
- ServerCallToObserverAdapter<Object> responseObserver =
- new ServerCallToObserverAdapter<>(this, cancellationContext);
- try {
- ServerCall.Listener listener;
- switch (methodDescriptor.getRpcType()) {
- case UNARY:
- listener = new UnaryServerCallListener(
- invocation, invoker, responseObserver,
packableMethod.needWrapper());
- request(2);
- break;
- case SERVER_STREAM:
- listener = new ServerStreamServerCallListener(invocation,
invoker, responseObserver);
- request(2);
- break;
- case BI_STREAM:
- case CLIENT_STREAM:
- listener = new BiStreamServerCallListener(invocation,
invoker, responseObserver);
- request(1);
- break;
- default:
- throw new IllegalStateException("Can not reach here");
- }
- return listener;
- } catch (Exception e) {
- LOGGER.error(PROTOCOL_FAILED_CREATE_STREAM_TRIPLE, "", "", "Create
triple stream failed", e);
- responseErr(TriRpcStatus.INTERNAL
- .withDescription("Create stream failed")
- .withCause(e));
- }
- return null;
- }
-}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCallListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCallListener.java
deleted file mode 100644
index 7fa838d8d7..0000000000
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCallListener.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcContext;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
-import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
-
-import java.net.InetSocketAddress;
-
-import static
org.apache.dubbo.common.constants.CommonConstants.REMOTE_APPLICATION_KEY;
-import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_TIMEOUT_SERVER;
-
-public abstract class AbstractServerCallListener implements
AbstractServerCall.Listener {
-
- private static final ErrorTypeAwareLogger LOGGER =
-
LoggerFactory.getErrorTypeAwareLogger(AbstractServerCallListener.class);
- public final CancellationContext cancellationContext;
- final RpcInvocation invocation;
- final Invoker<?> invoker;
- final ServerCallToObserverAdapter<Object> responseObserver;
-
- public AbstractServerCallListener(
- RpcInvocation invocation, Invoker<?> invoker,
ServerCallToObserverAdapter<Object> responseObserver) {
- this.invocation = invocation;
- this.invoker = invoker;
- this.cancellationContext = responseObserver.cancellationContext;
- this.responseObserver = responseObserver;
- }
-
- public void invoke() {
- RpcContext.restoreCancellationContext(cancellationContext);
- InetSocketAddress remoteAddress =
- (InetSocketAddress)
invocation.getAttributes().remove(AbstractServerCall.REMOTE_ADDRESS_KEY);
- RpcContext.getServiceContext().setRemoteAddress(remoteAddress);
- String remoteApp = (String)
invocation.getAttributes().remove(TripleHeaderEnum.CONSUMER_APP_NAME_KEY);
- if (null != remoteApp) {
- RpcContext.getServiceContext().setRemoteApplicationName(remoteApp);
- invocation.setAttachmentIfAbsent(REMOTE_APPLICATION_KEY,
remoteApp);
- }
- final long stInMillis = System.currentTimeMillis();
- try {
- final Result response = invoker.invoke(invocation);
- response.whenCompleteWithContext((r, t) -> {
-
responseObserver.setResponseAttachments(response.getObjectAttachments());
- if (t != null) {
- responseObserver.onError(t);
- return;
- }
- if (response.hasException()) {
- doOnResponseHasException(response.getException());
- return;
- }
- final long cost = System.currentTimeMillis() - stInMillis;
- if (responseObserver.isTimeout(cost)) {
- LOGGER.error(
- PROTOCOL_TIMEOUT_SERVER,
- "",
- "",
- String.format(
- "Invoke timeout at server side, ignored to
send response. service=%s method=%s cost=%s",
- invocation.getTargetServiceUniqueName(),
invocation.getMethodName(), cost));
-
responseObserver.onCompleted(TriRpcStatus.DEADLINE_EXCEEDED);
- return;
- }
- onReturn(r.getValue());
- });
- } catch (Exception e) {
- responseObserver.onError(e);
- } finally {
- RpcContext.removeCancellationContext();
- RpcContext.removeContext();
- }
- }
-
- protected void doOnResponseHasException(Throwable t) {
- responseObserver.onError(t);
- }
-
- public abstract void onReturn(Object value);
-}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java
deleted file mode 100644
index e37ac9d006..0000000000
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
-
-public class BiStreamServerCallListener extends AbstractServerCallListener {
-
- private StreamObserver<Object> requestObserver;
-
- public BiStreamServerCallListener(
- RpcInvocation invocation, Invoker<?> invoker,
ServerCallToObserverAdapter<Object> responseObserver) {
- super(invocation, invoker, responseObserver);
- invocation.setArguments(new Object[] {responseObserver});
- invoke();
- }
-
- @Override
- public void onReturn(Object value) {
- this.requestObserver = (StreamObserver<Object>) value;
- }
-
- @Override
- public void onMessage(Object message, int actualContentLength) {
- if (message instanceof Object[]) {
- message = ((Object[]) message)[0];
- }
- requestObserver.onNext(message);
- if (responseObserver.isAutoRequestN()) {
- responseObserver.request(1);
- }
- }
-
- @Override
- public void onCancel(TriRpcStatus status) {
- requestObserver.onError(status.asException());
- }
-
- @Override
- public void onComplete() {
- requestObserver.onCompleted();
- }
-}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionAbstractServerCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionAbstractServerCall.java
deleted file mode 100644
index 4be61a00df..0000000000
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionAbstractServerCall.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.config.ConfigurationUtils;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.utils.CollectionUtils;
-import org.apache.dubbo.rpc.HeaderFilter;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.MethodDescriptor;
-import org.apache.dubbo.rpc.model.MethodDescriptor.RpcType;
-import org.apache.dubbo.rpc.model.PackableMethod;
-import org.apache.dubbo.rpc.model.PackableMethodFactory;
-import org.apache.dubbo.rpc.model.ProviderModel;
-import org.apache.dubbo.rpc.model.ServiceDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.ClassLoadUtil;
-import org.apache.dubbo.rpc.protocol.tri.TripleCustomerProtocolWapper;
-import org.apache.dubbo.rpc.protocol.tri.stream.ServerStream;
-import org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
-import io.netty.handler.codec.http.HttpHeaderNames;
-
-import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY;
-import static
org.apache.dubbo.common.constants.CommonConstants.DUBBO_PACKABLE_METHOD_FACTORY;
-
-public class ReflectionAbstractServerCall extends AbstractServerCall {
-
- private static final String PACKABLE_METHOD_CACHE =
"PACKABLE_METHOD_CACHE";
- private final List<HeaderFilter> headerFilters;
- private List<MethodDescriptor> methodDescriptors;
-
- public ReflectionAbstractServerCall(
- Invoker<?> invoker,
- ServerStream serverStream,
- FrameworkModel frameworkModel,
- String acceptEncoding,
- String serviceName,
- String methodName,
- List<HeaderFilter> headerFilters,
- Executor executor) {
- super(
- invoker,
- serverStream,
- frameworkModel,
- getServiceDescriptor(invoker.getUrl()),
- acceptEncoding,
- serviceName,
- methodName,
- executor);
- this.headerFilters = headerFilters;
- }
-
- private static ServiceDescriptor getServiceDescriptor(URL url) {
- ProviderModel providerModel = (ProviderModel) url.getServiceModel();
- if (providerModel == null || providerModel.getServiceModel() == null) {
- return null;
- }
- return providerModel.getServiceModel();
- }
-
- private boolean isEcho(String methodName) {
- return CommonConstants.$ECHO.equals(methodName);
- }
-
- private boolean isGeneric(String methodName) {
- return CommonConstants.$INVOKE.equals(methodName) ||
CommonConstants.$INVOKE_ASYNC.equals(methodName);
- }
-
- @Override
- public void startCall() {
- if (isGeneric(methodName)) {
- // There should be one and only one
- methodDescriptor = ServiceDescriptorInternalCache.genericService()
- .getMethods(methodName)
- .get(0);
- } else if (isEcho(methodName)) {
- // There should be one and only one
- methodDescriptor = ServiceDescriptorInternalCache.echoService()
- .getMethods(methodName)
- .get(0);
- } else {
- methodDescriptors = serviceDescriptor.getMethods(methodName);
- // try lower-case method
- if (CollectionUtils.isEmpty(methodDescriptors)) {
- final String lowerMethod =
Character.toLowerCase(methodName.charAt(0)) + methodName.substring(1);
- methodDescriptors = serviceDescriptor.getMethods(lowerMethod);
- }
- if (CollectionUtils.isEmpty(methodDescriptors)) {
- responseErr(TriRpcStatus.UNIMPLEMENTED.withDescription(
- "Method : " + methodName + " not found of service:" +
serviceName));
- return;
- }
- // In most cases there is only one method
- if (methodDescriptors.size() == 1) {
- methodDescriptor = methodDescriptors.get(0);
- }
- // generated unary method ,use unary type
- // Response foo(Request)
- // void foo(Request,StreamObserver<Response>)
- if (methodDescriptors.size() == 2) {
- if (methodDescriptors.get(1).getRpcType() ==
RpcType.SERVER_STREAM) {
- methodDescriptor = methodDescriptors.get(0);
- } else if (methodDescriptors.get(0).getRpcType() ==
RpcType.SERVER_STREAM) {
- methodDescriptor = methodDescriptors.get(1);
- }
- }
- }
- if (methodDescriptor != null) {
- loadPackableMethod(invoker.getUrl());
- }
- trySetListener();
- if (listener == null) {
- // wrap request , need one message
- request(1);
- }
- }
-
- private void trySetListener() {
- if (listener != null) {
- return;
- }
- if (methodDescriptor == null) {
- return;
- }
- if (isClosed()) {
- return;
- }
- RpcInvocation invocation = buildInvocation(methodDescriptor);
- if (isClosed()) {
- return;
- }
- headerFilters.forEach(f -> f.invoke(invoker, invocation));
- if (isClosed()) {
- return;
- }
- listener =
ReflectionAbstractServerCall.this.startInternalCall(invocation,
methodDescriptor, invoker);
- }
-
- @Override
- protected Object parseSingleMessage(byte[] data) throws Exception {
- trySetMethodDescriptor(data);
- trySetListener();
- if (isClosed()) {
- return null;
- }
-
ClassLoadUtil.switchContextLoader(invoker.getUrl().getServiceModel().getClassLoader());
- return packableMethod.getRequestUnpack().unpack(data);
- }
-
- private void trySetMethodDescriptor(byte[] data) {
- if (methodDescriptor != null) {
- return;
- }
- final TripleCustomerProtocolWapper.TripleRequestWrapper request;
- request =
TripleCustomerProtocolWapper.TripleRequestWrapper.parseFrom(data);
-
- final String[] paramTypes =
- request.getArgTypes().toArray(new
String[request.getArgs().size()]);
- // wrapper mode the method can overload so maybe list
- for (MethodDescriptor descriptor : methodDescriptors) {
- // params type is array
- if (Arrays.equals(descriptor.getCompatibleParamSignatures(),
paramTypes)) {
- methodDescriptor = descriptor;
- break;
- }
- }
- if (methodDescriptor == null) {
- ReflectionAbstractServerCall.this.close(
- TriRpcStatus.UNIMPLEMENTED.withDescription(
- "Method :" + methodName + "[" +
Arrays.toString(paramTypes) + "] " + "not found of service:"
- + serviceDescriptor.getInterfaceName()),
- null);
- return;
- }
- loadPackableMethod(invoker.getUrl());
- }
-
- @SuppressWarnings("unchecked")
- private void loadPackableMethod(URL url) {
- Map<MethodDescriptor, PackableMethod> cacheMap =
(Map<MethodDescriptor, PackableMethod>) url.getServiceModel()
- .getServiceMetadata()
- .getAttributeMap()
- .computeIfAbsent(PACKABLE_METHOD_CACHE, (k) -> new
ConcurrentHashMap<>());
- packableMethod = cacheMap.computeIfAbsent(methodDescriptor, (md) ->
frameworkModel
- .getExtensionLoader(PackableMethodFactory.class)
-
.getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel())
- .getString(DUBBO_PACKABLE_METHOD_FACTORY, DEFAULT_KEY))
- .create(methodDescriptor, url, (String)
requestMetadata.get(HttpHeaderNames.CONTENT_TYPE.toString())));
- }
-}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java
deleted file mode 100644
index e5bdb36513..0000000000
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.rpc.TriRpcStatus;
-
-import java.util.Map;
-
-/**
- * ServerCall manipulates server details of a RPC call. Request messages are
acquired by {@link
- * Listener}. Backpressure is supported by {@link #request(int)}.Response
messages are sent by
- * {@link ServerCall#sendMessage(Object)}.
- */
-public interface ServerCall {
-
- /**
- * A listener to receive request messages.
- */
- interface Listener {
-
- /**
- * Callback when a request message is received.
- *
- * @param message message received
- * @param actualContentLength actual content length from body
- */
- void onMessage(Object message, int actualContentLength);
-
- /**
- * @param status when the call is canceled.
- */
- void onCancel(TriRpcStatus status);
-
- /**
- * Request completed.
- */
- void onComplete();
- }
-
- /**
- * Send message to client
- *
- * @param message message to send
- */
- void sendMessage(Object message);
-
- /**
- * Request more request data from the client.
- *
- * @param numMessages max number of messages
- */
- void request(int numMessages);
-
- /**
- * Close the call.
- *
- * @param status status of the call to send to the client
- * @param responseAttrs response attachments
- */
- void close(TriRpcStatus status, Map<String, Object> responseAttrs);
-}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java
deleted file mode 100644
index 8e5ea19caf..0000000000
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
-
-public class ServerStreamServerCallListener extends AbstractServerCallListener
{
-
- public ServerStreamServerCallListener(
- RpcInvocation invocation, Invoker<?> invoker,
ServerCallToObserverAdapter<Object> responseObserver) {
- super(invocation, invoker, responseObserver);
- }
-
- @Override
- public void onReturn(Object value) {}
-
- @Override
- public void onMessage(Object message, int actualContentLength) {
- if (message instanceof Object[]) {
- message = ((Object[]) message)[0];
- }
- invocation.setArguments(new Object[] {message, responseObserver});
- }
-
- @Override
- public void onCancel(TriRpcStatus status) {
- responseObserver.onError(status.asException());
- }
-
- @Override
- public void onComplete() {
- invoke();
- }
-}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/StubAbstractServerCall.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/StubAbstractServerCall.java
deleted file mode 100644
index 139c42c415..0000000000
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/StubAbstractServerCall.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.ServiceDescriptor;
-import org.apache.dubbo.rpc.model.StubMethodDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.stream.ServerStream;
-import org.apache.dubbo.rpc.stub.StubSuppliers;
-
-import java.util.concurrent.Executor;
-
-public class StubAbstractServerCall extends AbstractServerCall {
-
- public StubAbstractServerCall(
- Invoker<?> invoker,
- ServerStream serverStream,
- FrameworkModel frameworkModel,
- String acceptEncoding,
- String serviceName,
- String methodName,
- Executor executor) {
- super(
- invoker,
- serverStream,
- frameworkModel,
- getServiceDescriptor(invoker.getUrl(), serviceName),
- acceptEncoding,
- serviceName,
- methodName,
- executor);
- this.methodDescriptor =
serviceDescriptor.getMethods(methodName).get(0);
- this.packableMethod = (StubMethodDescriptor) methodDescriptor;
- }
-
- private static ServiceDescriptor getServiceDescriptor(URL url, String
serviceName) {
- ServiceDescriptor serviceDescriptor;
- if (url.getServiceModel() != null) {
- serviceDescriptor = url.getServiceModel().getServiceModel();
- } else {
- serviceDescriptor =
StubSuppliers.getServiceDescriptor(serviceName);
- }
- return serviceDescriptor;
- }
-
- @Override
- protected Object parseSingleMessage(byte[] data) throws Exception {
- return packableMethod.parseRequest(data);
- }
-}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java
deleted file mode 100644
index 9e6da9158f..0000000000
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.remoting.Constants;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
-
-public class UnaryServerCallListener extends AbstractServerCallListener {
-
- private final boolean needWrapper;
-
- public UnaryServerCallListener(
- RpcInvocation invocation,
- Invoker<?> invoker,
- ServerCallToObserverAdapter<Object> responseObserver,
- boolean needWrapper) {
- super(invocation, invoker, responseObserver);
- this.needWrapper = needWrapper;
- }
-
- @Override
- public void onReturn(Object value) {
- responseObserver.onNext(value);
- responseObserver.onCompleted();
- }
-
- @Override
- public void onMessage(Object message, int actualContentLength) {
- if (message instanceof Object[]) {
- invocation.setArguments((Object[]) message);
- } else {
- invocation.setArguments(new Object[] {message});
- }
- invocation.put(Constants.CONTENT_LENGTH_KEY, actualContentLength);
- }
-
- @Override
- public void onCancel(TriRpcStatus status) {
- // ignored
- }
-
- @Override
- protected void doOnResponseHasException(Throwable t) {
- if (needWrapper) {
- onReturnException((Exception) t);
- } else {
- super.doOnResponseHasException(t);
- }
- }
-
- private void onReturnException(Exception value) {
- TriRpcStatus status = TriRpcStatus.getStatus(value);
- int exceptionCode = status.code.code;
- if (exceptionCode == TriRpcStatus.UNKNOWN.code.code) {
- exceptionCode = RpcException.BIZ_EXCEPTION;
- }
- responseObserver.setExceptionCode(exceptionCode);
- responseObserver.setNeedReturnException(true);
- onReturn(value);
- }
-
- @Override
- public void onComplete() {
- invoke();
- }
-}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerCallListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerCallListener.java
index 689d9d03e3..6b365fc59c 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerCallListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerCallListener.java
@@ -26,7 +26,6 @@ import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
-import org.apache.dubbo.rpc.protocol.tri.call.AbstractServerCall;
import java.net.InetSocketAddress;
@@ -57,7 +56,7 @@ public abstract class AbstractServerCallListener implements
ServerCallListener {
((Http2CancelableStreamObserver<Object>)
responseObserver).getCancellationContext());
}
InetSocketAddress remoteAddress =
- (InetSocketAddress)
invocation.getAttributes().remove(AbstractServerCall.REMOTE_ADDRESS_KEY);
+ (InetSocketAddress)
invocation.getAttributes().remove("tri.remote.address");
RpcContext.getServiceContext().setRemoteAddress(remoteAddress);
String remoteApp = (String)
invocation.getAttributes().remove(TripleHeaderEnum.CONSUMER_APP_NAME_KEY);
if (null != remoteApp) {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
index de216cee56..f64f70cc7c 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
@@ -34,7 +34,6 @@ import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.protocol.tri.TripleCustomerProtocolWapper;
-import org.apache.dubbo.rpc.protocol.tri.call.AbstractServerCall;
import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
import org.apache.dubbo.rpc.protocol.tri.h12.HttpMessageListener;
@@ -54,7 +53,8 @@ import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAI
public class GrpcHttp2ServerTransportListener extends
GenericHttp2ServerTransportListener
implements Http2TransportListener {
- private static final ErrorTypeAwareLogger LOGGER =
LoggerFactory.getErrorTypeAwareLogger(AbstractServerCall.class);
+ private static final ErrorTypeAwareLogger LOGGER =
+
LoggerFactory.getErrorTypeAwareLogger(GrpcHttp2ServerTransportListener.class);
public GrpcHttp2ServerTransportListener(H2StreamChannel h2StreamChannel,
URL url, FrameworkModel frameworkModel) {
super(h2StreamChannel, url, frameworkModel);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ServerCallToObserverAdapter.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ServerCallToObserverAdapter.java
deleted file mode 100644
index f0d84f1939..0000000000
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ServerCallToObserverAdapter.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.observer;
-
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.ServerStreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.call.AbstractServerCall;
-
-import java.util.Map;
-
-public class ServerCallToObserverAdapter<T> extends
CancelableStreamObserver<T> implements ServerStreamObserver<T> {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(CancelableStreamObserver.class);
- public final CancellationContext cancellationContext;
- private final AbstractServerCall call;
- private Map<String, Object> attachments;
- private boolean terminated = false;
-
- private boolean isNeedReturnException = false;
-
- private Integer exceptionCode =
CommonConstants.TRI_EXCEPTION_CODE_NOT_EXISTS;
-
- public Integer getExceptionCode() {
- return exceptionCode;
- }
-
- public void setExceptionCode(Integer exceptionCode) {
- this.exceptionCode = exceptionCode;
- }
-
- public boolean isNeedReturnException() {
- return isNeedReturnException;
- }
-
- public void setNeedReturnException(boolean needReturnException) {
- isNeedReturnException = needReturnException;
- }
-
- public ServerCallToObserverAdapter(AbstractServerCall call,
CancellationContext cancellationContext) {
- this.call = call;
- this.cancellationContext = cancellationContext;
- }
-
- public boolean isAutoRequestN() {
- return call.isAutoRequestN();
- }
-
- public boolean isTerminated() {
- return terminated;
- }
-
- private void setTerminated() {
- this.terminated = true;
- }
-
- @Override
- public void onNext(Object data) {
- if (isTerminated()) {
- throw new IllegalStateException("Stream observer has been
terminated, no more data is allowed");
- }
- call.setExceptionCode(exceptionCode);
- call.setNeedReturnException(isNeedReturnException);
- call.sendMessage(data);
- }
-
- @Override
- public void onError(Throwable throwable) {
- final TriRpcStatus status = TriRpcStatus.getStatus(throwable);
- onCompleted(status);
- }
-
- public void onCompleted(TriRpcStatus status) {
- if (isTerminated()) {
- return;
- }
- call.setExceptionCode(exceptionCode);
- call.setNeedReturnException(isNeedReturnException);
- call.close(status, attachments);
- setTerminated();
- }
-
- @Override
- public void onCompleted() {
- onCompleted(TriRpcStatus.OK);
- }
-
- public void setResponseAttachments(Map<String, Object> attachments) {
- this.attachments = attachments;
- }
-
- @Override
- public void setCompression(String compression) {
- call.setCompression(compression);
- }
-
- public void cancel(Throwable throwable) {
- if (terminated) {
- return;
- }
- setTerminated();
- call.cancelByLocal(throwable);
- }
-
- public boolean isTimeout(long cost) {
- return call.timeout != null && call.timeout < cost;
- }
-
- @Override
- public void disableAutoFlowControl() {
- call.disableAutoRequestN();
- }
-
- @Override
- public void request(int count) {
- call.request(count);
- }
-}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ServerStream.java
deleted file mode 100644
index 88f45284d4..0000000000
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ServerStream.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.stream;
-
-import org.apache.dubbo.rpc.TriRpcStatus;
-
-import java.util.Map;
-
-import io.netty.util.concurrent.Future;
-
-/**
- * ServerStream is used to send response to client and receive requests from
client. {@link
- * Listener} is used to receive requests from client.
- */
-public interface ServerStream extends Stream {
-
- interface Listener extends Stream.Listener {
-
- /**
- * Callback when receive headers
- *
- * @param headers headers received from remote peer
- */
- void onHeader(Map<String, Object> headers);
-
- /**
- * Callback when no more data from client side
- */
- void onComplete();
- }
-
- /**
- * Complete the stream, send response to client
- *
- * @param status response status
- * @param attachments response attachments
- * @return a future that indicates the completion of send trailers
- */
- Future<?> complete(
- TriRpcStatus status, Map<String, Object> attachments, boolean
isNeedReturnException, int exceptionCode);
-
- /**
- * Send message to client
- *
- * @param message raw message
- * @param compressFlag whether to compress the message
- * @return a future that indicates the completion of send message
- */
- Future<?> sendMessage(byte[] message, int compressFlag);
-}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
deleted file mode 100644
index 0aaca48491..0000000000
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.stream;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.rpc.HeaderFilter;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.PathResolver;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.protocol.tri.ExceptionUtils;
-import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
-import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
-import org.apache.dubbo.rpc.protocol.tri.TripleProtocol;
-import org.apache.dubbo.rpc.protocol.tri.call.ReflectionAbstractServerCall;
-import org.apache.dubbo.rpc.protocol.tri.call.StubAbstractServerCall;
-import org.apache.dubbo.rpc.protocol.tri.command.CancelQueueCommand;
-import org.apache.dubbo.rpc.protocol.tri.command.DataQueueCommand;
-import org.apache.dubbo.rpc.protocol.tri.command.HeaderQueueCommand;
-import org.apache.dubbo.rpc.protocol.tri.command.TextDataQueueCommand;
-import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
-import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
-import org.apache.dubbo.rpc.protocol.tri.frame.Deframer;
-import org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder;
-import org.apache.dubbo.rpc.protocol.tri.transport.AbstractH2TransportListener;
-import org.apache.dubbo.rpc.protocol.tri.transport.H2TransportListener;
-import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.Executor;
-
-import com.google.protobuf.Any;
-import com.google.rpc.DebugInfo;
-import com.google.rpc.Status;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpUtil;
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.Http2Error;
-import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.handler.codec.http2.Http2StreamChannel;
-import io.netty.util.ReferenceCountUtil;
-import io.netty.util.concurrent.Future;
-
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST;
-
-public class TripleServerStream extends AbstractStream implements ServerStream
{
-
- private static final ErrorTypeAwareLogger LOGGER =
LoggerFactory.getErrorTypeAwareLogger(TripleServerStream.class);
- public final ServerTransportObserver transportObserver = new
ServerTransportObserver();
- private final TripleWriteQueue writeQueue;
- private final PathResolver pathResolver;
- private final List<HeaderFilter> filters;
- private final String acceptEncoding;
- private boolean headerSent;
- private boolean trailersSent;
- private volatile boolean reset;
- private ServerStream.Listener listener;
- private final InetSocketAddress remoteAddress;
- private Deframer deframer;
- private boolean rst = false;
- private final Http2StreamChannel http2StreamChannel;
- private final TripleStreamChannelFuture tripleStreamChannelFuture;
-
- public TripleServerStream(
- Http2StreamChannel channel,
- FrameworkModel frameworkModel,
- Executor executor,
- PathResolver pathResolver,
- String acceptEncoding,
- List<HeaderFilter> filters,
- TripleWriteQueue writeQueue) {
- super(executor, frameworkModel);
- this.pathResolver = pathResolver;
- this.acceptEncoding = acceptEncoding;
- this.filters = filters;
- this.writeQueue = writeQueue;
- this.remoteAddress = (InetSocketAddress) channel.remoteAddress();
- this.http2StreamChannel = channel;
- this.tripleStreamChannelFuture = new
TripleStreamChannelFuture(channel);
- }
-
- @Override
- public SocketAddress remoteAddress() {
- return remoteAddress;
- }
-
- @Override
- public void request(int n) {
- deframer.request(n);
- }
-
- public ChannelFuture reset(Http2Error cause) {
- ChannelFuture checkResult = preCheck();
- if (!checkResult.isSuccess()) {
- return checkResult;
- }
- this.rst = true;
- return
writeQueue.enqueue(CancelQueueCommand.createCommand(tripleStreamChannelFuture,
cause));
- }
-
- @Override
- public ChannelFuture sendHeader(Http2Headers headers) {
- if (reset) {
- return http2StreamChannel.newFailedFuture(
- new IllegalStateException("Stream already reset, no more
headers allowed"));
- }
- if (headerSent) {
- return http2StreamChannel.newFailedFuture(new
IllegalStateException("Header already sent"));
- }
- if (trailersSent) {
- return http2StreamChannel.newFailedFuture(new
IllegalStateException("Trailers already sent"));
- }
- ChannelFuture checkResult = preCheck();
- if (!checkResult.isSuccess()) {
- return checkResult;
- }
- headerSent = true;
- return writeQueue
-
.enqueue(HeaderQueueCommand.createHeaders(tripleStreamChannelFuture, headers,
false))
- .addListener(f -> {
- if (!f.isSuccess()) {
- reset(Http2Error.INTERNAL_ERROR);
- }
- });
- }
-
- @Override
- public Future<?> cancelByLocal(TriRpcStatus status) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(String.format("Cancel stream:%s by local: %s",
http2StreamChannel, status));
- }
- return reset(Http2Error.CANCEL);
- }
-
- @Override
- public ChannelFuture complete(
- TriRpcStatus status, Map<String, Object> attachments, boolean
isNeedReturnException, int exceptionCode) {
- Http2Headers trailers =
- getTrailers(status, attachments, isNeedReturnException,
CommonConstants.TRI_EXCEPTION_CODE_NOT_EXISTS);
- return sendTrailers(trailers);
- }
-
- private ChannelFuture sendTrailers(Http2Headers trailers) {
- if (reset) {
- return http2StreamChannel.newFailedFuture(
- new IllegalStateException("Stream already reset, no more
trailers allowed"));
- }
- if (trailersSent) {
- return http2StreamChannel.newFailedFuture(new
IllegalStateException("Trailers already sent"));
- }
- ChannelFuture checkResult = preCheck();
- if (!checkResult.isSuccess()) {
- return checkResult;
- }
- headerSent = true;
- trailersSent = true;
- return writeQueue
-
.enqueue(HeaderQueueCommand.createHeaders(tripleStreamChannelFuture, trailers,
true))
- .addListener(f -> {
- if (!f.isSuccess()) {
- reset(Http2Error.INTERNAL_ERROR);
- }
- });
- }
-
- private Http2Headers getTrailers(
- TriRpcStatus rpcStatus, Map<String, Object> attachments, boolean
isNeedReturnException, int exceptionCode) {
- DefaultHttp2Headers headers = new DefaultHttp2Headers();
- if (!headerSent) {
- headers.status(HttpResponseStatus.OK.codeAsText());
- headers.set(HttpHeaderNames.CONTENT_TYPE,
TripleConstant.CONTENT_PROTO);
- }
- StreamUtils.convertAttachment(headers, attachments,
TripleProtocol.CONVERT_NO_LOWER_HEADER);
- headers.set(TripleHeaderEnum.STATUS_KEY.getHeader(),
String.valueOf(rpcStatus.code.code));
- if (rpcStatus.isOk()) {
- return headers;
- }
- String grpcMessage = getGrpcMessage(rpcStatus);
- grpcMessage =
TriRpcStatus.encodeMessage(TriRpcStatus.limitSizeTo1KB(grpcMessage));
- headers.set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), grpcMessage);
- if (!getGrpcStatusDetailEnabled()) {
- return headers;
- }
- Status.Builder builder =
-
Status.newBuilder().setCode(rpcStatus.code.code).setMessage(grpcMessage);
- Throwable throwable = rpcStatus.cause;
- if (throwable == null) {
- Status status = builder.build();
- headers.set(
- TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
- StreamUtils.encodeBase64ASCII(status.toByteArray()));
- return headers;
- }
- DebugInfo debugInfo = DebugInfo.newBuilder()
-
.addAllStackEntries(ExceptionUtils.getStackFrameList(throwable, 6))
- // can not use now
- // .setDetail(throwable.getMessage())
- .build();
- builder.addDetails(Any.pack(debugInfo));
- Status status = builder.build();
- headers.set(
- TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
StreamUtils.encodeBase64ASCII(status.toByteArray()));
- return headers;
- }
-
- private String getGrpcMessage(TriRpcStatus status) {
- if (StringUtils.isNotEmpty(status.description)) {
- return status.description;
- }
- return
Optional.ofNullable(status.cause).map(Throwable::getMessage).orElse("unknown");
- }
-
- @Override
- public ChannelFuture sendMessage(byte[] message, int compressFlag) {
- if (reset) {
- return http2StreamChannel.newFailedFuture(
- new IllegalStateException("Stream already reset, no more
body allowed"));
- }
- if (!headerSent) {
- return http2StreamChannel.newFailedFuture(
- new IllegalStateException("Headers did not sent before
send body"));
- }
- if (trailersSent) {
- return http2StreamChannel.newFailedFuture(
- new IllegalStateException("Trailers already sent, no more
body allowed"));
- }
- ChannelFuture checkResult = preCheck();
- if (!checkResult.isSuccess()) {
- return checkResult;
- }
- return
writeQueue.enqueue(DataQueueCommand.create(tripleStreamChannelFuture, message,
false, compressFlag));
- }
-
- /**
- * Error before create server stream, http plain text will be returned
- *
- * @param code code of error
- * @param status status of error
- */
- private void responsePlainTextError(int code, TriRpcStatus status) {
- ChannelFuture checkResult = preCheck();
- if (!checkResult.isSuccess()) {
- return;
- }
- Http2Headers headers = new DefaultHttp2Headers(true)
- .status(String.valueOf(code))
- .setInt(TripleHeaderEnum.STATUS_KEY.getHeader(),
status.code.code)
- .set(TripleHeaderEnum.MESSAGE_KEY.getHeader(),
status.description)
- .set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(),
TripleConstant.TEXT_PLAIN_UTF8);
-
writeQueue.enqueue(HeaderQueueCommand.createHeaders(tripleStreamChannelFuture,
headers, false));
-
writeQueue.enqueue(TextDataQueueCommand.createCommand(tripleStreamChannelFuture,
status.description, true));
- }
-
- /**
- * Error in create stream, unsupported config or triple protocol error.
There is no return value
- * because stream will be reset if send trailers failed.
- *
- * @param status status of error
- */
- private void responseErr(TriRpcStatus status) {
- Http2Headers trailers = new DefaultHttp2Headers()
- .status(OK.codeAsText())
- .set(HttpHeaderNames.CONTENT_TYPE,
TripleConstant.CONTENT_PROTO)
- .setInt(TripleHeaderEnum.STATUS_KEY.getHeader(),
status.code.code)
- .set(TripleHeaderEnum.MESSAGE_KEY.getHeader(),
status.toEncodedMessage());
- sendTrailers(trailers);
- }
-
- private Invoker<?> getInvoker(Http2Headers headers, String serviceName) {
- final String version =
headers.contains(TripleHeaderEnum.SERVICE_VERSION.getHeader())
- ?
headers.get(TripleHeaderEnum.SERVICE_VERSION.getHeader()).toString()
- : null;
- final String group =
headers.contains(TripleHeaderEnum.SERVICE_GROUP.getHeader())
- ?
headers.get(TripleHeaderEnum.SERVICE_GROUP.getHeader()).toString()
- : null;
- final String key = URL.buildKey(serviceName, group, version);
- Invoker<?> invoker = pathResolver.resolve(key);
- if (invoker == null && TripleProtocol.RESOLVE_FALLBACK_TO_DEFAULT) {
- invoker = pathResolver.resolve(URL.buildKey(serviceName, group,
"1.0.0"));
- }
- if (invoker == null && TripleProtocol.RESOLVE_FALLBACK_TO_DEFAULT) {
- invoker = pathResolver.resolve(serviceName);
- }
- return invoker;
- }
-
- private ChannelFuture preCheck() {
- if (!http2StreamChannel.isActive()) {
- return http2StreamChannel.newFailedFuture(new IOException("stream
channel is closed"));
- }
- if (rst) {
- return http2StreamChannel.newFailedFuture(new IOException("stream
channel has reset"));
- }
- return http2StreamChannel.newSucceededFuture();
- }
-
- public class ServerTransportObserver extends AbstractH2TransportListener
implements H2TransportListener {
-
- /**
- * must starts from application/grpc
- */
- private boolean supportContentType(String contentType) {
- if (contentType == null) {
- return false;
- }
- return contentType.startsWith(TripleConstant.APPLICATION_GRPC);
- }
-
- @Override
- public void onHeader(Http2Headers headers, boolean endStream) {
- executor.execute(() -> processHeader(headers, endStream));
- }
-
- private void processHeader(Http2Headers headers, boolean endStream) {
- if (!HttpMethod.POST.asciiName().contentEquals(headers.method())) {
- responsePlainTextError(
- HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
- TriRpcStatus.INTERNAL.withDescription(
- String.format("Method '%s' is not supported",
headers.method())));
- return;
- }
-
- if (headers.path() == null) {
- responsePlainTextError(
- HttpResponseStatus.NOT_FOUND.code(),
-
TriRpcStatus.fromCode(TriRpcStatus.Code.UNIMPLEMENTED.code)
- .withDescription("Expected path but is
missing"));
- return;
- }
-
- final String path = headers.path().toString();
- if (path.charAt(0) != '/') {
- responsePlainTextError(
- HttpResponseStatus.NOT_FOUND.code(),
-
TriRpcStatus.fromCode(TriRpcStatus.Code.UNIMPLEMENTED.code)
- .withDescription(String.format("Expected path
to start with /: %s", path)));
- return;
- }
-
- final CharSequence contentType =
HttpUtil.getMimeType(headers.get(HttpHeaderNames.CONTENT_TYPE));
- if (contentType == null) {
- responsePlainTextError(
- HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
- TriRpcStatus.fromCode(TriRpcStatus.Code.INTERNAL.code)
- .withDescription("Content-Type is missing from
the request"));
- return;
- }
-
- final String contentString = contentType.toString();
- if (!supportContentType(contentString)) {
- responsePlainTextError(
- HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
- TriRpcStatus.fromCode(TriRpcStatus.Code.INTERNAL.code)
- .withDescription(String.format("Content-Type
'%s' is not supported", contentString)));
- return;
- }
-
- String[] parts = path.split("/");
- if (parts.length != 3) {
- responseErr(TriRpcStatus.UNIMPLEMENTED.withDescription("Bad
path format:" + path));
- return;
- }
- String serviceName = parts[1];
- String originalMethodName = parts[2];
-
- Invoker<?> invoker = getInvoker(headers, serviceName);
- if (invoker == null) {
-
responseErr(TriRpcStatus.UNIMPLEMENTED.withDescription("Service not found:" +
serviceName));
- return;
- }
-
- if (endStream) {
- return;
- }
-
- DeCompressor deCompressor = DeCompressor.NONE;
- CharSequence messageEncoding =
headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
- if (null != messageEncoding) {
- String compressorStr = messageEncoding.toString();
- if (!Identity.MESSAGE_ENCODING.equals(compressorStr)) {
- DeCompressor compressor =
DeCompressor.getCompressor(frameworkModel, compressorStr);
- if (null == compressor) {
-
responseErr(TriRpcStatus.fromCode(TriRpcStatus.Code.UNIMPLEMENTED.code)
- .withDescription(String.format("Grpc-encoding
'%s' is not supported", compressorStr)));
- return;
- }
- deCompressor = compressor;
- }
- }
-
- Map<String, Object> requestMetadata = headersToMap(
- headers, () ->
Optional.ofNullable(headers.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()))
- .map(CharSequence::toString)
- .orElse(null));
- boolean hasStub = pathResolver.hasNativeStub(path);
- if (hasStub) {
- listener = new StubAbstractServerCall(
- invoker,
- TripleServerStream.this,
- frameworkModel,
- acceptEncoding,
- serviceName,
- originalMethodName,
- executor);
- } else {
- listener = new ReflectionAbstractServerCall(
- invoker,
- TripleServerStream.this,
- frameworkModel,
- acceptEncoding,
- serviceName,
- originalMethodName,
- filters,
- executor);
- }
- // must before onHeader
- deframer = new TriDecoder(deCompressor, new
ServerDecoderListener(listener));
- listener.onHeader(requestMetadata);
- }
-
- @Override
- public void onData(ByteBuf data, boolean endStream) {
- try {
- executor.execute(() -> doOnData(data, endStream));
- } catch (Throwable t) {
- // Tasks will be rejected when the thread pool is closed or
full,
- // ByteBuf needs to be released to avoid out of heap memory
leakage.
- // For example, ThreadLessExecutor will be shutdown when
request timeout {@link AsyncRpcResult}
- ReferenceCountUtil.release(data);
- LOGGER.error(PROTOCOL_FAILED_REQUEST, "", "", "submit onData
task failed", t);
- }
- }
-
- private void doOnData(ByteBuf data, boolean endStream) {
- if (deframer == null) {
- return;
- }
- deframer.deframe(data);
- if (endStream) {
- deframer.close();
- }
- }
-
- @Override
- public void cancelByRemote(long errorCode) {
- TripleServerStream.this.reset = true;
- if (!trailersSent) {
- // send rst if stream not closed
- reset(Http2Error.valueOf(errorCode));
- }
- if (listener == null) {
- return;
- }
- executor.execute(() -> listener.onCancelByRemote(
- TriRpcStatus.CANCELLED.withDescription("Canceled by client
,errorCode=" + errorCode)));
- }
- }
-
- private static class ServerDecoderListener implements TriDecoder.Listener {
-
- private final ServerStream.Listener listener;
-
- public ServerDecoderListener(ServerStream.Listener listener) {
- this.listener = listener;
- }
-
- @Override
- public void onRawMessage(byte[] data) {
- listener.onMessage(data, false);
- }
-
- @Override
- public void close() {
- listener.onComplete();
- }
- }
-}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
deleted file mode 100644
index daebefc339..0000000000
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.transport;
-
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.rpc.HeaderFilter;
-import org.apache.dubbo.rpc.PathResolver;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.executor.ExecutorSupport;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
-import org.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream;
-
-import java.util.List;
-import java.util.concurrent.Executor;
-
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http2.Http2DataFrame;
-import io.netty.handler.codec.http2.Http2HeadersFrame;
-import io.netty.handler.codec.http2.Http2ResetFrame;
-import io.netty.handler.codec.http2.Http2StreamChannel;
-import io.netty.util.ReferenceCountUtil;
-import io.netty.util.ReferenceCounted;
-
-import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_RESPONSE;
-
-public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
-
- private static final ErrorTypeAwareLogger LOGGER =
-
LoggerFactory.getErrorTypeAwareLogger(TripleHttp2FrameServerHandler.class);
- private final PathResolver pathResolver;
- private final ExecutorSupport executorSupport;
- private final String acceptEncoding;
- private final TripleServerStream tripleServerStream;
-
- public TripleHttp2FrameServerHandler(
- FrameworkModel frameworkModel,
- ExecutorSupport executorSupport,
- List<HeaderFilter> filters,
- Http2StreamChannel channel,
- TripleWriteQueue writeQueue) {
- this.executorSupport = executorSupport;
- this.acceptEncoding = String.join(
- ",",
frameworkModel.getExtensionLoader(DeCompressor.class).getSupportedExtensions());
- this.pathResolver =
-
frameworkModel.getExtensionLoader(PathResolver.class).getDefaultExtension();
- // The executor will be assigned in onHeadersRead method
- tripleServerStream = new TripleServerStream(
- channel, frameworkModel, null, pathResolver, acceptEncoding,
filters, writeQueue);
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
- if (msg instanceof Http2HeadersFrame) {
- onHeadersRead(ctx, (Http2HeadersFrame) msg);
- } else if (msg instanceof Http2DataFrame) {
- onDataRead(ctx, (Http2DataFrame) msg);
- } else if (msg instanceof ReferenceCounted) {
- // ignored
- ReferenceCountUtil.release(msg);
- }
- }
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
- if (evt instanceof Http2ResetFrame) {
- onResetRead(ctx, (Http2ResetFrame) evt);
- } else {
- super.userEventTriggered(ctx, evt);
- }
- }
-
- public void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame frame) {
- LOGGER.warn(
- PROTOCOL_FAILED_RESPONSE, "", "", "Triple Server received
remote reset errorCode=" + frame.errorCode());
- if (tripleServerStream != null) {
-
tripleServerStream.transportObserver.cancelByRemote(frame.errorCode());
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn(PROTOCOL_FAILED_RESPONSE, "", "", "Exception in
processing triple message", cause);
- }
- TriRpcStatus status = TriRpcStatus.getStatus(cause, "Provider's
error:\n" + cause.getMessage());
- tripleServerStream.cancelByLocal(status);
- }
-
- public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg)
throws Exception {
- tripleServerStream.transportObserver.onData(msg.content(),
msg.isEndStream());
- }
-
- public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame
msg) throws Exception {
- Executor executor = executorSupport.getExecutor(msg.headers());
- tripleServerStream.setExecutor(executor);
- tripleServerStream.transportObserver.onHeader(msg.headers(),
msg.isEndStream());
- }
-}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionServerCallTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionServerCallTest.java
deleted file mode 100644
index c598305ce6..0000000000
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionServerCallTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.MethodDescriptor;
-import org.apache.dubbo.rpc.model.ProviderModel;
-import org.apache.dubbo.rpc.model.ReflectionMethodDescriptor;
-import org.apache.dubbo.rpc.model.ServiceDescriptor;
-import org.apache.dubbo.rpc.model.ServiceMetadata;
-import org.apache.dubbo.rpc.protocol.tri.DescriptorService;
-import org.apache.dubbo.rpc.protocol.tri.HelloReply;
-import org.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream;
-
-import java.lang.reflect.Method;
-import java.util.Collections;
-
-import io.netty.util.concurrent.ImmediateEventExecutor;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.when;
-
-class ReflectionServerCallTest {
-
- @Test
- void doStartCall() throws NoSuchMethodException {
- Invoker<?> invoker = Mockito.mock(Invoker.class);
- TripleServerStream serverStream =
Mockito.mock(TripleServerStream.class);
- ProviderModel providerModel = Mockito.mock(ProviderModel.class);
- ServiceMetadata serviceMetadata = new ServiceMetadata();
- Method method = DescriptorService.class.getMethod("sayHello",
HelloReply.class);
- MethodDescriptor methodDescriptor = new
ReflectionMethodDescriptor(method);
- URL url = Mockito.mock(URL.class);
- when(invoker.getUrl()).thenReturn(url);
- when(url.getServiceModel()).thenReturn(providerModel);
- when(providerModel.getServiceMetadata()).thenReturn(serviceMetadata);
-
- String service = "testService";
- String methodName = "method";
- try {
- ReflectionAbstractServerCall call = new
ReflectionAbstractServerCall(
- invoker,
- serverStream,
- new FrameworkModel(),
- "",
- service,
- methodName,
- Collections.emptyList(),
- ImmediateEventExecutor.INSTANCE);
- fail();
- } catch (Exception e) {
- // pass
- }
-
- ServiceDescriptor serviceDescriptor =
Mockito.mock(ServiceDescriptor.class);
-
when(serviceDescriptor.getMethods(anyString())).thenReturn(Collections.singletonList(methodDescriptor));
-
- when(providerModel.getServiceModel()).thenReturn(serviceDescriptor);
-
- ReflectionAbstractServerCall call2 = new ReflectionAbstractServerCall(
- invoker,
- serverStream,
- new FrameworkModel(),
- "",
- service,
- methodName,
- Collections.emptyList(),
- ImmediateEventExecutor.INSTANCE);
- call2.onHeader(Collections.emptyMap());
- call2.onMessage(new byte[0], false);
- call2.onComplete();
- }
-}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/StubServerCallTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/StubServerCallTest.java
deleted file mode 100644
index b500e91bbc..0000000000
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/StubServerCallTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.MethodDescriptor.RpcType;
-import org.apache.dubbo.rpc.model.ProviderModel;
-import org.apache.dubbo.rpc.model.ServiceDescriptor;
-import org.apache.dubbo.rpc.model.StubMethodDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream;
-
-import java.util.Collections;
-
-import io.netty.util.concurrent.ImmediateEventExecutor;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.when;
-
-class StubServerCallTest {
-
- @Test
- void doStartCall() throws Exception {
- Invoker<?> invoker = Mockito.mock(Invoker.class);
- TripleServerStream tripleServerStream =
Mockito.mock(TripleServerStream.class);
- ProviderModel providerModel = Mockito.mock(ProviderModel.class);
- ServiceDescriptor serviceDescriptor =
Mockito.mock(ServiceDescriptor.class);
- StubMethodDescriptor methodDescriptor =
Mockito.mock(StubMethodDescriptor.class);
- URL url = Mockito.mock(URL.class);
- when(invoker.getUrl()).thenReturn(url);
- when(url.getServiceModel()).thenReturn(providerModel);
- when(providerModel.getServiceModel()).thenReturn(serviceDescriptor);
-
when(serviceDescriptor.getMethods(anyString())).thenReturn(Collections.singletonList(methodDescriptor));
- when(methodDescriptor.getRpcType()).thenReturn(RpcType.UNARY);
-
when(methodDescriptor.parseRequest(any(byte[].class))).thenReturn("test");
- String service = "testService";
- String method = "method";
- StubAbstractServerCall call = new StubAbstractServerCall(
- invoker,
- tripleServerStream,
- new FrameworkModel(),
- "",
- service,
- method,
- ImmediateEventExecutor.INSTANCE);
- call.onHeader(Collections.emptyMap());
- call.onMessage(new byte[0], false);
- call.onComplete();
- }
-}