This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new b2ff764329 optimize: select channel handles based on protocol versions 
(#6634)
b2ff764329 is described below

commit b2ff764329571ed699ed1ee0405616faff7202ff
Author: funkye <jian...@apache.org>
AuthorDate: Mon Jul 1 15:34:43 2024 +0800

    optimize: select channel handles based on protocol versions (#6634)
---
 changes/en-us/2.x.md                               |  5 +-
 changes/zh-cn/2.x.md                               |  4 ++
 .../rpc/netty/AbstractNettyRemotingClient.java     |  6 +-
 .../rpc/netty/AbstractNettyRemotingServer.java     | 29 ++------
 .../core/rpc/netty/CompatibleProtocolEncoder.java  | 79 ----------------------
 ...tocolDecoder.java => MultiProtocolDecoder.java} | 46 +++++++++----
 .../seata/core/rpc/netty/NettyClientBootstrap.java | 10 +--
 .../seata/core/rpc/netty/NettyServerBootstrap.java | 11 ++-
 .../seata/core/rpc/netty/ProtocolDecoder.java      |  3 +-
 .../seata/core/rpc/netty/v0/ProtocolDecoderV0.java | 41 +++++++++--
 .../seata/core/rpc/netty/v0/ProtocolEncoderV0.java | 23 +++++--
 .../seata/core/rpc/netty/v1/ProtocolDecoderV1.java | 45 ++++++++++--
 .../seata/core/rpc/netty/v1/ProtocolEncoderV1.java | 20 +++++-
 .../core/rpc/netty/v1/ClientChannelHandler.java    |  4 +-
 .../seata/core/rpc/netty/v1/ProtocolV1Client.java  | 13 ++--
 .../rpc/netty/v1/ProtocolV1SerializerTest.java     |  7 +-
 .../seata/core/rpc/netty/v1/ProtocolV1Server.java  | 59 +++++++---------
 .../core/rpc/netty/v1/ServerChannelHandler.java    |  6 +-
 18 files changed, 213 insertions(+), 198 deletions(-)

diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 54c2a2c6eb..7b6b41522b 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -3,7 +3,7 @@ Add changes here for all PR submitted to the 2.x branch.
 <!-- Please add the `changes` to the following 
location(feature/bugfix/optimize/test) based on the type of PR -->
 
 ### feature:
-
+- [[#6226](https://github.com/apache/incubator-seata/pull/6226)] multi-version 
seata protocol support
 
 ### bugfix:
 - [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async 
annotation not working in ClusterWatcherManager
@@ -16,8 +16,10 @@ Add changes here for all PR submitted to the 2.x branch.
 - [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the 
task thread pool for committing and rollbacking statuses
 - [[#6208](https://github.com/apache/incubator-seata/pull/6208)] optimize : 
load SeataSerializer by version
 - [[#6209](https://github.com/apache/incubator-seata/pull/6209)] Eliminate 
RpcMessage and Encoder/Decoder dependencies
+- [[#6634](https://github.com/apache/incubator-seata/pull/6634)] select 
channel handles based on protocol versions
 - [[#6523](https://github.com/apache/incubator-seata/pull/6523)] upgrade 
alibaba/druid version to 1.2.20
 
+
 ### refactor:
 - [[#6534](https://github.com/apache/incubator-seata/pull/6534)] optimize: 
send async response
 
@@ -36,6 +38,7 @@ Thanks to these contributors for their code commits. Please 
report an unintended
 - [liuqiufeng](https://github.com/liuqiufeng)
 - [God-Gan](https://github.com/God-Gan)
 - [Bughue](https://github.com/Bughue)
+- [funky-eyes](https://github.com/funky-eyes)
 - [tanyaofei](https://github.com/tanyaofei)
 
 Also, we receive many valuable issues, questions and advices from our 
community. Thanks for you all.
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 3f59e14835..686908c99c 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -3,6 +3,7 @@
 <!-- 请根据PR的类型添加 `变更记录` 到以下对应位置(feature/bugfix/optimize/test) 下 -->
 
 ### feature:
+- [[#6226](https://github.com/apache/incubator-seata/pull/6226)] 
支持seata私有协议多版本兼容
 
 ### bugfix:
 - [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix 
@Async注解ClusterWatcherManager中不生效的问题
@@ -15,8 +16,10 @@
 - [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 
和 rollbacking 状态的任务线程池
 - [[#6208](https://github.com/apache/incubator-seata/pull/6208)] 支持多版本的Seata序列化
 - [[#6209](https://github.com/apache/incubator-seata/pull/6209)] 解开 RpcMessage 
和 Encoder/Decoder 的互相依赖
+- [[#6634](https://github.com/apache/incubator-seata/pull/6634)] 
根据协议版本指定channel handle
 - [[#6523](https://github.com/apache/incubator-seata/pull/6523)] 升级 
alibaba/druid 的版本到1.2.20
 
+
 ### refactor:
 - [[#6534](https://github.com/apache/incubator-seata/pull/6534)] 优化: 发送异步响应
 
@@ -34,6 +37,7 @@
 - [liuqiufeng](https://github.com/liuqiufeng)
 - [God-Gan](https://github.com/God-Gan)
 - [Bughue](https://github.com/Bughue)
+- [funky-eyes](https://github.com/funky-eyes)
 - [tanyaofei](https://github.com/tanyaofei)
 
 
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
 
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
index 2901eb8d3f..248e8f48f6 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
@@ -409,10 +409,8 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
 
         @Override
         public void channelRead(final ChannelHandlerContext ctx, Object msg) 
throws Exception {
-            RpcMessage rpcMessage = null;
-            if (msg instanceof ProtocolRpcMessage) {
-                rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
-                processMessage(ctx, rpcMessage);
+            if (msg instanceof RpcMessage) {
+                processMessage(ctx, (RpcMessage)msg);
             } else {
                 LOGGER.error("rpcMessage type error");
             }
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
 
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
index 9be9e79c3b..4b79f20d95 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
@@ -69,7 +69,7 @@ public abstract class AbstractNettyRemotingServer extends 
AbstractNettyRemoting
         if (channel == null) {
             throw new RuntimeException("rm client is not connected. dbkey:" + 
resourceId + ",clientId:" + clientId);
         }
-        RpcMessage rpcMessage = buildRequestMessage(channel, msg, 
ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
+        RpcMessage rpcMessage = buildRequestMessage(msg, 
ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
         return super.sendSync(channel, rpcMessage, 
NettyServerConfig.getRpcRequestTimeout());
     }
 
@@ -78,7 +78,7 @@ public abstract class AbstractNettyRemotingServer extends 
AbstractNettyRemoting
         if (channel == null) {
             throw new RuntimeException("client is not connected");
         }
-        RpcMessage rpcMessage = buildRequestMessage(channel, msg, 
ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
+        RpcMessage rpcMessage = buildRequestMessage(msg, 
ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
         return super.sendSync(channel, rpcMessage, 
NettyServerConfig.getRpcRequestTimeout());
     }
 
@@ -87,7 +87,7 @@ public abstract class AbstractNettyRemotingServer extends 
AbstractNettyRemoting
         if (channel == null) {
             throw new RuntimeException("client is not connected");
         }
-        RpcMessage rpcMessage = buildRequestMessage(channel, msg, 
ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
+        RpcMessage rpcMessage = buildRequestMessage(msg, 
ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
         super.sendAsync(channel, rpcMessage);
     }
 
@@ -98,7 +98,7 @@ public abstract class AbstractNettyRemotingServer extends 
AbstractNettyRemoting
             clientChannel = ChannelManager.getSameClientChannel(channel);
         }
         if (clientChannel != null) {
-            RpcMessage rpcMsg = buildResponseMessage(channel, rpcMessage, msg, 
msg instanceof HeartbeatMessage
+            RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg 
instanceof HeartbeatMessage
                 ? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE
                 : ProtocolConstants.MSGTYPE_RESPONSE);
             super.sendAsync(clientChannel, rpcMsg);
@@ -108,21 +108,6 @@ public abstract class AbstractNettyRemotingServer extends 
AbstractNettyRemoting
     }
 
 
-    private RpcMessage buildResponseMessage(Channel channel, RpcMessage 
fromRpcMessage, Object msg, byte messageType) {
-        RpcMessage rpcMessage = super.buildResponseMessage(fromRpcMessage, 
msg, messageType);
-        RpcContext rpcContext = 
ChannelManager.getContextFromIdentified(channel);
-        rpcMessage.setOtherSideVersion(rpcContext.getVersion());
-        return rpcMessage;
-    }
-
-    protected RpcMessage buildRequestMessage(Channel channel, Object msg, byte 
messageType) {
-        RpcMessage rpcMessage = super.buildRequestMessage(msg, messageType);
-        RpcContext rpcContext = 
ChannelManager.getContextFromIdentified(channel);
-        rpcMessage.setOtherSideVersion(rpcContext.getVersion());
-        return rpcMessage;
-    }
-
-
     @Override
     public void registerProcessor(int messageType, RemotingProcessor 
processor, ExecutorService executor) {
         Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, 
executor);
@@ -179,10 +164,8 @@ public abstract class AbstractNettyRemotingServer extends 
AbstractNettyRemoting
          */
         @Override
         public void channelRead(final ChannelHandlerContext ctx, Object msg) 
throws Exception {
-            RpcMessage rpcMessage = null;
-            if (msg instanceof ProtocolRpcMessage) {
-                rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
-                processMessage(ctx, rpcMessage);
+            if (msg instanceof RpcMessage) {
+                processMessage(ctx, (RpcMessage)msg);
             } else {
                 LOGGER.error("rpcMessage type error");
             }
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java
 
b/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java
deleted file mode 100644
index e588b92b8e..0000000000
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.seata.core.rpc.netty;
-
-import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToByteEncoder;
-import org.apache.seata.common.util.StringUtils;
-import org.apache.seata.core.protocol.ProtocolConstants;
-import org.apache.seata.core.protocol.RpcMessage;
-import org.apache.seata.core.protocol.Version;
-import org.apache.seata.core.rpc.netty.v0.ProtocolEncoderV0;
-import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- * Compatible Protocol Encoder
- * <p>
- * <li>Full Length: include all data </li>
- * <li>Head Length: include head data from magic code to head map. </li>
- * <li>Body Length: Full Length - Head Length</li>
- * </p>
- */
-public class CompatibleProtocolEncoder extends MessageToByteEncoder {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(CompatibleProtocolEncoder.class);
-
-    private static Map<Byte, ProtocolEncoder> protocolEncoderMap;
-
-    public CompatibleProtocolEncoder() {
-        super();
-        protocolEncoderMap = ImmutableMap.<Byte, ProtocolEncoder>builder()
-                .put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0())
-                .put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1())
-                .build();
-    }
-
-    @Override
-    public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
-        try {
-            if (msg instanceof RpcMessage) {
-                RpcMessage rpcMessage = (RpcMessage) msg;
-                String sdkVersion = rpcMessage.getOtherSideVersion();
-                if (StringUtils.isBlank(sdkVersion)) {
-                    sdkVersion = Version.getCurrent();
-                }
-                byte protocolVersion = Version.calcProtocolVersion(sdkVersion);
-                ProtocolEncoder encoder = 
protocolEncoderMap.get(protocolVersion);
-                if (encoder == null) {
-                    throw new UnsupportedOperationException("Unsupported 
protocolVersion: " + protocolVersion);
-                }
-
-                encoder.encode(rpcMessage, out);
-            } else {
-                throw new UnsupportedOperationException("Not support this 
class:" + msg.getClass());
-            }
-        } catch (Throwable e) {
-            LOGGER.error("Encode request error!", e);
-        }
-    }
-}
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java
 b/core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java
similarity index 75%
rename from 
core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java
rename to 
core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java
index d066984c23..9bd9550369 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java
@@ -18,12 +18,15 @@ package org.apache.seata.core.rpc.netty;
 
 import com.google.common.collect.ImmutableMap;
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import org.apache.seata.core.exception.DecodeException;
 import org.apache.seata.core.protocol.ProtocolConstants;
 import org.apache.seata.core.rpc.netty.v0.ProtocolDecoderV0;
+import org.apache.seata.core.rpc.netty.v0.ProtocolEncoderV0;
 import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1;
+import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,17 +55,26 @@ import java.util.Map;
  * <li>Body Length: Full Length - Head Length</li>
  * </p>
  */
-public class CompatibleProtocolDecoder extends LengthFieldBasedFrameDecoder {
+public class MultiProtocolDecoder extends LengthFieldBasedFrameDecoder {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(CompatibleProtocolDecoder.class);
-    private static Map<Byte, ProtocolDecoder> protocolDecoderMap;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiProtocolDecoder.class);
+    private final Map<Byte, ProtocolDecoder> protocolDecoderMap;
 
-    public CompatibleProtocolDecoder() {
+    private final Map<Byte, ProtocolEncoder> protocolEncoderMap;
+    
+    private final ChannelHandler[] channelHandlers;
+
+    public MultiProtocolDecoder(ChannelHandler... channelHandlers) {
         // default is 8M
-        this(ProtocolConstants.MAX_FRAME_LENGTH);
+        this(ProtocolConstants.MAX_FRAME_LENGTH, channelHandlers);
     }
 
-    public CompatibleProtocolDecoder(int maxFrameLength) {
+    public MultiProtocolDecoder() {
+        // default is 8M
+        this(ProtocolConstants.MAX_FRAME_LENGTH, null);
+    }
+
+    public MultiProtocolDecoder(int maxFrameLength, ChannelHandler[] 
channelHandlers) {
         /*
         int maxFrameLength,      
         int lengthFieldOffset,  magic code is 2B, and version is 1B, and then 
FullLength. so value is 3
@@ -71,10 +83,13 @@ public class CompatibleProtocolDecoder extends 
LengthFieldBasedFrameDecoder {
         int initialBytesToStrip we will check magic code and version self, so 
do not strip any bytes. so values is 0
         */
         super(maxFrameLength, 3, 4, -7, 0);
-        protocolDecoderMap = ImmutableMap.<Byte, ProtocolDecoder>builder()
-                .put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0())
-                .put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1())
-                .build();
+        this.protocolDecoderMap =
+            ImmutableMap.<Byte, 
ProtocolDecoder>builder().put(ProtocolConstants.VERSION_0, new 
ProtocolDecoderV0())
+                .put(ProtocolConstants.VERSION_1, new 
ProtocolDecoderV1()).build();
+        this.protocolEncoderMap =
+            ImmutableMap.<Byte, 
ProtocolEncoder>builder().put(ProtocolConstants.VERSION_0, new 
ProtocolEncoderV0())
+                .put(ProtocolConstants.VERSION_1, new 
ProtocolEncoderV1()).build();
+        this.channelHandlers = channelHandlers;
     }
 
     @Override
@@ -93,9 +108,10 @@ public class CompatibleProtocolDecoder extends 
LengthFieldBasedFrameDecoder {
 
             if (decoded instanceof ByteBuf) {
                 frame = (ByteBuf) decoded;
+                ProtocolDecoder decoder = protocolDecoderMap.get(version);
+                ProtocolEncoder encoder = protocolEncoderMap.get(version);
                 try {
-                    ProtocolDecoder decoder = protocolDecoderMap.get(version);
-                    if (decoder == null) {
+                    if (decoder == null || encoder == null) {
                         throw new UnsupportedOperationException("Unsupported 
version: " + version);
                     }
                     return decoder.decodeFrame(frame);
@@ -103,6 +119,12 @@ public class CompatibleProtocolDecoder extends 
LengthFieldBasedFrameDecoder {
                     if (version != ProtocolConstants.VERSION_0) {
                         frame.release();
                     }
+                    ctx.pipeline().addLast((ChannelHandler)decoder);
+                    ctx.pipeline().addLast((ChannelHandler)encoder);
+                    if (channelHandlers != null) {
+                        ctx.pipeline().addLast(channelHandlers);
+                    }
+                    ctx.pipeline().remove(this);
                 }
             }
         } catch (Exception exx) {
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java
index 4867f86bcf..4aaafc0acb 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java
@@ -35,6 +35,8 @@ import io.netty.util.internal.PlatformDependent;
 import org.apache.seata.common.exception.FrameworkException;
 import org.apache.seata.common.thread.NamedThreadFactory;
 import org.apache.seata.core.rpc.RemotingBootstrap;
+import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1;
+import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -128,12 +130,12 @@ public class NettyClientBootstrap implements 
RemotingBootstrap {
                 @Override
                 public void initChannel(SocketChannel ch) {
                     ChannelPipeline pipeline = ch.pipeline();
-                    pipeline.addLast(
-                        new 
IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
+                    pipeline
+                        .addLast(new 
IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                             nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                             nettyClientConfig.getChannelMaxAllIdleSeconds()))
-                        .addLast(new CompatibleProtocolDecoder())
-                        .addLast(new CompatibleProtocolEncoder());
+                        .addLast(new ProtocolDecoderV1())
+                        .addLast(new ProtocolEncoderV1());
                     if (channelHandlers != null) {
                         addChannelPipelineLast(ch, channelHandlers);
                     }
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
index b847b2a96d..c7b2aa57c2 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
@@ -158,13 +158,10 @@ public class NettyServerBootstrap implements 
RemotingBootstrap {
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) {
-                    ch.pipeline().addLast(new 
IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
-                        .addLast(new CompatibleProtocolDecoder())
-                        .addLast(new CompatibleProtocolEncoder());
-                    if (channelHandlers != null) {
-                        addChannelPipelineLast(ch, channelHandlers);
-                    }
-
+                    MultiProtocolDecoder multiProtocolDecoder = new 
MultiProtocolDecoder(channelHandlers);
+                    ch.pipeline()
+                        .addLast(new 
IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
+                        .addLast(multiProtocolDecoder);
                 }
             });
 
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java
index 42a7c75c04..d28506fd84 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java
@@ -17,6 +17,7 @@
 package org.apache.seata.core.rpc.netty;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.seata.core.protocol.RpcMessage;
 
 /**
  * the protocol decoder
@@ -24,6 +25,6 @@ import io.netty.buffer.ByteBuf;
  **/
 public interface ProtocolDecoder {
 
-    ProtocolRpcMessage decodeFrame(ByteBuf in);
+    RpcMessage decodeFrame(ByteBuf in);
 
 }
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java
index 42e112a2f6..d14ce91cea 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java
@@ -17,9 +17,13 @@
 package org.apache.seata.core.rpc.netty.v0;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.apache.seata.core.exception.DecodeException;
 import org.apache.seata.core.protocol.HeartbeatMessage;
 
 import org.apache.seata.core.protocol.ProtocolConstants;
+import org.apache.seata.core.protocol.RpcMessage;
 import org.apache.seata.core.rpc.netty.ProtocolDecoder;
 import org.apache.seata.core.serializer.Serializer;
 import org.apache.seata.core.serializer.SerializerServiceLoader;
@@ -53,13 +57,23 @@ import org.slf4j.LoggerFactory;
  *
  * @see ProtocolEncoderV0
  */
-public class ProtocolDecoderV0 implements ProtocolDecoder {
+public class ProtocolDecoderV0 extends LengthFieldBasedFrameDecoder implements 
ProtocolDecoder {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProtocolDecoderV0.class);
 
+    public ProtocolDecoderV0() {
+        /*
+        int maxFrameLength,
+        int lengthFieldOffset,  magic code is 2B, and version is 1B, and then 
FullLength. so value is 3
+        int lengthFieldLength,  FullLength is int(4B). so values is 4
+        int lengthAdjustment,   FullLength include all data and read 7 bytes 
before, so the left length is (FullLength-7). so values is -7
+        int initialBytesToStrip we will check magic code and version self, so 
do not strip any bytes. so values is 0
+        */
+        super(ProtocolConstants.MAX_FRAME_LENGTH, 3, 4, -7, 0);
+    }
 
     @Override
-    public ProtocolRpcMessageV0 decodeFrame(ByteBuf in) {
+    public RpcMessage decodeFrame(ByteBuf in) {
         ProtocolRpcMessageV0 rpcMessage = new ProtocolRpcMessageV0();
         if (in.readableBytes() < ProtocolConstantsV0.HEAD_LENGTH) {
             throw new IllegalArgumentException("Nothing to decode.");
@@ -93,7 +107,7 @@ public class ProtocolDecoderV0 implements ProtocolDecoder {
                 rpcMessage.setBody(HeartbeatMessage.PONG);
             }
 
-            return rpcMessage;
+            return rpcMessage.protocolMsg2RpcMsg();
         }
 
         if (bodyLength > 0 && in.readableBytes() < bodyLength) {
@@ -125,8 +139,27 @@ public class ProtocolDecoderV0 implements ProtocolDecoder {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Receive:" + rpcMessage.getBody() + ", messageId:" + 
msgId);
         }
-        return rpcMessage;
+        return rpcMessage.protocolMsg2RpcMsg();
     }
 
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws 
Exception {
+        Object decoded;
+        try {
+            decoded = super.decode(ctx, in);
+            if (decoded instanceof ByteBuf) {
+                ByteBuf frame = (ByteBuf)decoded;
+                try {
+                    return decodeFrame(frame);
+                } finally {
+                    frame.release();
+                }
+            }
+        } catch (Exception exx) {
+            LOGGER.error("Decode frame error, cause: {}", exx.getMessage());
+            throw new DecodeException(exx);
+        }
+        return decoded;
+    }
 
 }
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java
index 3fc447b281..f217a84329 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java
@@ -17,6 +17,8 @@
 package org.apache.seata.core.rpc.netty.v0;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
 import org.apache.seata.core.protocol.HeartbeatMessage;
 import org.apache.seata.core.protocol.MessageTypeAware;
 import org.apache.seata.core.protocol.ProtocolConstants;
@@ -54,7 +56,7 @@ import org.slf4j.LoggerFactory;
  *
  * @see ProtocolDecoderV0
  */
-public class ProtocolEncoderV0 implements ProtocolEncoder {
+public class ProtocolEncoderV0 extends MessageToByteEncoder implements 
ProtocolEncoder {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProtocolEncoderV0.class);
 
@@ -67,9 +69,9 @@ public class ProtocolEncoderV0 implements ProtocolEncoder {
 
             out.writeShort(ProtocolConstantsV0.MAGIC);
             int flag = (msg.isAsync() ? ProtocolConstantsV0.FLAG_ASYNC : 0)
-                    | (msg.isHeartbeat() ? ProtocolConstantsV0.FLAG_HEARTBEAT 
: 0)
-                    | (msg.isRequest() ? ProtocolConstantsV0.FLAG_REQUEST : 0)
-                    | (msg.isSeataCodec() ? 
ProtocolConstantsV0.FLAG_SEATA_CODEC : 0);
+                | (msg.isHeartbeat() ? ProtocolConstantsV0.FLAG_HEARTBEAT : 0)
+                | (msg.isRequest() ? ProtocolConstantsV0.FLAG_REQUEST : 0)
+                | (msg.isSeataCodec() ? ProtocolConstantsV0.FLAG_SEATA_CODEC : 
0);
 
             out.writeShort((short) flag);
 
@@ -103,4 +105,17 @@ public class ProtocolEncoderV0 implements ProtocolEncoder {
             LOGGER.error("Encode request error!", e);
         }
     }
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) 
throws Exception {
+        try {
+            if (msg instanceof RpcMessage) {
+                encode((RpcMessage)msg, out);
+            } else {
+                throw new UnsupportedOperationException("Not support this 
class:" + msg.getClass());
+            }
+        } catch (Throwable e) {
+            LOGGER.error("Encode request error!", e);
+        }
+    }
 }
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java
index 9ca4977944..ce48b3ce8c 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java
@@ -20,12 +20,15 @@ import java.util.List;
 import java.util.Map;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import org.apache.seata.core.compressor.Compressor;
 import org.apache.seata.core.compressor.CompressorFactory;
+import org.apache.seata.core.exception.DecodeException;
 import org.apache.seata.core.protocol.HeartbeatMessage;
 import org.apache.seata.core.protocol.ProtocolConstants;
+import org.apache.seata.core.protocol.RpcMessage;
 import org.apache.seata.core.rpc.netty.ProtocolDecoder;
-import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
 import org.apache.seata.core.serializer.Serializer;
 import org.apache.seata.core.serializer.SerializerServiceLoader;
 import org.apache.seata.core.serializer.SerializerType;
@@ -61,23 +64,32 @@ import org.slf4j.LoggerFactory;
  * @see ProtocolEncoderV1
  * @since 0.7.0
  */
-public class ProtocolDecoderV1 implements ProtocolDecoder {
+public class ProtocolDecoderV1 extends LengthFieldBasedFrameDecoder implements 
ProtocolDecoder {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProtocolDecoderV1.class);
     private final List<SerializerType> supportDeSerializerTypes;
 
     public ProtocolDecoderV1() {
+        /*
+        int maxFrameLength,
+        int lengthFieldOffset,  magic code is 2B, and version is 1B, and then 
FullLength. so value is 3
+        int lengthFieldLength,  FullLength is int(4B). so values is 4
+        int lengthAdjustment,   FullLength include all data and read 7 bytes 
before, so the left length is (FullLength-7). so values is -7
+        int initialBytesToStrip we will check magic code and version self, so 
do not strip any bytes. so values is 0
+        */
+        super(ProtocolConstants.MAX_FRAME_LENGTH, 3, 4, -7, 0);
         supportDeSerializerTypes = 
SerializerServiceLoader.getSupportedSerializers();
         if (supportDeSerializerTypes.isEmpty()) {
             throw new IllegalArgumentException("No serializer found");
-        }    }
+        }
+    }
 
     @Override
-    public ProtocolRpcMessage decodeFrame(ByteBuf frame) {
+    public RpcMessage decodeFrame(ByteBuf frame) {
         byte b0 = frame.readByte();
         byte b1 = frame.readByte();
         if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
-                || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
+            || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
             throw new IllegalArgumentException("Unknown magic code: " + b0 + 
", " + b1);
         }
 
@@ -125,6 +137,27 @@ public class ProtocolDecoderV1 implements ProtocolDecoder {
             }
         }
 
-        return rpcMessage;
+        return rpcMessage.protocolMsg2RpcMsg();
     }
+
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws 
Exception {
+        Object decoded;
+        try {
+            decoded = super.decode(ctx, in);
+            if (decoded instanceof ByteBuf) {
+                ByteBuf frame = (ByteBuf)decoded;
+                try {
+                    return decodeFrame(frame);
+                } finally {
+                    frame.release();
+                }
+            }
+        } catch (Exception exx) {
+            LOGGER.error("Decode frame error, cause: {}", exx.getMessage());
+            throw new DecodeException(exx);
+        }
+        return decoded;
+    }
+
 }
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java
index 14cbcdb55d..dd01b948db 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java
@@ -17,6 +17,8 @@
 package org.apache.seata.core.rpc.netty.v1;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
 import org.apache.seata.core.rpc.netty.ProtocolEncoder;
 import org.apache.seata.core.serializer.Serializer;
 import org.apache.seata.core.compressor.Compressor;
@@ -57,7 +59,7 @@ import java.util.Map;
  * @see ProtocolDecoderV1
  * @since 0.7.0
  */
-public class ProtocolEncoderV1 implements ProtocolEncoder {
+public class ProtocolEncoderV1 extends MessageToByteEncoder implements 
ProtocolEncoder {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProtocolEncoderV1.class);
 
@@ -91,7 +93,7 @@ public class ProtocolEncoderV1 implements ProtocolEncoder {
 
             byte[] bodyBytes = null;
             if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
-                    && messageType != 
ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
+                && messageType != 
ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
                 // heartbeat has no body
                 Serializer serializer = 
SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), 
ProtocolConstants.VERSION_1);
                 bodyBytes = serializer.serialize(rpcMessage.getBody());
@@ -119,4 +121,18 @@ public class ProtocolEncoderV1 implements ProtocolEncoder {
             throw e;
         }
     }
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) 
throws Exception {
+        try {
+            if (msg instanceof RpcMessage) {
+                this.encode((RpcMessage)msg, out);
+            } else {
+                throw new UnsupportedOperationException("Not support this 
class:" + msg.getClass());
+            }
+        } catch (Throwable e) {
+            LOGGER.error("Encode request error!", e);
+        }
+    }
+
 }
diff --git 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java
 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java
index e35c124e30..14709e5f06 100644
--- 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java
+++ 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java
@@ -50,8 +50,8 @@ public class ClientChannelHandler extends 
ChannelInboundHandlerAdapter {
 
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
-        if (msg instanceof ProtocolRpcMessage) {
-            RpcMessage rpcMessage = ((ProtocolRpcMessage) 
msg).protocolMsg2RpcMsg();
+        if (msg instanceof RpcMessage) {
+            RpcMessage rpcMessage = (RpcMessage)msg;
             int msgId = rpcMessage.getId();
             DefaultPromise future = (DefaultPromise) 
client.futureMap.remove(msgId);
             if (future != null) {
diff --git 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java
index 3f52ed63c5..a1fb25e1e3 100644
--- 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java
+++ 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java
@@ -45,8 +45,6 @@ import org.apache.seata.core.protocol.ProtocolConstants;
 import org.apache.seata.core.protocol.RpcMessage;
 import org.apache.seata.core.protocol.transaction.BranchCommitRequest;
 import org.apache.seata.core.serializer.SerializerType;
-import org.apache.seata.core.rpc.netty.CompatibleProtocolDecoder;
-import org.apache.seata.core.rpc.netty.CompatibleProtocolEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,8 +80,9 @@ public class ProtocolV1Client {
             @Override
             protected void initChannel(Channel channel) throws Exception {
                 ChannelPipeline pipeline = channel.pipeline();
-                pipeline.addLast(new CompatibleProtocolEncoder());
-                pipeline.addLast(new CompatibleProtocolDecoder(8 * 1024 * 
1024));
+                pipeline
+                    .addLast(new ProtocolDecoderV1())
+                    .addLast(new ProtocolEncoderV1());
                 pipeline.addLast(new 
ClientChannelHandler(ProtocolV1Client.this));
             }
         });
@@ -95,13 +94,13 @@ public class ProtocolV1Client {
         } else {
             Throwable cause = channelFuture.cause();
             throw new RuntimeException("Failed to connect " + host + ":" + 
port +
-                    (cause != null ? ". Cause by: " + cause.getMessage() : 
"."));
+                (cause != null ? ". Cause by: " + cause.getMessage() : "."));
         }
     }
 
     private EventLoopGroup createWorkerGroup() {
         NamedThreadFactory threadName =
-                new NamedThreadFactory("CLI-WORKER", false);
+            new NamedThreadFactory("CLI-WORKER", false);
         return new NioEventLoopGroup(10, threadName);
     }
 
@@ -158,7 +157,7 @@ public class ProtocolV1Client {
         final AtomicLong cnt = new AtomicLong(0);
         // no queue
         final ThreadPoolExecutor service1 = new ThreadPoolExecutor(threads, 
threads, 0L, TimeUnit.MILLISECONDS,
-                new SynchronousQueue<Runnable>(), new 
NamedThreadFactory("client-", false));
+            new SynchronousQueue<Runnable>(), new 
NamedThreadFactory("client-", false));
         for (int i = 0; i < threads; i++) {
             service1.execute(() -> {
                 while (true) {
diff --git 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java
 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java
index 5ee0df7dee..ac75db20c5 100644
--- 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java
+++ 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java
@@ -27,8 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.seata.common.thread.NamedThreadFactory;
 import org.apache.seata.core.model.BranchType;
+import org.apache.seata.core.protocol.RpcMessage;
 import org.apache.seata.core.protocol.transaction.BranchCommitRequest;
-import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -80,7 +80,7 @@ public class ProtocolV1SerializerTest {
                     while (tag.getAndIncrement() < runTimes) {
                         try {
                             Future future = client.sendRpc(head, body);
-                            ProtocolRpcMessage resp = (ProtocolRpcMessage) 
future.get(10, TimeUnit.SECONDS);
+                            RpcMessage resp = (RpcMessage)future.get(10, 
TimeUnit.SECONDS);
                             if (resp != null) {
                                 success.incrementAndGet();
                             }
@@ -93,9 +93,10 @@ public class ProtocolV1SerializerTest {
                 });
             }
 
-            cnt.await();
+            cnt.await(10,TimeUnit.SECONDS);
             LOGGER.info("success {}/{}", success.get(), runTimes);
             Assertions.assertEquals(success.get(), runTimes);
+            service1.shutdown();
         } catch (InterruptedException e) {
             LOGGER.error("Thread interrupted", e);
         } finally {
diff --git 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java
index 20a2fdbcc5..4c7565eedb 100644
--- 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java
+++ 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java
@@ -35,12 +35,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 import org.apache.seata.common.thread.NamedThreadFactory;
-import org.apache.seata.common.thread.NamedThreadFactory;
-import org.apache.seata.core.rpc.netty.CompatibleProtocolDecoder;
-import org.apache.seata.core.rpc.netty.CompatibleProtocolEncoder;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
+import org.apache.seata.core.rpc.netty.MultiProtocolDecoder;
 
 /**
  */
@@ -58,37 +53,31 @@ public class ProtocolV1Server {
         workerGroup = createWorkerGroup();
 
         serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
-                .channel(NioServerSocketChannel.class)
-                .option(ChannelOption.RCVBUF_ALLOCATOR, 
AdaptiveRecvByteBufAllocator.DEFAULT)
-                .option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT)
-                .childOption(ChannelOption.SO_KEEPALIVE, true)
-                .childOption(ChannelOption.TCP_NODELAY, true)
-                .childOption(ChannelOption.SO_RCVBUF, 8192 * 128)
-                .childOption(ChannelOption.SO_SNDBUF, 8192 * 128)
-                .handler(new LoggingHandler(LogLevel.DEBUG))
-                .childOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT)
-                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new 
WriteBufferWaterMark(
-                        8192, 31768))
-                .childHandler(new ChannelInitializer() {
-                    @Override
-                    protected void initChannel(Channel channel) throws 
Exception {
-                        ChannelPipeline pipeline = channel.pipeline();
-                        pipeline.addLast(new CompatibleProtocolDecoder(8 * 
1024 * 1024));
-                        pipeline.addLast(new CompatibleProtocolEncoder());
-                        pipeline.addLast(new ServerChannelHandler());
-                    }
-                });
+            .channel(NioServerSocketChannel.class)
+            .option(ChannelOption.RCVBUF_ALLOCATOR, 
AdaptiveRecvByteBufAllocator.DEFAULT)
+            .option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT)
+            .childOption(ChannelOption.SO_KEEPALIVE, true)
+            .childOption(ChannelOption.TCP_NODELAY, true)
+            .childOption(ChannelOption.SO_RCVBUF, 8192 * 128)
+            .childOption(ChannelOption.SO_SNDBUF, 8192 * 128)
+            .handler(new LoggingHandler(LogLevel.DEBUG))
+            .childOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT)
+            .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new 
WriteBufferWaterMark(
+                8192, 31768))
+            .childHandler(new ChannelInitializer() {
+                @Override
+                protected void initChannel(Channel channel) throws Exception {
+                    ChannelPipeline pipeline = channel.pipeline();
+                    pipeline.addLast(new MultiProtocolDecoder(new 
ServerChannelHandler()));
+                }
+            });
 
         String host = "0.0.0.0";
 
         ChannelFuture future = serverBootstrap.bind(new 
InetSocketAddress(host, port));
-        ChannelFuture channelFuture = future.addListener(new 
ChannelFutureListener() {
-
-            @Override
-            public void operationComplete(ChannelFuture future) throws 
Exception {
-                if (!future.isSuccess()) {
-                    throw new RuntimeException("Server start fail !", 
future.cause());
-                }
+        ChannelFuture channelFuture = 
future.addListener((ChannelFutureListener)future1 -> {
+            if (!future1.isSuccess()) {
+                throw new RuntimeException("Server start fail !", 
future1.cause());
             }
         });
 
@@ -111,13 +100,13 @@ public class ProtocolV1Server {
 
     private EventLoopGroup createBossGroup() {
         NamedThreadFactory threadName =
-                new NamedThreadFactory("SEV-BOSS-" + port, false);
+            new NamedThreadFactory("SEV-BOSS-" + port, false);
         return new NioEventLoopGroup(2, threadName);
     }
 
     private EventLoopGroup createWorkerGroup() {
         NamedThreadFactory threadName =
-                new NamedThreadFactory("SEV-WORKER-" + port, false);
+            new NamedThreadFactory("SEV-WORKER-" + port, false);
         return new NioEventLoopGroup(10, threadName);
     }
 
diff --git 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java
 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java
index 8b468d0e8f..9a8fff0582 100644
--- 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java
+++ 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java
@@ -39,10 +39,8 @@ public class ServerChannelHandler extends 
ChannelInboundHandlerAdapter {
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
         Channel channel = ctx.channel();
-
-        if (msg instanceof ProtocolRpcMessage) {
-            RpcMessage rpcMessage = ((ProtocolRpcMessage) 
msg).protocolMsg2RpcMsg();
-            channel.writeAndFlush(rpcMessage);
+        if (msg instanceof RpcMessage) {
+            channel.writeAndFlush(msg);
         } else {
             LOGGER.error("rpcMessage type error");
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org


Reply via email to