This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new 340aa3caa1 optimize: add the request and response objects in HTTP 
thread context (#7822)
340aa3caa1 is described below

commit 340aa3caa10f01c0d510e718027fa90fdfd658d9
Author: lokidundun <[email protected]>
AuthorDate: Wed Dec 24 14:41:01 2025 +0800

    optimize: add the request and response objects in HTTP thread context 
(#7822)
---
 changes/en-us/2.x.md                               |   1 +
 changes/zh-cn/2.x.md                               |   1 +
 .../core/rpc/netty/http/HttpDispatchHandler.java   |  33 ++++--
 .../rpc/netty/http/filter/HttpFilterContext.java   |  22 ++++
 .../rpc/netty/http/HttpDispatchHandlerTest.java    | 126 +++++++++++++++++++++
 5 files changed, 174 insertions(+), 9 deletions(-)

diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 44dcd804df..303d575053 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -98,6 +98,7 @@ Add changes here for all PR submitted to the 2.x branch.
 - [[#7721](https://github.com/apache/incubator-seata/pull/7721)] optimize 
common module
 - [[#7809](https://github.com/apache/incubator-seata/pull/7809)] optimize 
README.md
 - [[#7813](https://github.com/apache/incubator-seata/pull/7813)] add decode 
buffer limit
+- [[#7822](https://github.com/apache/incubator-seata/pull/7822)] add the 
request and response objects in HTTP thread context
 - [[#7829](https://github.com/apache/incubator-seata/pull/7829)] optimize lz4 
compressor
 - [[#7864](https://github.com/apache/incubator-seata/pull/7864)] automatically 
skip the compilation of console and namingserver modules in JDK<25
 - [[#7868](https://github.com/apache/incubator-seata/pull/7868)] change 
build_arm64-binary CI to JDK25 Version and runs on ubuntu-24.04-arm
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index e97981cb84..5fa00db58a 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -97,6 +97,7 @@
 - [[#7721](https://github.com/apache/incubator-seata/pull/7721)] 优化common 模块
 - [[#7809](https://github.com/apache/incubator-seata/pull/7809)] 优化 README.md
 - [[#7813](https://github.com/apache/incubator-seata/pull/7813)] 增加解码buffer限制
+- [[#7822](https://github.com/apache/incubator-seata/pull/7822)] 在 HTTP 
线程上下文中添加请求和响应对象
 - [[#7829](https://github.com/apache/incubator-seata/pull/7829)] 优化lz4 
compressor
 - [[#7864](https://github.com/apache/incubator-seata/pull/7864)] 
自动跳过JDK<25环境下的console和namingserver模块编译
 - [[#7868](https://github.com/apache/incubator-seata/pull/7868)] 
将build_arm64-binary的CI更改为JDK25版本,并运行于ubuntu-24.04-arm
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandler.java
 
b/core/src/main/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandler.java
index a8f9022f08..a9536c7278 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandler.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandler.java
@@ -75,7 +75,8 @@ public class HttpDispatchHandler extends 
BaseHttpChannelHandler<HttpRequest> {
         HttpInvocation httpInvocation = 
ControllerManager.getHttpInvocation(path);
 
         if (httpInvocation == null) {
-            sendErrorResponse(ctx, HttpResponseStatus.NOT_FOUND, false);
+            FullHttpResponse errorResponse = addErrorResponse(context, 
HttpResponseStatus.NOT_FOUND);
+            sendErrorResponse(ctx, errorResponse, false);
             return;
         }
 
@@ -95,7 +96,8 @@ public class HttpDispatchHandler extends 
BaseHttpChannelHandler<HttpRequest> {
                     httpInvocation.getParamMetaData(), 
httpInvocation.getMethod(), requestDataNode, context);
         } catch (Exception e) {
             LOGGER.error("Error parsing request arguments: {}", 
e.getMessage(), e);
-            sendErrorResponse(ctx, HttpResponseStatus.BAD_REQUEST, false);
+            FullHttpResponse errorResponse = addErrorResponse(context, 
HttpResponseStatus.BAD_REQUEST);
+            sendErrorResponse(ctx, errorResponse, false);
             return;
         }
         context.setAttribute("args", args);
@@ -103,14 +105,19 @@ public class HttpDispatchHandler extends 
BaseHttpChannelHandler<HttpRequest> {
         // Execute filter chain in HTTP thread pool
         HttpRequestFilterChain filterChain = 
HttpRequestFilterManager.getFilterChain(this::executeFinalAction);
         HTTP_HANDLER_THREADS.execute(() -> {
+            HttpFilterContext.setCurrentContext(context);
             try {
                 filterChain.doFilter(context);
             } catch (HttpRequestFilterException e) {
                 LOGGER.warn("Request blocked by filter: {}", e.getMessage());
-                sendErrorResponse(ctx, HttpResponseStatus.BAD_REQUEST, false);
+                FullHttpResponse errorResponse = addErrorResponse(context, 
HttpResponseStatus.BAD_REQUEST);
+                sendErrorResponse(ctx, errorResponse, false);
             } catch (Exception e) {
                 LOGGER.error("Unexpected error during request processing: {}", 
e.getMessage(), e);
-                sendErrorResponse(ctx, 
HttpResponseStatus.INTERNAL_SERVER_ERROR, false);
+                FullHttpResponse errorResponse = addErrorResponse(context, 
HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                sendErrorResponse(ctx, errorResponse, false);
+            } finally {
+                HttpFilterContext.clearCurrentContext();
             }
         });
     }
@@ -127,17 +134,19 @@ public class HttpDispatchHandler extends 
BaseHttpChannelHandler<HttpRequest> {
                 return;
             }
 
-            sendResponse(context.getContext(), context.isKeepAlive(), result);
+            sendResponse(context.getContext(), context.isKeepAlive(), result, 
context);
         } catch (IllegalArgumentException e) {
             LOGGER.error("Illegal argument exception: {}", e.getMessage(), e);
-            sendErrorResponse(context.getContext(), 
HttpResponseStatus.BAD_REQUEST, false);
+            FullHttpResponse errorResponse = addErrorResponse(context, 
HttpResponseStatus.BAD_REQUEST);
+            sendErrorResponse(context.getContext(), errorResponse, false);
         } catch (Exception e) {
             LOGGER.error("Exception occurred while processing HTTP request: 
{}", e.getMessage(), e);
-            sendErrorResponse(context.getContext(), 
HttpResponseStatus.INTERNAL_SERVER_ERROR, false);
+            FullHttpResponse errorResponse = addErrorResponse(context, 
HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            sendErrorResponse(context.getContext(), errorResponse, false);
         }
     }
 
-    private void sendResponse(ChannelHandlerContext ctx, boolean keepAlive, 
Object result)
+    private void sendResponse(ChannelHandlerContext ctx, boolean keepAlive, 
Object result, HttpFilterContext<?> context)
             throws JsonProcessingException {
         FullHttpResponse response;
         if (result != null) {
@@ -149,6 +158,7 @@ public class HttpDispatchHandler extends 
BaseHttpChannelHandler<HttpRequest> {
             response = new DefaultFullHttpResponse(
                     HttpVersion.HTTP_1_1, HttpResponseStatus.OK, 
Unpooled.wrappedBuffer(Unpooled.EMPTY_BUFFER));
         }
+        context.setResponse(response);
         if (!keepAlive) {
             
ctx.writeAndFlush(response).addListeners(ChannelFutureListener.CLOSE);
         } else {
@@ -156,9 +166,14 @@ public class HttpDispatchHandler extends 
BaseHttpChannelHandler<HttpRequest> {
         }
     }
 
-    private void sendErrorResponse(ChannelHandlerContext ctx, 
HttpResponseStatus status, boolean keepAlive) {
+    private FullHttpResponse addErrorResponse(HttpFilterContext<?> context, 
HttpResponseStatus status) {
         FullHttpResponse response = new DefaultFullHttpResponse(
                 HttpVersion.HTTP_1_1, status, 
Unpooled.wrappedBuffer(Unpooled.EMPTY_BUFFER));
+        context.setResponse(response);
+        return response;
+    }
+
+    private void sendErrorResponse(ChannelHandlerContext ctx, FullHttpResponse 
response, boolean keepAlive) {
         if (!keepAlive) {
             
ctx.writeAndFlush(response).addListeners(ChannelFutureListener.CLOSE);
         } else {
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/http/filter/HttpFilterContext.java
 
b/core/src/main/java/org/apache/seata/core/rpc/netty/http/filter/HttpFilterContext.java
index e9eb2d9ab2..030d73ebab 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/http/filter/HttpFilterContext.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/http/filter/HttpFilterContext.java
@@ -27,6 +27,8 @@ public class HttpFilterContext<T> extends HttpContext<T> {
     private final Supplier<HttpRequestParamWrapper> paramWrapperSupplier;
     private volatile HttpRequestParamWrapper paramWrapper;
     private final Map<String, Object> attributes = new ConcurrentHashMap<>();
+    private static final ThreadLocal<HttpFilterContext<?>> CURRENT_CONTEXT = 
new ThreadLocal<>();
+    private Object response;
 
     public HttpFilterContext(
             T request,
@@ -38,6 +40,26 @@ public class HttpFilterContext<T> extends HttpContext<T> {
         this.paramWrapperSupplier = paramWrapperSupplier;
     }
 
+    public static HttpFilterContext<?> getCurrentContext() {
+        return CURRENT_CONTEXT.get();
+    }
+
+    public static void setCurrentContext(HttpFilterContext<?> context) {
+        CURRENT_CONTEXT.set(context);
+    }
+
+    public static void clearCurrentContext() {
+        CURRENT_CONTEXT.remove();
+    }
+
+    public void setResponse(Object response) {
+        this.response = response;
+    }
+
+    public Object getResponse() {
+        return this.response;
+    }
+
     public HttpRequestParamWrapper getParamWrapper() {
         if (paramWrapper == null) {
             synchronized (this) {
diff --git 
a/core/src/test/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandlerTest.java
 
b/core/src/test/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandlerTest.java
index de5ef2f198..2549603558 100644
--- 
a/core/src/test/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandlerTest.java
+++ 
b/core/src/test/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandlerTest.java
@@ -16,33 +16,53 @@
  */
 package org.apache.seata.core.rpc.netty.http;
 
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
+import org.apache.seata.common.rpc.http.HttpContext;
 import org.apache.seata.core.exception.HttpRequestFilterException;
+import org.apache.seata.core.rpc.netty.http.filter.HttpFilterContext;
+import org.apache.seata.core.rpc.netty.http.filter.HttpRequestFilter;
 import org.apache.seata.core.rpc.netty.http.filter.HttpRequestFilterChain;
 import org.apache.seata.core.rpc.netty.http.filter.HttpRequestFilterManager;
+import org.apache.seata.core.rpc.netty.http.filter.HttpRequestParamWrapper;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
 import org.mockito.MockedStatic;
+import org.mockito.MockitoAnnotations;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import static org.assertj.core.api.Fail.fail;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
 
 class HttpDispatchHandlerTest {
 
@@ -50,6 +70,13 @@ class HttpDispatchHandlerTest {
     private EmbeddedChannel channel;
     private TestController testController = new TestController();
 
+    @Mock
+    private ChannelHandlerContext mockCtx;
+
+    private FullHttpRequest testHttpRequest;
+    private ExecutorService testExecutor;
+    private EmbeddedChannel embeddedChannel;
+
     class TestController {
         public String handleRequest(String param) {
             return "Processed: " + param;
@@ -58,6 +85,7 @@ class HttpDispatchHandlerTest {
 
     @BeforeEach
     void setUp() throws NoSuchMethodException {
+        MockitoAnnotations.openMocks(this);
         handler = new HttpDispatchHandler();
         channel = new EmbeddedChannel(handler);
         Method method = TestController.class.getMethod("handleRequest", 
String.class);
@@ -73,6 +101,16 @@ class HttpDispatchHandlerTest {
         invocation.setParamMetaData(paramMetaDatas);
 
         ControllerManager.addHttpInvocation(invocation);
+
+        testHttpRequest = new DefaultFullHttpRequest(
+                HttpVersion.HTTP_1_1,
+                HttpMethod.GET,
+                "/test",
+                Unpooled.copiedBuffer("{\"name\":\"test\"}", 
StandardCharsets.UTF_8));
+        embeddedChannel = new EmbeddedChannel();
+        when(mockCtx.channel()).thenReturn(embeddedChannel);
+        when(mockCtx.pipeline()).thenReturn(embeddedChannel.pipeline());
+        testExecutor = Executors.newSingleThreadExecutor();
     }
 
     @AfterEach
@@ -81,6 +119,15 @@ class HttpDispatchHandlerTest {
         Field field2 = 
HttpRequestFilterManager.class.getDeclaredField("initialized");
         field2.setAccessible(true);
         field2.set(null, false);
+
+        if (testExecutor != null) {
+            testExecutor.shutdown();
+            testExecutor.awaitTermination(1, TimeUnit.SECONDS);
+        }
+        HttpFilterContext.clearCurrentContext();
+        if (embeddedChannel != null) {
+            embeddedChannel.close();
+        }
     }
 
     @Test
@@ -304,6 +351,85 @@ class HttpDispatchHandlerTest {
         }
     }
 
+    @Test
+    void testHttpFilterContextAvailableInThreadLocalDuringFilterExecution()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        class MockHttpAspect {
+            public void beforeFilter() {
+                HttpFilterContext<?> context = 
HttpFilterContext.getCurrentContext();
+                assertNotNull(context, "get request and response");
+
+                HttpRequest request = (HttpRequest) context.getRequest();
+                assertNotNull(request);
+                assertEquals("/test", request.uri());
+                assertEquals(HttpMethod.GET, request.method());
+                assertEquals("test-invocation", 
context.getAttribute("testKey"));
+            }
+
+            public void afterFilter() {
+                HttpFilterContext<?> context = 
HttpFilterContext.getCurrentContext();
+                assertNotNull(context, "get request and response");
+
+                FullHttpResponse response = (FullHttpResponse) 
context.getResponse();
+                assertNotNull(response);
+                assertEquals(HttpResponseStatus.OK, response.status());
+            }
+
+            public void afterFinally() {
+                assertNull(HttpFilterContext.getCurrentContext(), "clean the 
request and response");
+            }
+        }
+
+        MockHttpAspect mockAspect = new MockHttpAspect();
+
+        HttpRequestFilter businessFilter = new HttpRequestFilter() {
+            @Override
+            public void doFilter(HttpFilterContext<?> ctx, 
HttpRequestFilterChain chain)
+                    throws HttpRequestFilterException {
+                mockAspect.beforeFilter();
+
+                chain.doFilter(ctx);
+
+                FullHttpResponse response =
+                        new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER);
+                ctx.setResponse(response);
+
+                mockAspect.afterFilter();
+            }
+
+            @Override
+            public boolean shouldApply() {
+                return false;
+            }
+        };
+
+        HttpRequestFilterChain filterChain = new 
HttpRequestFilterChain(Arrays.asList(businessFilter), ctx -> {});
+
+        HttpFilterContext<HttpRequest> context = new HttpFilterContext<>(
+                testHttpRequest,
+                mockCtx,
+                true,
+                HttpContext.HTTP_1_1,
+                () -> new HttpRequestParamWrapper(null, null, null, null));
+        context.setAttribute("testKey", "test-invocation");
+
+        testExecutor
+                .submit(() -> {
+                    try {
+                        HttpFilterContext.setCurrentContext(context);
+                        filterChain.doFilter(context);
+                    } catch (HttpRequestFilterException e) {
+                        fail("Filter failure: " + e.getMessage());
+                    } finally {
+                        HttpFilterContext.clearCurrentContext();
+                        mockAspect.afterFinally();
+                    }
+                })
+                .get(1, TimeUnit.SECONDS);
+
+        assertNull(HttpFilterContext.getCurrentContext());
+    }
+
     private FullHttpResponse waitForResponse(long timeoutMs) {
         long startTime = System.currentTimeMillis();
         FullHttpResponse response = null;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to