Copilot commented on code in PR #7000: URL: https://github.com/apache/incubator-seata/pull/7000#discussion_r2442317421
########## serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/MultiVersionCodecHelper.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.serializer.seata; + +import org.apache.seata.core.protocol.IncompatibleVersionException; +import org.apache.seata.core.protocol.Version; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * the type MultiVersionCodecHelper + * + **/ +public class MultiVersionCodecHelper { + + private static final Logger LOGGER = LoggerFactory.getLogger(MultiVersionCodecHelper.class); + + public static MessageSeataCodec match(String v, MessageSeataCodec messageCodec) { + try { + if (!(messageCodec instanceof MultiVersionCodec)) { + return null; + } + Map<MultiVersionCodec.VersionRange, MessageSeataCodec> map = + ((MultiVersionCodec) messageCodec).oldVersionCodec(); + long version = Version.convertVersion(v); + for (MultiVersionCodec.VersionRange range : map.keySet()) { + if (version > Version.convertVersion(range.getBegin()) + && version <= Version.convertVersion(range.getEnd())) { + return map.get(version); Review Comment: Incorrect key used in map.get(). The code retrieves the MessageSeataCodec using 'version' (a long) as the key, but the map is keyed by VersionRange objects. This will always return null since the types don't match. Should use 'range' instead: `return map.get(range);` ```suggestion return map.get(range); ``` ########## test/src/test/java/org/apache/seata/core/rpc/netty/multiversion/MultiVersionCompatibilityTest.java: ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.multiversion; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.handler.timeout.IdleStateHandler; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.protocol.ProtocolConstants; +import org.apache.seata.core.protocol.RegisterTMRequest; +import org.apache.seata.core.protocol.RegisterTMResponse; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.rpc.netty.MultiProtocolDecoderTest; +import org.apache.seata.core.rpc.netty.TestClientHandler; +import org.apache.seata.core.rpc.netty.TestServerHandler; +import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1; +import org.apache.seata.core.rpc.netty.v2.ProtocolEncoderV2; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Prerequisites for running these tests: + * If maven environment has dependency issues, run these commands first: + * 1. chmod -R u+rwx ./* + * 2. mvn -Prelease-seata -Dmaven.test.skip=true clean install -U + * This provides common utilities for testing multi-version protocol compatibility + * and simulates realistic server and client construction flows. + */ +public abstract class MultiVersionCompatibilityTest { + + // LOG instance + private static final Logger LOGGER = LoggerFactory.getLogger(MultiVersionCompatibilityTest.class); + + protected EventLoopGroup bossGroup; + protected EventLoopGroup workerGroup; + protected EventLoopGroup clientGroup; + protected Channel serverChannel; + protected Channel clientChannel; + protected final AtomicReference<Object> requestRef = new AtomicReference<>(); + protected final AtomicReference<Object> responseRef = new AtomicReference<>(); + protected final CountDownLatch responseLatch = new CountDownLatch(1); + private final MultiProtocolDecoderTest decoderTestHelper = new MultiProtocolDecoderTest(); + + @BeforeEach + public void setUp() { + bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(); + clientGroup = new NioEventLoopGroup(); + requestRef.set(null); + responseRef.set(null); + } + + @AfterEach + public void tearDown() throws InterruptedException { + if (clientChannel != null) { + clientChannel.close().sync(); + } + if (serverChannel != null) { + serverChannel.close().sync(); + } + bossGroup.shutdownGracefully().sync(); + workerGroup.shutdownGracefully().sync(); + clientGroup.shutdownGracefully().sync(); + } + + protected void startV1Server(int port) throws InterruptedException { + startServerByVersion(ProtocolConstants.VERSION_1, port); + } + + protected void startV2Server(int port) throws InterruptedException { + startServerByVersion(ProtocolConstants.VERSION_2, port); + } + + protected void connectV1Client(String host, int port, int connectTimeout) { + connectClientByVersion(new ProtocolEncoderV1(), ProtocolConstants.VERSION_1, host, port, connectTimeout); + } + + protected void connectV2Client(String host, int port, int connectTimeout) { + connectClientByVersion(new ProtocolEncoderV2(), ProtocolConstants.VERSION_2, host, port, connectTimeout); + } + + private void startServerByVersion(byte version1, int port) throws InterruptedException { + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + // Simulate V1 server with forced V1 version in MultiProtocolDecoder + pipeline.addLast(new IdleStateHandler(0, 0, 30)); + pipeline.addLast( + decoderTestHelper.createMultiProtocolDecoder(version1, createTestServerHandler())); + } + }); + + ChannelFuture future = serverBootstrap.bind(port).sync(); + serverChannel = future.channel(); + } + + private void connectClientByVersion( + MessageToByteEncoder encoder, byte version, String host, int port, int connectTimeout) { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(clientGroup).channel(NioSocketChannel.class); + bootstrap.option(ChannelOption.SO_KEEPALIVE, true); + bootstrap.option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT); + bootstrap.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + // Simulate V1 client with forced V1 version in MultiProtocolDecoder + pipeline.addLast(new IdleStateHandler(0, 0, 15)); + pipeline.addLast(encoder); // V1 client uses V1 encoder + pipeline.addLast(decoderTestHelper.createMultiProtocolDecoder(version, createTestClientHandler())); + } + }); + + ChannelFuture channelFuture = bootstrap.connect(host, port); + channelFuture.awaitUninterruptibly(connectTimeout, TimeUnit.MILLISECONDS); + if (channelFuture.isSuccess()) { + clientChannel = channelFuture.channel(); + } + } + + /** + * Send request through client channel + */ + protected void sendRequest(Object request) { + if (clientChannel != null && clientChannel.isActive()) { + // Wrap request in RpcMessage as real clients do + RpcMessage rpcMessage = buildRequestMessage(request); + clientChannel.writeAndFlush(rpcMessage); + } + } + + /** + * Build RpcMessage as real clients do + */ + private RpcMessage buildRequestMessage(Object msg) { + RpcMessage rpcMessage = new RpcMessage(); + rpcMessage.setId(getNextMessageId()); + rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_SYNC); Review Comment: Corrected spelling of 'MSGTYPE_RESQUEST_SYNC' to 'MSGTYPE_REQUEST_SYNC'. ```suggestion rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_REQUEST_SYNC); ``` ########## core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java: ########## @@ -64,17 +68,27 @@ public class MultiProtocolDecoder extends LengthFieldBasedFrameDecoder { private final ChannelHandler[] channelHandlers; + private final byte maxCurrentVersion; // For testing purposes + public MultiProtocolDecoder(ChannelHandler... channelHandlers) { // default is 8M this(ProtocolConstants.MAX_FRAME_LENGTH, channelHandlers); } - public MultiProtocolDecoder() { - // default is 8M - this(ProtocolConstants.MAX_FRAME_LENGTH, null); + /** + * Constructor for testing purposes to force a specific protocol version + * @param maxCurrentVersion the protocol version to force + * @param channelHandlers additional channel handlers + */ + MultiProtocolDecoder(byte maxCurrentVersion, ChannelHandler... channelHandlers) { + this(ProtocolConstants.MAX_FRAME_LENGTH, maxCurrentVersion, channelHandlers); } public MultiProtocolDecoder(int maxFrameLength, ChannelHandler[] channelHandlers) { + this(maxFrameLength, ProtocolConstants.VERSION, channelHandlers); + } + Review Comment: Missing javadoc for this package-private constructor. The parameters maxFrameLength and maxCurrentVersion should be documented to clarify their purpose, especially since maxCurrentVersion is a new parameter introduced for testing. ```suggestion /** * Package-private constructor for advanced usage and testing. * * @param maxFrameLength the maximum length of the frame. Used to prevent excessive memory allocation for large frames. * @param maxCurrentVersion the protocol version to use. This is primarily intended for testing different protocol versions. * @param channelHandlers additional channel handlers to be used in the pipeline. */ ``` ########## core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java: ########## @@ -64,17 +68,27 @@ public class MultiProtocolDecoder extends LengthFieldBasedFrameDecoder { private final ChannelHandler[] channelHandlers; + private final byte maxCurrentVersion; // For testing purposes + public MultiProtocolDecoder(ChannelHandler... channelHandlers) { // default is 8M this(ProtocolConstants.MAX_FRAME_LENGTH, channelHandlers); } - public MultiProtocolDecoder() { - // default is 8M - this(ProtocolConstants.MAX_FRAME_LENGTH, null); + /** + * Constructor for testing purposes to force a specific protocol version + * @param maxCurrentVersion the protocol version to force + * @param channelHandlers additional channel handlers + */ Review Comment: [nitpick] The constructor has package-private visibility but lacks documentation explaining it's for testing purposes. This is inconsistent with the public constructors which have clear javadoc. Consider adding javadoc or making the visibility explicit with a comment indicating this is test-only. ```suggestion * Package-private constructor intended for test-only usage to force a specific protocol version. * <p> * This constructor should only be used in tests. For production code, use the public constructors. * </p> * @param maxCurrentVersion the protocol version to force * @param channelHandlers additional channel handlers */ // package-private for test-only usage ``` ########## core/src/main/java/org/apache/seata/core/protocol/Version.java: ########## @@ -97,6 +100,15 @@ public static boolean isV0(String version) { return !isAboveOrEqualVersion(version, VERSION_0_7_1); } + public static boolean isAboveOrEqualV2(String version) { + // todo + return true; Review Comment: The method isAboveOrEqualV2() always returns true with a 'todo' comment, which makes it non-functional. This will incorrectly report all versions as V2 or above. The method should implement proper version comparison logic similar to other version checking methods in this class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
