This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit d1f0f0a28f92164616dba561a6c4ef66485603bb
Author: Albumen Kevin <[email protected]>
AuthorDate: Sat Mar 25 09:41:11 2023 +0800

    Reject if response do not match any request (#11882)
---
 .../remoting/exchange/codec/ExchangeCodec.java     | 27 ++++++++++++++--------
 .../dubbo/remoting/codec/ExchangeCodecTest.java    | 17 ++++++++++++++
 .../dubbo/rpc/protocol/dubbo/DubboCodec.java       |  4 ++--
 3 files changed, 36 insertions(+), 12 deletions(-)

diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
index 3319224014..882aedcf33 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -42,6 +42,8 @@ import 
org.apache.dubbo.remoting.transport.ExceedPayloadLimitException;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 
 /**
  * ExchangeCodec.
@@ -169,7 +171,7 @@ public class ExchangeCodec extends TelnetCodec {
                             data = decodeEventData(channel, 
CodecSupport.deserialize(channel.getUrl(), new 
ByteArrayInputStream(eventPayload), proto), eventPayload);
                         }
                     } else {
-                        data = decodeResponseData(channel, 
CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id));
+                        data = decodeResponseData(channel, 
CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(channel, 
res, id));
                     }
                     res.setResult(data);
                 } else {
@@ -211,16 +213,21 @@ public class ExchangeCodec extends TelnetCodec {
         }
     }
 
-    protected Object getRequestData(long id) {
+    protected Object getRequestData(Channel channel, Response response, long 
id) {
         DefaultFuture future = DefaultFuture.getFuture(id);
-        if (future == null) {
-            return null;
-        }
-        Request req = future.getRequest();
-        if (req == null) {
-            return null;
+        if (future != null) {
+            Request req = future.getRequest();
+            if (req != null) {
+                return req.getData();
+            }
         }
-        return req.getData();
+
+        logger.warn("The timeout response finally returned at "
+            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new 
Date()))
+            + ", response status is " + response.getStatus() + ", response id 
is " + response.getId()
+            + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+            + " -> " + channel.getRemoteAddress()) + ", please check provider 
side for detailed result.");
+        throw new IllegalArgumentException("Failed to find any request match 
the response, response id: " + id);
     }
 
     protected void encodeRequest(Channel channel, ChannelBuffer buffer, 
Request req) throws IOException {
@@ -429,7 +436,7 @@ public class ExchangeCodec extends TelnetCodec {
         try {
             if (eventBytes != null) {
                 int dataLen = eventBytes.length;
-                int threshold = 
ConfigurationUtils.getSystemConfiguration().getInt("deserialization.event.size",
 50);
+                int threshold = 
ConfigurationUtils.getSystemConfiguration().getInt("deserialization.event.size",
 15);
                 if (dataLen > threshold) {
                     throw new IllegalArgumentException("Event data too long, 
actual size " + dataLen + ", threshold " + threshold + " rejected for security 
consideration.");
                 }
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
index b87147b35f..a1e4c3ddff 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
@@ -30,11 +30,13 @@ import org.apache.dubbo.remoting.buffer.ChannelBuffers;
 import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
+import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
 import org.apache.dubbo.remoting.telnet.codec.TelnetCodec;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -137,6 +139,8 @@ public class ExchangeCodecTest extends TelnetCodecTest {
 
     @Test
     public void test_Decode_Error_Length() throws IOException {
+        DefaultFuture future = 
DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, 
null);
+
         byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 0x02, 20, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0};
         Person person = new Person();
         byte[] request = getRequestBytes(person, header);
@@ -148,6 +152,8 @@ public class ExchangeCodecTest extends TelnetCodecTest {
         Assertions.assertEquals(person, obj.getResult());
         // only decode necessary bytes
         Assertions.assertEquals(request.length, buffer.readerIndex());
+
+        future.cancel();
     }
 
     @Test
@@ -226,6 +232,7 @@ public class ExchangeCodecTest extends TelnetCodecTest {
 
     @Test
     public void test_Decode_Return_Response_Person() throws IOException {
+        DefaultFuture future = 
DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, 
null);
         // 00000010-response/oneway/hearbeat=false/hessian 
|20-stats=ok|id=0|length=0
         byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 2, 20, 0, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0};
         Person person = new Person();
@@ -235,6 +242,8 @@ public class ExchangeCodecTest extends TelnetCodecTest {
         Assertions.assertEquals(20, obj.getStatus());
         Assertions.assertEquals(person, obj.getResult());
         System.out.println(obj);
+
+        future.cancel();
     }
 
     @Test //The status input has a problem, and the read information is wrong 
when the serialization is serialized.
@@ -324,6 +333,7 @@ public class ExchangeCodecTest extends TelnetCodecTest {
 
     @Test
     public void test_Header_Response_NoSerializationFlag() throws IOException {
+        DefaultFuture future = 
DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, 
null);
         // 00000010-response/oneway/hearbeat=false/noset 
|20-stats=ok|id=0|length=0
         byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte) 0x02, 20, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
         Person person = new Person();
@@ -333,10 +343,13 @@ public class ExchangeCodecTest extends TelnetCodecTest {
         Assertions.assertEquals(20, obj.getStatus());
         Assertions.assertEquals(person, obj.getResult());
         System.out.println(obj);
+
+        future.cancel();
     }
 
     @Test
     public void test_Header_Response_Heartbeat() throws IOException {
+        DefaultFuture future = 
DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, 
null);
         // 00000010-response/oneway/hearbeat=true |20-stats=ok|id=0|length=0
         byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 0x02, 20, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0};
         Person person = new Person();
@@ -346,6 +359,8 @@ public class ExchangeCodecTest extends TelnetCodecTest {
         Assertions.assertEquals(20, obj.getStatus());
         Assertions.assertEquals(person, obj.getResult());
         System.out.println(obj);
+
+        future.cancel();
     }
 
     @Test
@@ -371,6 +386,7 @@ public class ExchangeCodecTest extends TelnetCodecTest {
 
     @Test
     public void test_Encode_Response() throws IOException {
+        DefaultFuture future = 
DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(1001), 100000, 
null);
         ChannelBuffer encodeBuffer = ChannelBuffers.dynamicBuffer(1024);
         Channel channel = getCliendSideChannel(url);
         Response response = new Response();
@@ -396,6 +412,7 @@ public class ExchangeCodecTest extends TelnetCodecTest {
         // encode response verson ??
 //        Assertions.assertEquals(response.getProtocolVersion(), 
obj.getVersion());
 
+        future.cancel();
     }
 
     @Test
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
index c6c7994de3..44728a4d73 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -94,12 +94,12 @@ public class DubboCodec extends ExchangeCodec {
                         DecodeableRpcResult result;
                         if 
(channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, 
DEFAULT_DECODE_IN_IO_THREAD)) {
                             result = new DecodeableRpcResult(channel, res, is,
-                                    (Invocation) getRequestData(id), proto);
+                                    (Invocation) getRequestData(channel, res, 
id), proto);
                             result.decode();
                         } else {
                             result = new DecodeableRpcResult(channel, res,
                                     new 
UnsafeByteArrayInputStream(readMessageData(is)),
-                                    (Invocation) getRequestData(id), proto);
+                                    (Invocation) getRequestData(channel, res, 
id), proto);
                         }
                         data = result;
                     }

Reply via email to