This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 46af145730 refactor(triple): Clean up some unused classes to facilitate reference analysis (#13619)
46af145730 is described below

commit 46af145730aa34417e3fcb74592f17a3190748f6
Author: Sean Yang <[email protected]>
AuthorDate: Fri Jan 19 14:46:30 2024 +0800

    refactor(triple): Clean up some unused classes to facilitate reference analysis (#13619)
---
 .../dubbo/reactive/CreateObserverAdapter.java      |   8 +-
 .../TripleReflectionTypeDescriberRegistrar.java    |   2 -
 .../rpc/protocol/tri/call/AbstractServerCall.java  | 438 ------------------
 .../tri/call/AbstractServerCallListener.java       | 102 -----
 .../tri/call/BiStreamServerCallListener.java       |  61 ---
 .../tri/call/ReflectionAbstractServerCall.java     | 214 ---------
 .../dubbo/rpc/protocol/tri/call/ServerCall.java    |  75 ---
 .../tri/call/ServerStreamServerCallListener.java   |  51 ---
 .../protocol/tri/call/StubAbstractServerCall.java  |  66 ---
 .../protocol/tri/call/UnaryServerCallListener.java |  84 ----
 .../tri/h12/AbstractServerCallListener.java        |   3 +-
 .../h12/grpc/GrpcHttp2ServerTransportListener.java |   4 +-
 .../tri/observer/ServerCallToObserverAdapter.java  | 136 ------
 .../rpc/protocol/tri/stream/ServerStream.java      |  64 ---
 .../protocol/tri/stream/TripleServerStream.java    | 505 ---------------------
 .../transport/TripleHttp2FrameServerHandler.java   | 115 -----
 .../tri/call/ReflectionServerCallTest.java         |  92 ----
 .../rpc/protocol/tri/call/StubServerCallTest.java  |  68 ---
 18 files changed, 7 insertions(+), 2081 deletions(-)

diff --git a/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java
index 99f44c7cc5..3f303b5f3f 100644
--- a/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java
+++ b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java
@@ -16,7 +16,7 @@
  */
 package org.apache.dubbo.reactive;
 
-import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
+import org.apache.dubbo.rpc.protocol.tri.ServerStreamObserver;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -28,7 +28,7 @@ import static org.mockito.Mockito.doAnswer;
 
 public class CreateObserverAdapter {
 
-    private ServerCallToObserverAdapter<String> responseObserver;
+    private ServerStreamObserver<String> responseObserver;
     private AtomicInteger nextCounter;
     private AtomicInteger completeCounter;
     private AtomicInteger errorCounter;
@@ -39,7 +39,7 @@ public class CreateObserverAdapter {
         completeCounter = new AtomicInteger();
         errorCounter = new AtomicInteger();
 
-        responseObserver = Mockito.mock(ServerCallToObserverAdapter.class);
+        responseObserver = Mockito.mock(ServerStreamObserver.class);
         doAnswer(o -> nextCounter.incrementAndGet()).when(responseObserver).onNext(anyString());
         doAnswer(o -> completeCounter.incrementAndGet()).when(responseObserver).onCompleted();
         doAnswer(o -> errorCounter.incrementAndGet()).when(responseObserver).onError(any(Throwable.class));
@@ -57,7 +57,7 @@ public class CreateObserverAdapter {
         return errorCounter;
     }
 
-    public ServerCallToObserverAdapter<String> getResponseObserver() {
+    public ServerStreamObserver<String> getResponseObserver() {
         return this.responseObserver;
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/aot/TripleReflectionTypeDescriberRegistrar.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/aot/TripleReflectionTypeDescriberRegistrar.java
index 0ac720184b..a0ab8198d4 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/aot/TripleReflectionTypeDescriberRegistrar.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/aot/TripleReflectionTypeDescriberRegistrar.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.aot.api.TypeDescriber;
 import org.apache.dubbo.rpc.protocol.tri.transport.TripleCommandOutBoundHandler;
 import org.apache.dubbo.rpc.protocol.tri.transport.TripleGoAwayHandler;
 import org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2ClientResponseHandler;
-import org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2FrameServerHandler;
 import org.apache.dubbo.rpc.protocol.tri.transport.TripleServerConnectionHandler;
 import org.apache.dubbo.rpc.protocol.tri.transport.TripleTailHandler;
 
@@ -35,7 +34,6 @@ public class TripleReflectionTypeDescriberRegistrar implements ReflectionTypeDes
     @Override
     public List<TypeDescriber> getTypeDescribers() {
         List<TypeDescriber> typeDescribers = new ArrayList<>();
-        typeDescribers.add(buildTypeDescriberWithPublicMethod(TripleHttp2FrameServerHandler.class));
         typeDescribers.add(buildTypeDescriberWithPublicMethod(TripleCommandOutBoundHandler.class));
         typeDescribers.add(buildTypeDescriberWithPublicMethod(TripleTailHandler.class));
         typeDescribers.add(buildTypeDescriberWithPublicMethod(TripleServerConnectionHandler.class));
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java
deleted file mode 100644
index fb8f516367..0000000000
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.RpcContext;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.MethodDescriptor;
-import org.apache.dubbo.rpc.model.PackableMethod;
-import org.apache.dubbo.rpc.model.ServiceDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.ClassLoadUtil;
-import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
-import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
-import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
-import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
-import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
-import org.apache.dubbo.rpc.protocol.tri.stream.ServerStream;
-import org.apache.dubbo.rpc.protocol.tri.stream.StreamUtils;
-
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.util.concurrent.Future;
-
-import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_CREATE_STREAM_TRIPLE;
-import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_PARSE;
-import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST;
-import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_SERIALIZE_TRIPLE;
-
-public abstract class AbstractServerCall implements ServerCall, 
ServerStream.Listener {
-
-    public static final String REMOTE_ADDRESS_KEY = "tri.remote.address";
-    private static final ErrorTypeAwareLogger LOGGER = 
LoggerFactory.getErrorTypeAwareLogger(AbstractServerCall.class);
-
-    public final Invoker<?> invoker;
-    public final FrameworkModel frameworkModel;
-    public final ServerStream stream;
-    public final Executor executor;
-    public final String methodName;
-    public final String serviceName;
-    public final ServiceDescriptor serviceDescriptor;
-    private final String acceptEncoding;
-    public boolean autoRequestN = true;
-    public Long timeout;
-    ServerCall.Listener listener;
-    private Compressor compressor;
-    private boolean headerSent;
-    private boolean closed;
-    CancellationContext cancellationContext;
-    protected MethodDescriptor methodDescriptor;
-    protected PackableMethod packableMethod;
-    protected Map<String, Object> requestMetadata;
-
-    private Integer exceptionCode = 
CommonConstants.TRI_EXCEPTION_CODE_NOT_EXISTS;
-
-    public Integer getExceptionCode() {
-        return exceptionCode;
-    }
-
-    public void setExceptionCode(Integer exceptionCode) {
-        this.exceptionCode = exceptionCode;
-    }
-
-    private boolean isNeedReturnException = false;
-
-    public boolean isNeedReturnException() {
-        return isNeedReturnException;
-    }
-
-    public void setNeedReturnException(boolean needReturnException) {
-        isNeedReturnException = needReturnException;
-    }
-
-    AbstractServerCall(
-            Invoker<?> invoker,
-            ServerStream stream,
-            FrameworkModel frameworkModel,
-            ServiceDescriptor serviceDescriptor,
-            String acceptEncoding,
-            String serviceName,
-            String methodName,
-            Executor executor) {
-        Objects.requireNonNull(serviceDescriptor, "No service descriptor found 
for " + invoker.getUrl());
-        this.invoker = invoker;
-        // is already serialized in the stream, so we don't need to serialize 
it again.
-        this.executor = executor;
-        this.frameworkModel = frameworkModel;
-        this.serviceDescriptor = serviceDescriptor;
-        this.serviceName = serviceName;
-        this.methodName = methodName;
-        this.stream = stream;
-        this.acceptEncoding = acceptEncoding;
-    }
-
-    // stream listener start
-    @Override
-    public void onHeader(Map<String, Object> requestMetadata) {
-        this.requestMetadata = requestMetadata;
-        if (serviceDescriptor == null) {
-            responseErr(TriRpcStatus.UNIMPLEMENTED.withDescription("Service 
not found:" + serviceName));
-            return;
-        }
-        startCall();
-    }
-
-    protected void startCall() {
-        RpcInvocation invocation = buildInvocation(methodDescriptor);
-        listener = startInternalCall(invocation, methodDescriptor, invoker);
-    }
-
-    @Override
-    public final void request(int numMessages) {
-        stream.request(numMessages);
-    }
-
-    @Override
-    public final void sendMessage(Object message) {
-        if (closed) {
-            throw new IllegalStateException("Stream has already canceled");
-        }
-        // is already in executor
-        doSendMessage(message);
-    }
-
-    private void doSendMessage(Object message) {
-        if (closed) {
-            return;
-        }
-        if (!headerSent) {
-            sendHeader();
-        }
-        final byte[] data;
-        try {
-            data = packableMethod.packResponse(message);
-        } catch (Exception e) {
-            close(
-                    TriRpcStatus.INTERNAL
-                            .withDescription("Serialize response failed")
-                            .withCause(e),
-                    null);
-            LOGGER.error(
-                    PROTOCOL_FAILED_SERIALIZE_TRIPLE,
-                    "",
-                    "",
-                    String.format("Serialize triple response failed, 
service=%s method=%s", serviceName, methodName),
-                    e);
-            return;
-        }
-        if (data == null) {
-            close(TriRpcStatus.INTERNAL.withDescription("Missing response"), 
null);
-            return;
-        }
-        Future<?> future;
-        if (compressor != null) {
-            int compressedFlag = 
Identity.MESSAGE_ENCODING.equals(compressor.getMessageEncoding()) ? 0 : 1;
-            final byte[] compressed = compressor.compress(data);
-            future = stream.sendMessage(compressed, compressedFlag);
-        } else {
-            future = stream.sendMessage(data, 0);
-        }
-        future.addListener(f -> {
-            if (!f.isSuccess()) {
-                cancelDual(TriRpcStatus.CANCELLED
-                        .withDescription("Send message failed")
-                        .withCause(f.cause()));
-            }
-        });
-    }
-
-    @Override
-    public final void onComplete() {
-        if (listener == null) {
-            // It will enter here when there is an error in the header
-            return;
-        }
-        // Both 'onError' and 'onComplete' are termination operators.
-        // The stream will be closed when 'onError' was called, and 
'onComplete' is not allowed to be called again.
-        if (isClosed()) {
-            return;
-        }
-        listener.onComplete();
-    }
-
-    @Override
-    public final void onMessage(byte[] message, boolean isReturnTriException) {
-        ClassLoader tccl = Thread.currentThread().getContextClassLoader();
-        try {
-            Object instance = parseSingleMessage(message);
-            listener.onMessage(instance, message.length);
-        } catch (Exception e) {
-            final TriRpcStatus status =
-                    TriRpcStatus.UNKNOWN.withDescription("Server 
error").withCause(e);
-            close(status, null);
-            LOGGER.error(
-                    PROTOCOL_FAILED_REQUEST,
-                    "",
-                    "",
-                    "Process request failed. service=" + serviceName + " 
method=" + methodName,
-                    e);
-        } finally {
-            ClassLoadUtil.switchContextLoader(tccl);
-        }
-    }
-
-    protected abstract Object parseSingleMessage(byte[] data) throws Exception;
-
-    @Override
-    public final void onCancelByRemote(TriRpcStatus status) {
-        closed = true;
-        if (listener == null) {
-            return;
-        }
-        cancellationContext.cancel(status.cause);
-        listener.onCancel(status);
-    }
-    // stream listener end
-
-    public final boolean isClosed() {
-        return closed;
-    }
-
-    /**
-     * Build the RpcInvocation with metadata and execute headerFilter
-     *
-     * @return RpcInvocation
-     */
-    protected RpcInvocation buildInvocation(MethodDescriptor methodDescriptor) 
{
-        final URL url = invoker.getUrl();
-        RpcInvocation inv = new RpcInvocation(
-                url.getServiceModel(),
-                methodDescriptor.getMethodName(),
-                serviceDescriptor.getInterfaceName(),
-                url.getProtocolServiceKey(),
-                methodDescriptor.getParameterClasses(),
-                new Object[0]);
-        inv.setTargetServiceUniqueName(url.getServiceKey());
-        inv.setReturnTypes(methodDescriptor.getReturnTypes());
-        inv.setObjectAttachments(StreamUtils.toAttachments(requestMetadata));
-        inv.put(REMOTE_ADDRESS_KEY, stream.remoteAddress());
-        // handle timeout
-        String timeout = (String) 
requestMetadata.get(TripleHeaderEnum.TIMEOUT.getHeader());
-        try {
-            if (Objects.nonNull(timeout)) {
-                this.timeout = parseTimeoutToMills(timeout);
-            }
-        } catch (Throwable t) {
-            LOGGER.warn(
-                    PROTOCOL_FAILED_PARSE,
-                    "",
-                    "",
-                    String.format(
-                            "Failed to parse request timeout set from:%s, 
service=%s " + "method=%s",
-                            timeout, serviceDescriptor.getInterfaceName(), 
methodName));
-        }
-        if (null != 
requestMetadata.get(TripleHeaderEnum.CONSUMER_APP_NAME_KEY.getHeader())) {
-            inv.put(
-                    TripleHeaderEnum.CONSUMER_APP_NAME_KEY,
-                    
requestMetadata.get(TripleHeaderEnum.CONSUMER_APP_NAME_KEY.getHeader()));
-        }
-        return inv;
-    }
-
-    private void sendHeader() {
-        if (closed) {
-            return;
-        }
-        if (headerSent) {
-            throw new IllegalStateException("Header has already sent");
-        }
-        headerSent = true;
-        DefaultHttp2Headers headers = new DefaultHttp2Headers();
-        headers.status(HttpResponseStatus.OK.codeAsText());
-        headers.set(HttpHeaderNames.CONTENT_TYPE, 
TripleConstant.CONTENT_PROTO);
-        if (acceptEncoding != null) {
-            headers.set(HttpHeaderNames.ACCEPT_ENCODING, acceptEncoding);
-        }
-        if (compressor != null) {
-            headers.set(TripleHeaderEnum.GRPC_ENCODING.getHeader(), 
compressor.getMessageEncoding());
-        }
-        if 
(!exceptionCode.equals(CommonConstants.TRI_EXCEPTION_CODE_NOT_EXISTS)) {
-            headers.set(TripleHeaderEnum.TRI_EXCEPTION_CODE.getHeader(), 
String.valueOf(exceptionCode));
-        }
-        // send header failed will reset stream and close request observer 
cause no more data will be sent
-        stream.sendHeader(headers).addListener(f -> {
-            if (!f.isSuccess()) {
-                cancelDual(TriRpcStatus.INTERNAL.withCause(f.cause()));
-            }
-        });
-    }
-
-    private void cancelDual(TriRpcStatus status) {
-        closed = true;
-        listener.onCancel(status);
-        cancellationContext.cancel(status.asException());
-    }
-
-    public void cancelByLocal(Throwable throwable) {
-        if (closed) {
-            return;
-        }
-        closed = true;
-        cancellationContext.cancel(throwable);
-        stream.cancelByLocal(TriRpcStatus.CANCELLED.withCause(throwable));
-    }
-
-    public void setCompression(String compression) {
-        if (headerSent) {
-            throw new IllegalStateException("Can not set compression after 
header sent");
-        }
-        this.compressor = Compressor.getCompressor(frameworkModel, 
compression);
-    }
-
-    public void disableAutoRequestN() {
-        autoRequestN = false;
-    }
-
-    public boolean isAutoRequestN() {
-        return autoRequestN;
-    }
-
-    public void close(TriRpcStatus status, Map<String, Object> attachments) {
-        doClose(status, attachments);
-    }
-
-    private void doClose(TriRpcStatus status, Map<String, Object> attachments) 
{
-        if (closed) {
-            return;
-        }
-        closed = true;
-        stream.complete(status, attachments, isNeedReturnException, 
exceptionCode);
-    }
-
-    protected Long parseTimeoutToMills(String timeoutVal) {
-        if (StringUtils.isEmpty(timeoutVal) || 
StringUtils.isContains(timeoutVal, "null")) {
-            return null;
-        }
-        long value = Long.parseLong(timeoutVal.substring(0, 
timeoutVal.length() - 1));
-        char unit = timeoutVal.charAt(timeoutVal.length() - 1);
-        switch (unit) {
-            case 'n':
-                return TimeUnit.NANOSECONDS.toMillis(value);
-            case 'u':
-                return TimeUnit.MICROSECONDS.toMillis(value);
-            case 'm':
-                return value;
-            case 'S':
-                return TimeUnit.SECONDS.toMillis(value);
-            case 'M':
-                return TimeUnit.MINUTES.toMillis(value);
-            case 'H':
-                return TimeUnit.HOURS.toMillis(value);
-            default:
-                // invalid timeout config
-                return null;
-        }
-    }
-
-    /**
-     * Error in create stream, unsupported config or triple protocol error.
-     *
-     * @param status response status
-     */
-    protected void responseErr(TriRpcStatus status) {
-        if (closed) {
-            return;
-        }
-        closed = true;
-        stream.complete(status, null, false, 
CommonConstants.TRI_EXCEPTION_CODE_NOT_EXISTS);
-        LOGGER.error(
-                PROTOCOL_FAILED_REQUEST,
-                "",
-                "",
-                "Triple request error: service=" + serviceName + " method" + 
methodName,
-                status.asException());
-    }
-
-    protected ServerCall.Listener startInternalCall(
-            RpcInvocation invocation, MethodDescriptor methodDescriptor, 
Invoker<?> invoker) {
-        this.cancellationContext = RpcContext.getCancellationContext();
-        ServerCallToObserverAdapter<Object> responseObserver =
-                new ServerCallToObserverAdapter<>(this, cancellationContext);
-        try {
-            ServerCall.Listener listener;
-            switch (methodDescriptor.getRpcType()) {
-                case UNARY:
-                    listener = new UnaryServerCallListener(
-                            invocation, invoker, responseObserver, 
packableMethod.needWrapper());
-                    request(2);
-                    break;
-                case SERVER_STREAM:
-                    listener = new ServerStreamServerCallListener(invocation, 
invoker, responseObserver);
-                    request(2);
-                    break;
-                case BI_STREAM:
-                case CLIENT_STREAM:
-                    listener = new BiStreamServerCallListener(invocation, 
invoker, responseObserver);
-                    request(1);
-                    break;
-                default:
-                    throw new IllegalStateException("Can not reach here");
-            }
-            return listener;
-        } catch (Exception e) {
-            LOGGER.error(PROTOCOL_FAILED_CREATE_STREAM_TRIPLE, "", "", "Create 
triple stream failed", e);
-            responseErr(TriRpcStatus.INTERNAL
-                    .withDescription("Create stream failed")
-                    .withCause(e));
-        }
-        return null;
-    }
-}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCallListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCallListener.java
deleted file mode 100644
index 7fa838d8d7..0000000000
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCallListener.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcContext;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
-import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
-
-import java.net.InetSocketAddress;
-
-import static 
org.apache.dubbo.common.constants.CommonConstants.REMOTE_APPLICATION_KEY;
-import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_TIMEOUT_SERVER;
-
-public abstract class AbstractServerCallListener implements 
AbstractServerCall.Listener {
-
-    private static final ErrorTypeAwareLogger LOGGER =
-            
LoggerFactory.getErrorTypeAwareLogger(AbstractServerCallListener.class);
-    public final CancellationContext cancellationContext;
-    final RpcInvocation invocation;
-    final Invoker<?> invoker;
-    final ServerCallToObserverAdapter<Object> responseObserver;
-
-    public AbstractServerCallListener(
-            RpcInvocation invocation, Invoker<?> invoker, 
ServerCallToObserverAdapter<Object> responseObserver) {
-        this.invocation = invocation;
-        this.invoker = invoker;
-        this.cancellationContext = responseObserver.cancellationContext;
-        this.responseObserver = responseObserver;
-    }
-
-    public void invoke() {
-        RpcContext.restoreCancellationContext(cancellationContext);
-        InetSocketAddress remoteAddress =
-                (InetSocketAddress) 
invocation.getAttributes().remove(AbstractServerCall.REMOTE_ADDRESS_KEY);
-        RpcContext.getServiceContext().setRemoteAddress(remoteAddress);
-        String remoteApp = (String) 
invocation.getAttributes().remove(TripleHeaderEnum.CONSUMER_APP_NAME_KEY);
-        if (null != remoteApp) {
-            RpcContext.getServiceContext().setRemoteApplicationName(remoteApp);
-            invocation.setAttachmentIfAbsent(REMOTE_APPLICATION_KEY, 
remoteApp);
-        }
-        final long stInMillis = System.currentTimeMillis();
-        try {
-            final Result response = invoker.invoke(invocation);
-            response.whenCompleteWithContext((r, t) -> {
-                
responseObserver.setResponseAttachments(response.getObjectAttachments());
-                if (t != null) {
-                    responseObserver.onError(t);
-                    return;
-                }
-                if (response.hasException()) {
-                    doOnResponseHasException(response.getException());
-                    return;
-                }
-                final long cost = System.currentTimeMillis() - stInMillis;
-                if (responseObserver.isTimeout(cost)) {
-                    LOGGER.error(
-                            PROTOCOL_TIMEOUT_SERVER,
-                            "",
-                            "",
-                            String.format(
-                                    "Invoke timeout at server side, ignored to 
send response. service=%s method=%s cost=%s",
-                                    invocation.getTargetServiceUniqueName(), 
invocation.getMethodName(), cost));
-                    
responseObserver.onCompleted(TriRpcStatus.DEADLINE_EXCEEDED);
-                    return;
-                }
-                onReturn(r.getValue());
-            });
-        } catch (Exception e) {
-            responseObserver.onError(e);
-        } finally {
-            RpcContext.removeCancellationContext();
-            RpcContext.removeContext();
-        }
-    }
-
-    protected void doOnResponseHasException(Throwable t) {
-        responseObserver.onError(t);
-    }
-
-    public abstract void onReturn(Object value);
-}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java
deleted file mode 100644
index e37ac9d006..0000000000
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
-
-public class BiStreamServerCallListener extends AbstractServerCallListener {
-
-    private StreamObserver<Object> requestObserver;
-
-    public BiStreamServerCallListener(
-            RpcInvocation invocation, Invoker<?> invoker, 
ServerCallToObserverAdapter<Object> responseObserver) {
-        super(invocation, invoker, responseObserver);
-        invocation.setArguments(new Object[] {responseObserver});
-        invoke();
-    }
-
-    @Override
-    public void onReturn(Object value) {
-        this.requestObserver = (StreamObserver<Object>) value;
-    }
-
-    @Override
-    public void onMessage(Object message, int actualContentLength) {
-        if (message instanceof Object[]) {
-            message = ((Object[]) message)[0];
-        }
-        requestObserver.onNext(message);
-        if (responseObserver.isAutoRequestN()) {
-            responseObserver.request(1);
-        }
-    }
-
-    @Override
-    public void onCancel(TriRpcStatus status) {
-        requestObserver.onError(status.asException());
-    }
-
-    @Override
-    public void onComplete() {
-        requestObserver.onCompleted();
-    }
-}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionAbstractServerCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionAbstractServerCall.java
deleted file mode 100644
index 4be61a00df..0000000000
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionAbstractServerCall.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.config.ConfigurationUtils;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.utils.CollectionUtils;
-import org.apache.dubbo.rpc.HeaderFilter;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.MethodDescriptor;
-import org.apache.dubbo.rpc.model.MethodDescriptor.RpcType;
-import org.apache.dubbo.rpc.model.PackableMethod;
-import org.apache.dubbo.rpc.model.PackableMethodFactory;
-import org.apache.dubbo.rpc.model.ProviderModel;
-import org.apache.dubbo.rpc.model.ServiceDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.ClassLoadUtil;
-import org.apache.dubbo.rpc.protocol.tri.TripleCustomerProtocolWapper;
-import org.apache.dubbo.rpc.protocol.tri.stream.ServerStream;
-import org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
-import io.netty.handler.codec.http.HttpHeaderNames;
-
-import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY;
-import static 
org.apache.dubbo.common.constants.CommonConstants.DUBBO_PACKABLE_METHOD_FACTORY;
-
-public class ReflectionAbstractServerCall extends AbstractServerCall {
-
-    private static final String PACKABLE_METHOD_CACHE = 
"PACKABLE_METHOD_CACHE";
-    private final List<HeaderFilter> headerFilters;
-    private List<MethodDescriptor> methodDescriptors;
-
-    public ReflectionAbstractServerCall(
-            Invoker<?> invoker,
-            ServerStream serverStream,
-            FrameworkModel frameworkModel,
-            String acceptEncoding,
-            String serviceName,
-            String methodName,
-            List<HeaderFilter> headerFilters,
-            Executor executor) {
-        super(
-                invoker,
-                serverStream,
-                frameworkModel,
-                getServiceDescriptor(invoker.getUrl()),
-                acceptEncoding,
-                serviceName,
-                methodName,
-                executor);
-        this.headerFilters = headerFilters;
-    }
-
-    private static ServiceDescriptor getServiceDescriptor(URL url) {
-        ProviderModel providerModel = (ProviderModel) url.getServiceModel();
-        if (providerModel == null || providerModel.getServiceModel() == null) {
-            return null;
-        }
-        return providerModel.getServiceModel();
-    }
-
-    private boolean isEcho(String methodName) {
-        return CommonConstants.$ECHO.equals(methodName);
-    }
-
-    private boolean isGeneric(String methodName) {
-        return CommonConstants.$INVOKE.equals(methodName) || 
CommonConstants.$INVOKE_ASYNC.equals(methodName);
-    }
-
-    @Override
-    public void startCall() {
-        if (isGeneric(methodName)) {
-            // There should be one and only one
-            methodDescriptor = ServiceDescriptorInternalCache.genericService()
-                    .getMethods(methodName)
-                    .get(0);
-        } else if (isEcho(methodName)) {
-            // There should be one and only one
-            methodDescriptor = ServiceDescriptorInternalCache.echoService()
-                    .getMethods(methodName)
-                    .get(0);
-        } else {
-            methodDescriptors = serviceDescriptor.getMethods(methodName);
-            // try lower-case method
-            if (CollectionUtils.isEmpty(methodDescriptors)) {
-                final String lowerMethod = 
Character.toLowerCase(methodName.charAt(0)) + methodName.substring(1);
-                methodDescriptors = serviceDescriptor.getMethods(lowerMethod);
-            }
-            if (CollectionUtils.isEmpty(methodDescriptors)) {
-                responseErr(TriRpcStatus.UNIMPLEMENTED.withDescription(
-                        "Method : " + methodName + " not found of service:" + 
serviceName));
-                return;
-            }
-            // In most cases there is only one method
-            if (methodDescriptors.size() == 1) {
-                methodDescriptor = methodDescriptors.get(0);
-            }
-            // generated unary method ,use unary type
-            // Response foo(Request)
-            // void foo(Request,StreamObserver<Response>)
-            if (methodDescriptors.size() == 2) {
-                if (methodDescriptors.get(1).getRpcType() == 
RpcType.SERVER_STREAM) {
-                    methodDescriptor = methodDescriptors.get(0);
-                } else if (methodDescriptors.get(0).getRpcType() == 
RpcType.SERVER_STREAM) {
-                    methodDescriptor = methodDescriptors.get(1);
-                }
-            }
-        }
-        if (methodDescriptor != null) {
-            loadPackableMethod(invoker.getUrl());
-        }
-        trySetListener();
-        if (listener == null) {
-            // wrap request , need one message
-            request(1);
-        }
-    }
-
-    private void trySetListener() {
-        if (listener != null) {
-            return;
-        }
-        if (methodDescriptor == null) {
-            return;
-        }
-        if (isClosed()) {
-            return;
-        }
-        RpcInvocation invocation = buildInvocation(methodDescriptor);
-        if (isClosed()) {
-            return;
-        }
-        headerFilters.forEach(f -> f.invoke(invoker, invocation));
-        if (isClosed()) {
-            return;
-        }
-        listener = 
ReflectionAbstractServerCall.this.startInternalCall(invocation, 
methodDescriptor, invoker);
-    }
-
-    @Override
-    protected Object parseSingleMessage(byte[] data) throws Exception {
-        trySetMethodDescriptor(data);
-        trySetListener();
-        if (isClosed()) {
-            return null;
-        }
-        
ClassLoadUtil.switchContextLoader(invoker.getUrl().getServiceModel().getClassLoader());
-        return packableMethod.getRequestUnpack().unpack(data);
-    }
-
-    private void trySetMethodDescriptor(byte[] data) {
-        if (methodDescriptor != null) {
-            return;
-        }
-        final TripleCustomerProtocolWapper.TripleRequestWrapper request;
-        request = 
TripleCustomerProtocolWapper.TripleRequestWrapper.parseFrom(data);
-
-        final String[] paramTypes =
-                request.getArgTypes().toArray(new 
String[request.getArgs().size()]);
-        // wrapper mode the method can overload so maybe list
-        for (MethodDescriptor descriptor : methodDescriptors) {
-            // params type is array
-            if (Arrays.equals(descriptor.getCompatibleParamSignatures(), 
paramTypes)) {
-                methodDescriptor = descriptor;
-                break;
-            }
-        }
-        if (methodDescriptor == null) {
-            ReflectionAbstractServerCall.this.close(
-                    TriRpcStatus.UNIMPLEMENTED.withDescription(
-                            "Method :" + methodName + "[" + 
Arrays.toString(paramTypes) + "] " + "not found of service:"
-                                    + serviceDescriptor.getInterfaceName()),
-                    null);
-            return;
-        }
-        loadPackableMethod(invoker.getUrl());
-    }
-
-    @SuppressWarnings("unchecked")
-    private void loadPackableMethod(URL url) {
-        Map<MethodDescriptor, PackableMethod> cacheMap = 
(Map<MethodDescriptor, PackableMethod>) url.getServiceModel()
-                .getServiceMetadata()
-                .getAttributeMap()
-                .computeIfAbsent(PACKABLE_METHOD_CACHE, (k) -> new 
ConcurrentHashMap<>());
-        packableMethod = cacheMap.computeIfAbsent(methodDescriptor, (md) -> 
frameworkModel
-                .getExtensionLoader(PackableMethodFactory.class)
-                
.getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel())
-                        .getString(DUBBO_PACKABLE_METHOD_FACTORY, DEFAULT_KEY))
-                .create(methodDescriptor, url, (String) 
requestMetadata.get(HttpHeaderNames.CONTENT_TYPE.toString())));
-    }
-}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java
deleted file mode 100644
index e5bdb36513..0000000000
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.rpc.TriRpcStatus;
-
-import java.util.Map;
-
-/**
- * ServerCall manipulates server details of a RPC call. Request messages are 
acquired by {@link
- * Listener}. Backpressure is supported by {@link #request(int)}.Response 
messages are sent by
- * {@link ServerCall#sendMessage(Object)}.
- */
-public interface ServerCall {
-
-    /**
-     * A listener to receive request messages.
-     */
-    interface Listener {
-
-        /**
-         * Callback when a request message is received.
-         *
-         * @param message message received
-         * @param actualContentLength actual content length from body
-         */
-        void onMessage(Object message, int actualContentLength);
-
-        /**
-         * @param status when the call is canceled.
-         */
-        void onCancel(TriRpcStatus status);
-
-        /**
-         * Request completed.
-         */
-        void onComplete();
-    }
-
-    /**
-     * Send message to client
-     *
-     * @param message message to send
-     */
-    void sendMessage(Object message);
-
-    /**
-     * Request more request data from the client.
-     *
-     * @param numMessages max number of messages
-     */
-    void request(int numMessages);
-
-    /**
-     * Close the call.
-     *
-     * @param status        status of the call to send to the client
-     * @param responseAttrs response attachments
-     */
-    void close(TriRpcStatus status, Map<String, Object> responseAttrs);
-}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java
deleted file mode 100644
index 8e5ea19caf..0000000000
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
-
-public class ServerStreamServerCallListener extends AbstractServerCallListener 
{
-
-    public ServerStreamServerCallListener(
-            RpcInvocation invocation, Invoker<?> invoker, 
ServerCallToObserverAdapter<Object> responseObserver) {
-        super(invocation, invoker, responseObserver);
-    }
-
-    @Override
-    public void onReturn(Object value) {}
-
-    @Override
-    public void onMessage(Object message, int actualContentLength) {
-        if (message instanceof Object[]) {
-            message = ((Object[]) message)[0];
-        }
-        invocation.setArguments(new Object[] {message, responseObserver});
-    }
-
-    @Override
-    public void onCancel(TriRpcStatus status) {
-        responseObserver.onError(status.asException());
-    }
-
-    @Override
-    public void onComplete() {
-        invoke();
-    }
-}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/StubAbstractServerCall.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/StubAbstractServerCall.java
deleted file mode 100644
index 139c42c415..0000000000
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/StubAbstractServerCall.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.ServiceDescriptor;
-import org.apache.dubbo.rpc.model.StubMethodDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.stream.ServerStream;
-import org.apache.dubbo.rpc.stub.StubSuppliers;
-
-import java.util.concurrent.Executor;
-
-public class StubAbstractServerCall extends AbstractServerCall {
-
-    public StubAbstractServerCall(
-            Invoker<?> invoker,
-            ServerStream serverStream,
-            FrameworkModel frameworkModel,
-            String acceptEncoding,
-            String serviceName,
-            String methodName,
-            Executor executor) {
-        super(
-                invoker,
-                serverStream,
-                frameworkModel,
-                getServiceDescriptor(invoker.getUrl(), serviceName),
-                acceptEncoding,
-                serviceName,
-                methodName,
-                executor);
-        this.methodDescriptor = 
serviceDescriptor.getMethods(methodName).get(0);
-        this.packableMethod = (StubMethodDescriptor) methodDescriptor;
-    }
-
-    private static ServiceDescriptor getServiceDescriptor(URL url, String 
serviceName) {
-        ServiceDescriptor serviceDescriptor;
-        if (url.getServiceModel() != null) {
-            serviceDescriptor = url.getServiceModel().getServiceModel();
-        } else {
-            serviceDescriptor = 
StubSuppliers.getServiceDescriptor(serviceName);
-        }
-        return serviceDescriptor;
-    }
-
-    @Override
-    protected Object parseSingleMessage(byte[] data) throws Exception {
-        return packableMethod.parseRequest(data);
-    }
-}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java
deleted file mode 100644
index 9e6da9158f..0000000000
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.remoting.Constants;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
-
-public class UnaryServerCallListener extends AbstractServerCallListener {
-
-    private final boolean needWrapper;
-
-    public UnaryServerCallListener(
-            RpcInvocation invocation,
-            Invoker<?> invoker,
-            ServerCallToObserverAdapter<Object> responseObserver,
-            boolean needWrapper) {
-        super(invocation, invoker, responseObserver);
-        this.needWrapper = needWrapper;
-    }
-
-    @Override
-    public void onReturn(Object value) {
-        responseObserver.onNext(value);
-        responseObserver.onCompleted();
-    }
-
-    @Override
-    public void onMessage(Object message, int actualContentLength) {
-        if (message instanceof Object[]) {
-            invocation.setArguments((Object[]) message);
-        } else {
-            invocation.setArguments(new Object[] {message});
-        }
-        invocation.put(Constants.CONTENT_LENGTH_KEY, actualContentLength);
-    }
-
-    @Override
-    public void onCancel(TriRpcStatus status) {
-        // ignored
-    }
-
-    @Override
-    protected void doOnResponseHasException(Throwable t) {
-        if (needWrapper) {
-            onReturnException((Exception) t);
-        } else {
-            super.doOnResponseHasException(t);
-        }
-    }
-
-    private void onReturnException(Exception value) {
-        TriRpcStatus status = TriRpcStatus.getStatus(value);
-        int exceptionCode = status.code.code;
-        if (exceptionCode == TriRpcStatus.UNKNOWN.code.code) {
-            exceptionCode = RpcException.BIZ_EXCEPTION;
-        }
-        responseObserver.setExceptionCode(exceptionCode);
-        responseObserver.setNeedReturnException(true);
-        onReturn(value);
-    }
-
-    @Override
-    public void onComplete() {
-        invoke();
-    }
-}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerCallListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerCallListener.java
index 689d9d03e3..6b365fc59c 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerCallListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerCallListener.java
@@ -26,7 +26,6 @@ import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
-import org.apache.dubbo.rpc.protocol.tri.call.AbstractServerCall;
 
 import java.net.InetSocketAddress;
 
@@ -57,7 +56,7 @@ public abstract class AbstractServerCallListener implements 
ServerCallListener {
                     ((Http2CancelableStreamObserver<Object>) 
responseObserver).getCancellationContext());
         }
         InetSocketAddress remoteAddress =
-                (InetSocketAddress) 
invocation.getAttributes().remove(AbstractServerCall.REMOTE_ADDRESS_KEY);
+                (InetSocketAddress) 
invocation.getAttributes().remove("tri.remote.address");
         RpcContext.getServiceContext().setRemoteAddress(remoteAddress);
         String remoteApp = (String) 
invocation.getAttributes().remove(TripleHeaderEnum.CONSUMER_APP_NAME_KEY);
         if (null != remoteApp) {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
index de216cee56..f64f70cc7c 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
@@ -34,7 +34,6 @@ import org.apache.dubbo.rpc.model.FrameworkModel;
 import org.apache.dubbo.rpc.model.MethodDescriptor;
 import org.apache.dubbo.rpc.model.ServiceDescriptor;
 import org.apache.dubbo.rpc.protocol.tri.TripleCustomerProtocolWapper;
-import org.apache.dubbo.rpc.protocol.tri.call.AbstractServerCall;
 import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
 import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
 import org.apache.dubbo.rpc.protocol.tri.h12.HttpMessageListener;
@@ -54,7 +53,8 @@ import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAI
 public class GrpcHttp2ServerTransportListener extends 
GenericHttp2ServerTransportListener
         implements Http2TransportListener {
 
-    private static final ErrorTypeAwareLogger LOGGER = 
LoggerFactory.getErrorTypeAwareLogger(AbstractServerCall.class);
+    private static final ErrorTypeAwareLogger LOGGER =
+            
LoggerFactory.getErrorTypeAwareLogger(GrpcHttp2ServerTransportListener.class);
 
     public GrpcHttp2ServerTransportListener(H2StreamChannel h2StreamChannel, 
URL url, FrameworkModel frameworkModel) {
         super(h2StreamChannel, url, frameworkModel);
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ServerCallToObserverAdapter.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ServerCallToObserverAdapter.java
deleted file mode 100644
index f0d84f1939..0000000000
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ServerCallToObserverAdapter.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.observer;
-
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.rpc.CancellationContext;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.ServerStreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.call.AbstractServerCall;
-
-import java.util.Map;
-
-public class ServerCallToObserverAdapter<T> extends 
CancelableStreamObserver<T> implements ServerStreamObserver<T> {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(CancelableStreamObserver.class);
-    public final CancellationContext cancellationContext;
-    private final AbstractServerCall call;
-    private Map<String, Object> attachments;
-    private boolean terminated = false;
-
-    private boolean isNeedReturnException = false;
-
-    private Integer exceptionCode = 
CommonConstants.TRI_EXCEPTION_CODE_NOT_EXISTS;
-
-    public Integer getExceptionCode() {
-        return exceptionCode;
-    }
-
-    public void setExceptionCode(Integer exceptionCode) {
-        this.exceptionCode = exceptionCode;
-    }
-
-    public boolean isNeedReturnException() {
-        return isNeedReturnException;
-    }
-
-    public void setNeedReturnException(boolean needReturnException) {
-        isNeedReturnException = needReturnException;
-    }
-
-    public ServerCallToObserverAdapter(AbstractServerCall call, 
CancellationContext cancellationContext) {
-        this.call = call;
-        this.cancellationContext = cancellationContext;
-    }
-
-    public boolean isAutoRequestN() {
-        return call.isAutoRequestN();
-    }
-
-    public boolean isTerminated() {
-        return terminated;
-    }
-
-    private void setTerminated() {
-        this.terminated = true;
-    }
-
-    @Override
-    public void onNext(Object data) {
-        if (isTerminated()) {
-            throw new IllegalStateException("Stream observer has been 
terminated, no more data is allowed");
-        }
-        call.setExceptionCode(exceptionCode);
-        call.setNeedReturnException(isNeedReturnException);
-        call.sendMessage(data);
-    }
-
-    @Override
-    public void onError(Throwable throwable) {
-        final TriRpcStatus status = TriRpcStatus.getStatus(throwable);
-        onCompleted(status);
-    }
-
-    public void onCompleted(TriRpcStatus status) {
-        if (isTerminated()) {
-            return;
-        }
-        call.setExceptionCode(exceptionCode);
-        call.setNeedReturnException(isNeedReturnException);
-        call.close(status, attachments);
-        setTerminated();
-    }
-
-    @Override
-    public void onCompleted() {
-        onCompleted(TriRpcStatus.OK);
-    }
-
-    public void setResponseAttachments(Map<String, Object> attachments) {
-        this.attachments = attachments;
-    }
-
-    @Override
-    public void setCompression(String compression) {
-        call.setCompression(compression);
-    }
-
-    public void cancel(Throwable throwable) {
-        if (terminated) {
-            return;
-        }
-        setTerminated();
-        call.cancelByLocal(throwable);
-    }
-
-    public boolean isTimeout(long cost) {
-        return call.timeout != null && call.timeout < cost;
-    }
-
-    @Override
-    public void disableAutoFlowControl() {
-        call.disableAutoRequestN();
-    }
-
-    @Override
-    public void request(int count) {
-        call.request(count);
-    }
-}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ServerStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ServerStream.java
deleted file mode 100644
index 88f45284d4..0000000000
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ServerStream.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.stream;
-
-import org.apache.dubbo.rpc.TriRpcStatus;
-
-import java.util.Map;
-
-import io.netty.util.concurrent.Future;
-
-/**
- * ServerStream is used to send response to client and receive requests from 
client. {@link
- * Listener} is used to receive requests from client.
- */
-public interface ServerStream extends Stream {
-
-    interface Listener extends Stream.Listener {
-
-        /**
-         * Callback when receive headers
-         *
-         * @param headers headers received from remote peer
-         */
-        void onHeader(Map<String, Object> headers);
-
-        /**
-         * Callback when no more data from client side
-         */
-        void onComplete();
-    }
-
-    /**
-     * Complete the stream, send response to client
-     *
-     * @param status      response status
-     * @param attachments response attachments
-     * @return a future that indicates the completion of send trailers
-     */
-    Future<?> complete(
-            TriRpcStatus status, Map<String, Object> attachments, boolean 
isNeedReturnException, int exceptionCode);
-
-    /**
-     * Send message to client
-     *
-     * @param message      raw message
-     * @param compressFlag whether to compress the message
-     * @return a future that indicates the completion of send message
-     */
-    Future<?> sendMessage(byte[] message, int compressFlag);
-}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
deleted file mode 100644
index 0aaca48491..0000000000
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.stream;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.rpc.HeaderFilter;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.PathResolver;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.protocol.tri.ExceptionUtils;
-import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
-import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
-import org.apache.dubbo.rpc.protocol.tri.TripleProtocol;
-import org.apache.dubbo.rpc.protocol.tri.call.ReflectionAbstractServerCall;
-import org.apache.dubbo.rpc.protocol.tri.call.StubAbstractServerCall;
-import org.apache.dubbo.rpc.protocol.tri.command.CancelQueueCommand;
-import org.apache.dubbo.rpc.protocol.tri.command.DataQueueCommand;
-import org.apache.dubbo.rpc.protocol.tri.command.HeaderQueueCommand;
-import org.apache.dubbo.rpc.protocol.tri.command.TextDataQueueCommand;
-import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
-import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
-import org.apache.dubbo.rpc.protocol.tri.frame.Deframer;
-import org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder;
-import org.apache.dubbo.rpc.protocol.tri.transport.AbstractH2TransportListener;
-import org.apache.dubbo.rpc.protocol.tri.transport.H2TransportListener;
-import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.Executor;
-
-import com.google.protobuf.Any;
-import com.google.rpc.DebugInfo;
-import com.google.rpc.Status;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpUtil;
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.Http2Error;
-import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.handler.codec.http2.Http2StreamChannel;
-import io.netty.util.ReferenceCountUtil;
-import io.netty.util.concurrent.Future;
-
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST;
-
-public class TripleServerStream extends AbstractStream implements ServerStream 
{
-
-    private static final ErrorTypeAwareLogger LOGGER = 
LoggerFactory.getErrorTypeAwareLogger(TripleServerStream.class);
-    public final ServerTransportObserver transportObserver = new 
ServerTransportObserver();
-    private final TripleWriteQueue writeQueue;
-    private final PathResolver pathResolver;
-    private final List<HeaderFilter> filters;
-    private final String acceptEncoding;
-    private boolean headerSent;
-    private boolean trailersSent;
-    private volatile boolean reset;
-    private ServerStream.Listener listener;
-    private final InetSocketAddress remoteAddress;
-    private Deframer deframer;
-    private boolean rst = false;
-    private final Http2StreamChannel http2StreamChannel;
-    private final TripleStreamChannelFuture tripleStreamChannelFuture;
-
-    public TripleServerStream(
-            Http2StreamChannel channel,
-            FrameworkModel frameworkModel,
-            Executor executor,
-            PathResolver pathResolver,
-            String acceptEncoding,
-            List<HeaderFilter> filters,
-            TripleWriteQueue writeQueue) {
-        super(executor, frameworkModel);
-        this.pathResolver = pathResolver;
-        this.acceptEncoding = acceptEncoding;
-        this.filters = filters;
-        this.writeQueue = writeQueue;
-        this.remoteAddress = (InetSocketAddress) channel.remoteAddress();
-        this.http2StreamChannel = channel;
-        this.tripleStreamChannelFuture = new 
TripleStreamChannelFuture(channel);
-    }
-
-    @Override
-    public SocketAddress remoteAddress() {
-        return remoteAddress;
-    }
-
-    @Override
-    public void request(int n) {
-        deframer.request(n);
-    }
-
-    public ChannelFuture reset(Http2Error cause) {
-        ChannelFuture checkResult = preCheck();
-        if (!checkResult.isSuccess()) {
-            return checkResult;
-        }
-        this.rst = true;
-        return 
writeQueue.enqueue(CancelQueueCommand.createCommand(tripleStreamChannelFuture, 
cause));
-    }
-
-    @Override
-    public ChannelFuture sendHeader(Http2Headers headers) {
-        if (reset) {
-            return http2StreamChannel.newFailedFuture(
-                    new IllegalStateException("Stream already reset, no more 
headers allowed"));
-        }
-        if (headerSent) {
-            return http2StreamChannel.newFailedFuture(new 
IllegalStateException("Header already sent"));
-        }
-        if (trailersSent) {
-            return http2StreamChannel.newFailedFuture(new 
IllegalStateException("Trailers already sent"));
-        }
-        ChannelFuture checkResult = preCheck();
-        if (!checkResult.isSuccess()) {
-            return checkResult;
-        }
-        headerSent = true;
-        return writeQueue
-                
.enqueue(HeaderQueueCommand.createHeaders(tripleStreamChannelFuture, headers, 
false))
-                .addListener(f -> {
-                    if (!f.isSuccess()) {
-                        reset(Http2Error.INTERNAL_ERROR);
-                    }
-                });
-    }
-
-    @Override
-    public Future<?> cancelByLocal(TriRpcStatus status) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug(String.format("Cancel stream:%s by local: %s", 
http2StreamChannel, status));
-        }
-        return reset(Http2Error.CANCEL);
-    }
-
-    @Override
-    public ChannelFuture complete(
-            TriRpcStatus status, Map<String, Object> attachments, boolean 
isNeedReturnException, int exceptionCode) {
-        Http2Headers trailers =
-                getTrailers(status, attachments, isNeedReturnException, 
CommonConstants.TRI_EXCEPTION_CODE_NOT_EXISTS);
-        return sendTrailers(trailers);
-    }
-
-    private ChannelFuture sendTrailers(Http2Headers trailers) {
-        if (reset) {
-            return http2StreamChannel.newFailedFuture(
-                    new IllegalStateException("Stream already reset, no more 
trailers allowed"));
-        }
-        if (trailersSent) {
-            return http2StreamChannel.newFailedFuture(new 
IllegalStateException("Trailers already sent"));
-        }
-        ChannelFuture checkResult = preCheck();
-        if (!checkResult.isSuccess()) {
-            return checkResult;
-        }
-        headerSent = true;
-        trailersSent = true;
-        return writeQueue
-                
.enqueue(HeaderQueueCommand.createHeaders(tripleStreamChannelFuture, trailers, 
true))
-                .addListener(f -> {
-                    if (!f.isSuccess()) {
-                        reset(Http2Error.INTERNAL_ERROR);
-                    }
-                });
-    }
-
-    private Http2Headers getTrailers(
-            TriRpcStatus rpcStatus, Map<String, Object> attachments, boolean 
isNeedReturnException, int exceptionCode) {
-        DefaultHttp2Headers headers = new DefaultHttp2Headers();
-        if (!headerSent) {
-            headers.status(HttpResponseStatus.OK.codeAsText());
-            headers.set(HttpHeaderNames.CONTENT_TYPE, 
TripleConstant.CONTENT_PROTO);
-        }
-        StreamUtils.convertAttachment(headers, attachments, 
TripleProtocol.CONVERT_NO_LOWER_HEADER);
-        headers.set(TripleHeaderEnum.STATUS_KEY.getHeader(), 
String.valueOf(rpcStatus.code.code));
-        if (rpcStatus.isOk()) {
-            return headers;
-        }
-        String grpcMessage = getGrpcMessage(rpcStatus);
-        grpcMessage = 
TriRpcStatus.encodeMessage(TriRpcStatus.limitSizeTo1KB(grpcMessage));
-        headers.set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), grpcMessage);
-        if (!getGrpcStatusDetailEnabled()) {
-            return headers;
-        }
-        Status.Builder builder =
-                
Status.newBuilder().setCode(rpcStatus.code.code).setMessage(grpcMessage);
-        Throwable throwable = rpcStatus.cause;
-        if (throwable == null) {
-            Status status = builder.build();
-            headers.set(
-                    TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
-                    StreamUtils.encodeBase64ASCII(status.toByteArray()));
-            return headers;
-        }
-        DebugInfo debugInfo = DebugInfo.newBuilder()
-                
.addAllStackEntries(ExceptionUtils.getStackFrameList(throwable, 6))
-                // can not use now
-                // .setDetail(throwable.getMessage())
-                .build();
-        builder.addDetails(Any.pack(debugInfo));
-        Status status = builder.build();
-        headers.set(
-                TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(), 
StreamUtils.encodeBase64ASCII(status.toByteArray()));
-        return headers;
-    }
-
-    private String getGrpcMessage(TriRpcStatus status) {
-        if (StringUtils.isNotEmpty(status.description)) {
-            return status.description;
-        }
-        return 
Optional.ofNullable(status.cause).map(Throwable::getMessage).orElse("unknown");
-    }
-
-    @Override
-    public ChannelFuture sendMessage(byte[] message, int compressFlag) {
-        if (reset) {
-            return http2StreamChannel.newFailedFuture(
-                    new IllegalStateException("Stream already reset, no more 
body allowed"));
-        }
-        if (!headerSent) {
-            return http2StreamChannel.newFailedFuture(
-                    new IllegalStateException("Headers did not sent before 
send body"));
-        }
-        if (trailersSent) {
-            return http2StreamChannel.newFailedFuture(
-                    new IllegalStateException("Trailers already sent, no more 
body allowed"));
-        }
-        ChannelFuture checkResult = preCheck();
-        if (!checkResult.isSuccess()) {
-            return checkResult;
-        }
-        return 
writeQueue.enqueue(DataQueueCommand.create(tripleStreamChannelFuture, message, 
false, compressFlag));
-    }
-
-    /**
-     * Error before create server stream, http plain text will be returned
-     *
-     * @param code   code of error
-     * @param status status of error
-     */
-    private void responsePlainTextError(int code, TriRpcStatus status) {
-        ChannelFuture checkResult = preCheck();
-        if (!checkResult.isSuccess()) {
-            return;
-        }
-        Http2Headers headers = new DefaultHttp2Headers(true)
-                .status(String.valueOf(code))
-                .setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), 
status.code.code)
-                .set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), 
status.description)
-                .set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), 
TripleConstant.TEXT_PLAIN_UTF8);
-        
writeQueue.enqueue(HeaderQueueCommand.createHeaders(tripleStreamChannelFuture, 
headers, false));
-        
writeQueue.enqueue(TextDataQueueCommand.createCommand(tripleStreamChannelFuture,
 status.description, true));
-    }
-
-    /**
-     * Error in create stream, unsupported config or triple protocol error. 
There is no return value
-     * because stream will be reset if send trailers failed.
-     *
-     * @param status status of error
-     */
-    private void responseErr(TriRpcStatus status) {
-        Http2Headers trailers = new DefaultHttp2Headers()
-                .status(OK.codeAsText())
-                .set(HttpHeaderNames.CONTENT_TYPE, 
TripleConstant.CONTENT_PROTO)
-                .setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), 
status.code.code)
-                .set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), 
status.toEncodedMessage());
-        sendTrailers(trailers);
-    }
-
-    private Invoker<?> getInvoker(Http2Headers headers, String serviceName) {
-        final String version = 
headers.contains(TripleHeaderEnum.SERVICE_VERSION.getHeader())
-                ? 
headers.get(TripleHeaderEnum.SERVICE_VERSION.getHeader()).toString()
-                : null;
-        final String group = 
headers.contains(TripleHeaderEnum.SERVICE_GROUP.getHeader())
-                ? 
headers.get(TripleHeaderEnum.SERVICE_GROUP.getHeader()).toString()
-                : null;
-        final String key = URL.buildKey(serviceName, group, version);
-        Invoker<?> invoker = pathResolver.resolve(key);
-        if (invoker == null && TripleProtocol.RESOLVE_FALLBACK_TO_DEFAULT) {
-            invoker = pathResolver.resolve(URL.buildKey(serviceName, group, 
"1.0.0"));
-        }
-        if (invoker == null && TripleProtocol.RESOLVE_FALLBACK_TO_DEFAULT) {
-            invoker = pathResolver.resolve(serviceName);
-        }
-        return invoker;
-    }
-
-    private ChannelFuture preCheck() {
-        if (!http2StreamChannel.isActive()) {
-            return http2StreamChannel.newFailedFuture(new IOException("stream 
channel is closed"));
-        }
-        if (rst) {
-            return http2StreamChannel.newFailedFuture(new IOException("stream 
channel has reset"));
-        }
-        return http2StreamChannel.newSucceededFuture();
-    }
-
-    public class ServerTransportObserver extends AbstractH2TransportListener 
implements H2TransportListener {
-
-        /**
-         * must starts from application/grpc
-         */
-        private boolean supportContentType(String contentType) {
-            if (contentType == null) {
-                return false;
-            }
-            return contentType.startsWith(TripleConstant.APPLICATION_GRPC);
-        }
-
-        @Override
-        public void onHeader(Http2Headers headers, boolean endStream) {
-            executor.execute(() -> processHeader(headers, endStream));
-        }
-
-        private void processHeader(Http2Headers headers, boolean endStream) {
-            if (!HttpMethod.POST.asciiName().contentEquals(headers.method())) {
-                responsePlainTextError(
-                        HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
-                        TriRpcStatus.INTERNAL.withDescription(
-                                String.format("Method '%s' is not supported", 
headers.method())));
-                return;
-            }
-
-            if (headers.path() == null) {
-                responsePlainTextError(
-                        HttpResponseStatus.NOT_FOUND.code(),
-                        
TriRpcStatus.fromCode(TriRpcStatus.Code.UNIMPLEMENTED.code)
-                                .withDescription("Expected path but is 
missing"));
-                return;
-            }
-
-            final String path = headers.path().toString();
-            if (path.charAt(0) != '/') {
-                responsePlainTextError(
-                        HttpResponseStatus.NOT_FOUND.code(),
-                        
TriRpcStatus.fromCode(TriRpcStatus.Code.UNIMPLEMENTED.code)
-                                .withDescription(String.format("Expected path 
to start with /: %s", path)));
-                return;
-            }
-
-            final CharSequence contentType = 
HttpUtil.getMimeType(headers.get(HttpHeaderNames.CONTENT_TYPE));
-            if (contentType == null) {
-                responsePlainTextError(
-                        HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
-                        TriRpcStatus.fromCode(TriRpcStatus.Code.INTERNAL.code)
-                                .withDescription("Content-Type is missing from 
the request"));
-                return;
-            }
-
-            final String contentString = contentType.toString();
-            if (!supportContentType(contentString)) {
-                responsePlainTextError(
-                        HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
-                        TriRpcStatus.fromCode(TriRpcStatus.Code.INTERNAL.code)
-                                .withDescription(String.format("Content-Type 
'%s' is not supported", contentString)));
-                return;
-            }
-
-            String[] parts = path.split("/");
-            if (parts.length != 3) {
-                responseErr(TriRpcStatus.UNIMPLEMENTED.withDescription("Bad 
path format:" + path));
-                return;
-            }
-            String serviceName = parts[1];
-            String originalMethodName = parts[2];
-
-            Invoker<?> invoker = getInvoker(headers, serviceName);
-            if (invoker == null) {
-                
responseErr(TriRpcStatus.UNIMPLEMENTED.withDescription("Service not found:" + 
serviceName));
-                return;
-            }
-
-            if (endStream) {
-                return;
-            }
-
-            DeCompressor deCompressor = DeCompressor.NONE;
-            CharSequence messageEncoding = 
headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
-            if (null != messageEncoding) {
-                String compressorStr = messageEncoding.toString();
-                if (!Identity.MESSAGE_ENCODING.equals(compressorStr)) {
-                    DeCompressor compressor = 
DeCompressor.getCompressor(frameworkModel, compressorStr);
-                    if (null == compressor) {
-                        
responseErr(TriRpcStatus.fromCode(TriRpcStatus.Code.UNIMPLEMENTED.code)
-                                .withDescription(String.format("Grpc-encoding 
'%s' is not supported", compressorStr)));
-                        return;
-                    }
-                    deCompressor = compressor;
-                }
-            }
-
-            Map<String, Object> requestMetadata = headersToMap(
-                    headers, () -> 
Optional.ofNullable(headers.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()))
-                            .map(CharSequence::toString)
-                            .orElse(null));
-            boolean hasStub = pathResolver.hasNativeStub(path);
-            if (hasStub) {
-                listener = new StubAbstractServerCall(
-                        invoker,
-                        TripleServerStream.this,
-                        frameworkModel,
-                        acceptEncoding,
-                        serviceName,
-                        originalMethodName,
-                        executor);
-            } else {
-                listener = new ReflectionAbstractServerCall(
-                        invoker,
-                        TripleServerStream.this,
-                        frameworkModel,
-                        acceptEncoding,
-                        serviceName,
-                        originalMethodName,
-                        filters,
-                        executor);
-            }
-            // must before onHeader
-            deframer = new TriDecoder(deCompressor, new 
ServerDecoderListener(listener));
-            listener.onHeader(requestMetadata);
-        }
-
-        @Override
-        public void onData(ByteBuf data, boolean endStream) {
-            try {
-                executor.execute(() -> doOnData(data, endStream));
-            } catch (Throwable t) {
-                // Tasks will be rejected when the thread pool is closed or 
full,
-                // ByteBuf needs to be released to avoid out of heap memory 
leakage.
-                // For example, ThreadLessExecutor will be shutdown when 
request timeout {@link AsyncRpcResult}
-                ReferenceCountUtil.release(data);
-                LOGGER.error(PROTOCOL_FAILED_REQUEST, "", "", "submit onData 
task failed", t);
-            }
-        }
-
-        private void doOnData(ByteBuf data, boolean endStream) {
-            if (deframer == null) {
-                return;
-            }
-            deframer.deframe(data);
-            if (endStream) {
-                deframer.close();
-            }
-        }
-
-        @Override
-        public void cancelByRemote(long errorCode) {
-            TripleServerStream.this.reset = true;
-            if (!trailersSent) {
-                // send rst if stream not closed
-                reset(Http2Error.valueOf(errorCode));
-            }
-            if (listener == null) {
-                return;
-            }
-            executor.execute(() -> listener.onCancelByRemote(
-                    TriRpcStatus.CANCELLED.withDescription("Canceled by client 
,errorCode=" + errorCode)));
-        }
-    }
-
-    private static class ServerDecoderListener implements TriDecoder.Listener {
-
-        private final ServerStream.Listener listener;
-
-        public ServerDecoderListener(ServerStream.Listener listener) {
-            this.listener = listener;
-        }
-
-        @Override
-        public void onRawMessage(byte[] data) {
-            listener.onMessage(data, false);
-        }
-
-        @Override
-        public void close() {
-            listener.onComplete();
-        }
-    }
-}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
deleted file mode 100644
index daebefc339..0000000000
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.transport;
-
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.rpc.HeaderFilter;
-import org.apache.dubbo.rpc.PathResolver;
-import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.executor.ExecutorSupport;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
-import org.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream;
-
-import java.util.List;
-import java.util.concurrent.Executor;
-
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http2.Http2DataFrame;
-import io.netty.handler.codec.http2.Http2HeadersFrame;
-import io.netty.handler.codec.http2.Http2ResetFrame;
-import io.netty.handler.codec.http2.Http2StreamChannel;
-import io.netty.util.ReferenceCountUtil;
-import io.netty.util.ReferenceCounted;
-
-import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_RESPONSE;
-
-public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
-
-    private static final ErrorTypeAwareLogger LOGGER =
-            
LoggerFactory.getErrorTypeAwareLogger(TripleHttp2FrameServerHandler.class);
-    private final PathResolver pathResolver;
-    private final ExecutorSupport executorSupport;
-    private final String acceptEncoding;
-    private final TripleServerStream tripleServerStream;
-
-    public TripleHttp2FrameServerHandler(
-            FrameworkModel frameworkModel,
-            ExecutorSupport executorSupport,
-            List<HeaderFilter> filters,
-            Http2StreamChannel channel,
-            TripleWriteQueue writeQueue) {
-        this.executorSupport = executorSupport;
-        this.acceptEncoding = String.join(
-                ",", 
frameworkModel.getExtensionLoader(DeCompressor.class).getSupportedExtensions());
-        this.pathResolver =
-                
frameworkModel.getExtensionLoader(PathResolver.class).getDefaultExtension();
-        // The executor will be assigned in onHeadersRead method
-        tripleServerStream = new TripleServerStream(
-                channel, frameworkModel, null, pathResolver, acceptEncoding, 
filters, writeQueue);
-    }
-
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
-        if (msg instanceof Http2HeadersFrame) {
-            onHeadersRead(ctx, (Http2HeadersFrame) msg);
-        } else if (msg instanceof Http2DataFrame) {
-            onDataRead(ctx, (Http2DataFrame) msg);
-        } else if (msg instanceof ReferenceCounted) {
-            // ignored
-            ReferenceCountUtil.release(msg);
-        }
-    }
-
-    @Override
-    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) 
throws Exception {
-        if (evt instanceof Http2ResetFrame) {
-            onResetRead(ctx, (Http2ResetFrame) evt);
-        } else {
-            super.userEventTriggered(ctx, evt);
-        }
-    }
-
-    public void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame frame) {
-        LOGGER.warn(
-                PROTOCOL_FAILED_RESPONSE, "", "", "Triple Server received 
remote reset errorCode=" + frame.errorCode());
-        if (tripleServerStream != null) {
-            
tripleServerStream.transportObserver.cancelByRemote(frame.errorCode());
-        }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
-        if (LOGGER.isWarnEnabled()) {
-            LOGGER.warn(PROTOCOL_FAILED_RESPONSE, "", "", "Exception in 
processing triple message", cause);
-        }
-        TriRpcStatus status = TriRpcStatus.getStatus(cause, "Provider's 
error:\n" + cause.getMessage());
-        tripleServerStream.cancelByLocal(status);
-    }
-
-    public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) 
throws Exception {
-        tripleServerStream.transportObserver.onData(msg.content(), 
msg.isEndStream());
-    }
-
-    public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame 
msg) throws Exception {
-        Executor executor = executorSupport.getExecutor(msg.headers());
-        tripleServerStream.setExecutor(executor);
-        tripleServerStream.transportObserver.onHeader(msg.headers(), 
msg.isEndStream());
-    }
-}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionServerCallTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionServerCallTest.java
deleted file mode 100644
index c598305ce6..0000000000
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionServerCallTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.MethodDescriptor;
-import org.apache.dubbo.rpc.model.ProviderModel;
-import org.apache.dubbo.rpc.model.ReflectionMethodDescriptor;
-import org.apache.dubbo.rpc.model.ServiceDescriptor;
-import org.apache.dubbo.rpc.model.ServiceMetadata;
-import org.apache.dubbo.rpc.protocol.tri.DescriptorService;
-import org.apache.dubbo.rpc.protocol.tri.HelloReply;
-import org.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream;
-
-import java.lang.reflect.Method;
-import java.util.Collections;
-
-import io.netty.util.concurrent.ImmediateEventExecutor;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.when;
-
-class ReflectionServerCallTest {
-
-    @Test
-    void doStartCall() throws NoSuchMethodException {
-        Invoker<?> invoker = Mockito.mock(Invoker.class);
-        TripleServerStream serverStream = 
Mockito.mock(TripleServerStream.class);
-        ProviderModel providerModel = Mockito.mock(ProviderModel.class);
-        ServiceMetadata serviceMetadata = new ServiceMetadata();
-        Method method = DescriptorService.class.getMethod("sayHello", 
HelloReply.class);
-        MethodDescriptor methodDescriptor = new 
ReflectionMethodDescriptor(method);
-        URL url = Mockito.mock(URL.class);
-        when(invoker.getUrl()).thenReturn(url);
-        when(url.getServiceModel()).thenReturn(providerModel);
-        when(providerModel.getServiceMetadata()).thenReturn(serviceMetadata);
-
-        String service = "testService";
-        String methodName = "method";
-        try {
-            ReflectionAbstractServerCall call = new 
ReflectionAbstractServerCall(
-                    invoker,
-                    serverStream,
-                    new FrameworkModel(),
-                    "",
-                    service,
-                    methodName,
-                    Collections.emptyList(),
-                    ImmediateEventExecutor.INSTANCE);
-            fail();
-        } catch (Exception e) {
-            // pass
-        }
-
-        ServiceDescriptor serviceDescriptor = 
Mockito.mock(ServiceDescriptor.class);
-        
when(serviceDescriptor.getMethods(anyString())).thenReturn(Collections.singletonList(methodDescriptor));
-
-        when(providerModel.getServiceModel()).thenReturn(serviceDescriptor);
-
-        ReflectionAbstractServerCall call2 = new ReflectionAbstractServerCall(
-                invoker,
-                serverStream,
-                new FrameworkModel(),
-                "",
-                service,
-                methodName,
-                Collections.emptyList(),
-                ImmediateEventExecutor.INSTANCE);
-        call2.onHeader(Collections.emptyMap());
-        call2.onMessage(new byte[0], false);
-        call2.onComplete();
-    }
-}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/StubServerCallTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/StubServerCallTest.java
deleted file mode 100644
index b500e91bbc..0000000000
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/StubServerCallTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.MethodDescriptor.RpcType;
-import org.apache.dubbo.rpc.model.ProviderModel;
-import org.apache.dubbo.rpc.model.ServiceDescriptor;
-import org.apache.dubbo.rpc.model.StubMethodDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream;
-
-import java.util.Collections;
-
-import io.netty.util.concurrent.ImmediateEventExecutor;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.when;
-
-class StubServerCallTest {
-
-    @Test
-    void doStartCall() throws Exception {
-        Invoker<?> invoker = Mockito.mock(Invoker.class);
-        TripleServerStream tripleServerStream = Mockito.mock(TripleServerStream.class);
-        ProviderModel providerModel = Mockito.mock(ProviderModel.class);
-        ServiceDescriptor serviceDescriptor = Mockito.mock(ServiceDescriptor.class);
-        StubMethodDescriptor methodDescriptor = Mockito.mock(StubMethodDescriptor.class);
-        URL url = Mockito.mock(URL.class);
-        when(invoker.getUrl()).thenReturn(url);
-        when(url.getServiceModel()).thenReturn(providerModel);
-        when(providerModel.getServiceModel()).thenReturn(serviceDescriptor);
-        when(serviceDescriptor.getMethods(anyString())).thenReturn(Collections.singletonList(methodDescriptor));
-        when(methodDescriptor.getRpcType()).thenReturn(RpcType.UNARY);
-        when(methodDescriptor.parseRequest(any(byte[].class))).thenReturn("test");
-        String service = "testService";
-        String method = "method";
-        StubAbstractServerCall call = new StubAbstractServerCall(
-                invoker,
-                tripleServerStream,
-                new FrameworkModel(),
-                "",
-                service,
-                method,
-                ImmediateEventExecutor.INSTANCE);
-        call.onHeader(Collections.emptyMap());
-        call.onMessage(new byte[0], false);
-        call.onComplete();
-    }
-}

Reply via email to