This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 35f00f0395 Reject if response do not match any request (#11882)
35f00f0395 is described below

commit 35f00f0395fbd81ee708ea0e031da3cdad5ef2fb
Author: Albumen Kevin <[email protected]>
AuthorDate: Sat Mar 25 09:37:31 2023 +0800

    Reject if response do not match any request (#11882)
---
 .../remoting/exchange/codec/ExchangeCodec.java     | 27 ++++++++++++++--------
 .../dubbo/remoting/codec/ExchangeCodecTest.java    | 17 ++++++++++++++
 .../dubbo/rpc/protocol/dubbo/DubboCodec.java       |  4 ++--
 .../rpc/protocol/dubbo/DubboCountCodecTest.java    | 13 ++++++++---
 4 files changed, 46 insertions(+), 15 deletions(-)

diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
index 44cd564f57..6fdbbea497 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -42,6 +42,8 @@ import 
org.apache.dubbo.remoting.transport.ExceedPayloadLimitException;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 
 /**
  * ExchangeCodec.
@@ -167,7 +169,7 @@ public class ExchangeCodec extends TelnetCodec {
                             data = decodeEventData(channel, 
CodecSupport.deserialize(channel.getUrl(), new 
ByteArrayInputStream(eventPayload), proto), eventPayload);
                         }
                     } else {
-                        data = decodeResponseData(channel, 
CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id));
+                        data = decodeResponseData(channel, 
CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(channel, 
res, id));
                     }
                     res.setResult(data);
                 } else {
@@ -209,16 +211,21 @@ public class ExchangeCodec extends TelnetCodec {
         }
     }
 
-    protected Object getRequestData(long id) {
+    protected Object getRequestData(Channel channel, Response response, long 
id) {
         DefaultFuture future = DefaultFuture.getFuture(id);
-        if (future == null) {
-            return null;
-        }
-        Request req = future.getRequest();
-        if (req == null) {
-            return null;
+        if (future != null) {
+            Request req = future.getRequest();
+            if (req != null) {
+                return req.getData();
+            }
         }
-        return req.getData();
+
+        logger.warn("The timeout response finally returned at "
+            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new 
Date()))
+            + ", response status is " + response.getStatus() + ", response id 
is " + response.getId()
+            + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+            + " -> " + channel.getRemoteAddress()) + ", please check provider 
side for detailed result.");
+        throw new IllegalArgumentException("Failed to find any request match 
the response, response id: " + id);
     }
 
     protected void encodeRequest(Channel channel, ChannelBuffer buffer, 
Request req) throws IOException {
@@ -427,7 +434,7 @@ public class ExchangeCodec extends TelnetCodec {
         try {
             if (eventBytes != null) {
                 int dataLen = eventBytes.length;
-                int threshold = 
ConfigurationUtils.getSystemConfiguration(channel.getUrl().getScopeModel()).getInt("deserialization.event.size",
 50);
+                int threshold = 
ConfigurationUtils.getSystemConfiguration(channel.getUrl().getScopeModel()).getInt("deserialization.event.size",
 15);
                 if (dataLen > threshold) {
                     throw new IllegalArgumentException("Event data too long, 
actual size " + threshold + ", threshold " + threshold + " rejected for 
security consideration.");
                 }
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
index 495092d8bb..54247f28af 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
@@ -30,11 +30,13 @@ import org.apache.dubbo.remoting.buffer.ChannelBuffers;
 import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
+import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
 import org.apache.dubbo.remoting.telnet.codec.TelnetCodec;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -137,6 +139,8 @@ public class ExchangeCodecTest extends TelnetCodecTest {
 
     @Test
     public void test_Decode_Error_Length() throws IOException {
+        DefaultFuture future = 
DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, 
null);
+
         byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 0x02, 20, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0};
         Person person = new Person();
         byte[] request = getRequestBytes(person, header);
@@ -148,6 +152,8 @@ public class ExchangeCodecTest extends TelnetCodecTest {
         Assertions.assertEquals(person, obj.getResult());
         //only decode necessary bytes
         Assertions.assertEquals(request.length, buffer.readerIndex());
+
+        future.cancel();
     }
 
     @Test
@@ -226,6 +232,7 @@ public class ExchangeCodecTest extends TelnetCodecTest {
 
     @Test
     public void test_Decode_Return_Response_Person() throws IOException {
+        DefaultFuture future = 
DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, 
null);
         //00000010-response/oneway/hearbeat=false/hessian 
|20-stats=ok|id=0|length=0
         byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 2, 20, 0, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0};
         Person person = new Person();
@@ -235,6 +242,8 @@ public class ExchangeCodecTest extends TelnetCodecTest {
         Assertions.assertEquals(20, obj.getStatus());
         Assertions.assertEquals(person, obj.getResult());
         System.out.println(obj);
+
+        future.cancel();
     }
 
     @Test //The status input has a problem, and the read information is wrong 
when the serialization is serialized.
@@ -324,6 +333,7 @@ public class ExchangeCodecTest extends TelnetCodecTest {
 
     @Test
     public void test_Header_Response_NoSerializationFlag() throws IOException {
+        DefaultFuture future = 
DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, 
null);
         //00000010-response/oneway/hearbeat=false/noset 
|20-stats=ok|id=0|length=0
         byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte) 0x02, 20, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
         Person person = new Person();
@@ -333,10 +343,13 @@ public class ExchangeCodecTest extends TelnetCodecTest {
         Assertions.assertEquals(20, obj.getStatus());
         Assertions.assertEquals(person, obj.getResult());
         System.out.println(obj);
+
+        future.cancel();
     }
 
     @Test
     public void test_Header_Response_Heartbeat() throws IOException {
+        DefaultFuture future = 
DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, 
null);
         //00000010-response/oneway/hearbeat=true |20-stats=ok|id=0|length=0
         byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 0x02, 20, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0};
         Person person = new Person();
@@ -346,6 +359,8 @@ public class ExchangeCodecTest extends TelnetCodecTest {
         Assertions.assertEquals(20, obj.getStatus());
         Assertions.assertEquals(person, obj.getResult());
         System.out.println(obj);
+
+        future.cancel();
     }
 
     @Test
@@ -371,6 +386,7 @@ public class ExchangeCodecTest extends TelnetCodecTest {
 
     @Test
     public void test_Encode_Response() throws IOException {
+        DefaultFuture future = 
DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(1001), 100000, 
null);
         ChannelBuffer encodeBuffer = ChannelBuffers.dynamicBuffer(1024);
         Channel channel = getClientSideChannel(url);
         Response response = new Response();
@@ -396,6 +412,7 @@ public class ExchangeCodecTest extends TelnetCodecTest {
         // encode response verson ??
 //        Assertions.assertEquals(response.getProtocolVersion(), 
obj.getVersion());
 
+        future.cancel();
     }
 
     @Test
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
index 913138dc66..652a5f9e42 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -101,12 +101,12 @@ public class DubboCodec extends ExchangeCodec {
                         DecodeableRpcResult result;
                         if 
(channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, 
DEFAULT_DECODE_IN_IO_THREAD)) {
                             result = new DecodeableRpcResult(channel, res, is,
-                                    (Invocation) getRequestData(id), proto);
+                                    (Invocation) getRequestData(channel, res, 
id), proto);
                             result.decode();
                         } else {
                             result = new DecodeableRpcResult(channel, res,
                                     new 
UnsafeByteArrayInputStream(readMessageData(is)),
-                                    (Invocation) getRequestData(id), proto);
+                                    (Invocation) getRequestData(channel, res, 
id), proto);
                         }
                         data = result;
                     }
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodecTest.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodecTest.java
index b531bb1aa0..212efd5fd4 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodecTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodecTest.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.remoting.buffer.ChannelBuffer;
 import org.apache.dubbo.remoting.buffer.ChannelBuffers;
 import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.Response;
+import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
 import org.apache.dubbo.remoting.exchange.support.MultiMessage;
 import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.RpcInvocation;
@@ -32,7 +33,9 @@ import 
org.apache.dubbo.rpc.protocol.dubbo.support.DemoService;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
 import static org.apache.dubbo.rpc.Constants.INPUT_KEY;
 import static org.apache.dubbo.rpc.Constants.OUTPUT_KEY;
@@ -45,23 +48,25 @@ public class DubboCountCodecTest {
         ChannelBuffer buffer = ChannelBuffers.buffer(1024);
         Channel channel = new MockChannel();
         Assertions.assertEquals(Codec2.DecodeResult.NEED_MORE_INPUT, 
dubboCountCodec.decode(channel, buffer));
+        List<DefaultFuture> futures = new ArrayList<>();
 
         for (int i = 0; i < 10; i++) {
-            Request request = new Request(1);
+            Request request = new Request(i);
+            futures.add(DefaultFuture.newFuture(channel, request, 1000, null));
             RpcInvocation rpcInvocation = new RpcInvocation(null, "echo", 
DemoService.class.getName(), "", new Class<?>[]{String.class}, new 
String[]{"yug"});
             request.setData(rpcInvocation);
             dubboCountCodec.encode(channel, buffer, request);
         }
 
         for (int i = 0; i < 10; i++) {
-            Response response = new Response(1);
+            Response response = new Response(i);
             AppResponse appResponse = new AppResponse(i);
             response.setResult(appResponse);
             dubboCountCodec.encode(channel, buffer, response);
         }
 
         MultiMessage multiMessage = (MultiMessage) 
dubboCountCodec.decode(channel, buffer);
-        Assertions.assertEquals(multiMessage.size(), 20);
+        Assertions.assertEquals(20, multiMessage.size());
         int requestCount = 0;
         int responseCount = 0;
         Iterator iterator = multiMessage.iterator();
@@ -79,6 +84,8 @@ public class DubboCountCodecTest {
         }
         Assertions.assertEquals(requestCount, 10);
         Assertions.assertEquals(responseCount, 10);
+
+        futures.forEach(DefaultFuture::cancel);
     }
 
 }

Reply via email to