This is an automated email from the ASF dual-hosted git repository. albumenj pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit d1f0f0a28f92164616dba561a6c4ef66485603bb Author: Albumen Kevin <[email protected]> AuthorDate: Sat Mar 25 09:41:11 2023 +0800 Reject if response do not match any request (#11882) --- .../remoting/exchange/codec/ExchangeCodec.java | 27 ++++++++++++++-------- .../dubbo/remoting/codec/ExchangeCodecTest.java | 17 ++++++++++++++ .../dubbo/rpc/protocol/dubbo/DubboCodec.java | 4 ++-- 3 files changed, 36 insertions(+), 12 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java index 3319224014..882aedcf33 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java @@ -42,6 +42,8 @@ import org.apache.dubbo.remoting.transport.ExceedPayloadLimitException; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.text.SimpleDateFormat; +import java.util.Date; /** * ExchangeCodec. @@ -169,7 +171,7 @@ public class ExchangeCodec extends TelnetCodec { data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload); } } else { - data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id)); + data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(channel, res, id)); } res.setResult(data); } else { @@ -211,16 +213,21 @@ public class ExchangeCodec extends TelnetCodec { } } - protected Object getRequestData(long id) { + protected Object getRequestData(Channel channel, Response response, long id) { DefaultFuture future = DefaultFuture.getFuture(id); - if (future == null) { - return null; - } - Request req = future.getRequest(); - if (req == null) { - return null; + if (future != null) { + Request req = future.getRequest(); + if (req != null) { + return req.getData(); + } } - return req.getData(); + + logger.warn("The timeout response finally returned at " + + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + + ", response status is " + response.getStatus() + ", response id is " + response.getId() + + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result."); + throw new IllegalArgumentException("Failed to find any request match the response, response id: " + id); } protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { @@ -429,7 +436,7 @@ public class ExchangeCodec extends TelnetCodec { try { if (eventBytes != null) { int dataLen = eventBytes.length; - int threshold = ConfigurationUtils.getSystemConfiguration().getInt("deserialization.event.size", 50); + int threshold = ConfigurationUtils.getSystemConfiguration().getInt("deserialization.event.size", 15); if (dataLen > threshold) { throw new IllegalArgumentException("Event data too long, actual size " + dataLen + ", threshold " + threshold + " rejected for security consideration."); } diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java index b87147b35f..a1e4c3ddff 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java @@ -30,11 +30,13 @@ import org.apache.dubbo.remoting.buffer.ChannelBuffers; import org.apache.dubbo.remoting.exchange.Request; import org.apache.dubbo.remoting.exchange.Response; import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec; +import org.apache.dubbo.remoting.exchange.support.DefaultFuture; import org.apache.dubbo.remoting.telnet.codec.TelnetCodec; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -137,6 +139,8 @@ public class ExchangeCodecTest extends TelnetCodecTest { @Test public void test_Decode_Error_Length() throws IOException { + DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null); + byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 0x02, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; Person person = new Person(); byte[] request = getRequestBytes(person, header); @@ -148,6 +152,8 @@ public class ExchangeCodecTest extends TelnetCodecTest { Assertions.assertEquals(person, obj.getResult()); // only decode necessary bytes Assertions.assertEquals(request.length, buffer.readerIndex()); + + future.cancel(); } @Test @@ -226,6 +232,7 @@ public class ExchangeCodecTest extends TelnetCodecTest { @Test public void test_Decode_Return_Response_Person() throws IOException { + DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null); // 00000010-response/oneway/hearbeat=false/hessian |20-stats=ok|id=0|length=0 byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 2, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; Person person = new Person(); @@ -235,6 +242,8 @@ public class ExchangeCodecTest extends TelnetCodecTest { Assertions.assertEquals(20, obj.getStatus()); Assertions.assertEquals(person, obj.getResult()); System.out.println(obj); + + future.cancel(); } @Test //The status input has a problem, and the read information is wrong when the serialization is serialized. @@ -324,6 +333,7 @@ public class ExchangeCodecTest extends TelnetCodecTest { @Test public void test_Header_Response_NoSerializationFlag() throws IOException { + DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null); // 00000010-response/oneway/hearbeat=false/noset |20-stats=ok|id=0|length=0 byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte) 0x02, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; Person person = new Person(); @@ -333,10 +343,13 @@ public class ExchangeCodecTest extends TelnetCodecTest { Assertions.assertEquals(20, obj.getStatus()); Assertions.assertEquals(person, obj.getResult()); System.out.println(obj); + + future.cancel(); } @Test public void test_Header_Response_Heartbeat() throws IOException { + DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null); // 00000010-response/oneway/hearbeat=true |20-stats=ok|id=0|length=0 byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 0x02, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; Person person = new Person(); @@ -346,6 +359,8 @@ public class ExchangeCodecTest extends TelnetCodecTest { Assertions.assertEquals(20, obj.getStatus()); Assertions.assertEquals(person, obj.getResult()); System.out.println(obj); + + future.cancel(); } @Test @@ -371,6 +386,7 @@ public class ExchangeCodecTest extends TelnetCodecTest { @Test public void test_Encode_Response() throws IOException { + DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(1001), 100000, null); ChannelBuffer encodeBuffer = ChannelBuffers.dynamicBuffer(1024); Channel channel = getCliendSideChannel(url); Response response = new Response(); @@ -396,6 +412,7 @@ public class ExchangeCodecTest extends TelnetCodecTest { // encode response verson ?? // Assertions.assertEquals(response.getProtocolVersion(), obj.getVersion()); + future.cancel(); } @Test diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java index c6c7994de3..44728a4d73 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java @@ -94,12 +94,12 @@ public class DubboCodec extends ExchangeCodec { DecodeableRpcResult result; if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) { result = new DecodeableRpcResult(channel, res, is, - (Invocation) getRequestData(id), proto); + (Invocation) getRequestData(channel, res, id), proto); result.decode(); } else { result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), - (Invocation) getRequestData(id), proto); + (Invocation) getRequestData(channel, res, id), proto); } data = result; }
