This is an automated email from the ASF dual-hosted git repository.
jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new 340aa3caa1 optimize: add the request and response objects in HTTP
thread context (#7822)
340aa3caa1 is described below
commit 340aa3caa10f01c0d510e718027fa90fdfd658d9
Author: lokidundun <[email protected]>
AuthorDate: Wed Dec 24 14:41:01 2025 +0800
optimize: add the request and response objects in HTTP thread context
(#7822)
---
changes/en-us/2.x.md | 1 +
changes/zh-cn/2.x.md | 1 +
.../core/rpc/netty/http/HttpDispatchHandler.java | 33 ++++--
.../rpc/netty/http/filter/HttpFilterContext.java | 22 ++++
.../rpc/netty/http/HttpDispatchHandlerTest.java | 126 +++++++++++++++++++++
5 files changed, 174 insertions(+), 9 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 44dcd804df..303d575053 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -98,6 +98,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7721](https://github.com/apache/incubator-seata/pull/7721)] optimize
common module
- [[#7809](https://github.com/apache/incubator-seata/pull/7809)] optimize
README.md
- [[#7813](https://github.com/apache/incubator-seata/pull/7813)] add decode
buffer limit
+- [[#7822](https://github.com/apache/incubator-seata/pull/7822)] add the
request and response objects in HTTP thread context
- [[#7829](https://github.com/apache/incubator-seata/pull/7829)] optimize lz4
compressor
- [[#7864](https://github.com/apache/incubator-seata/pull/7864)] automatically
skip the compilation of console and namingserver modules in JDK<25
- [[#7868](https://github.com/apache/incubator-seata/pull/7868)] change
build_arm64-binary CI to JDK25 Version and runs on ubuntu-24.04-arm
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index e97981cb84..5fa00db58a 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -97,6 +97,7 @@
- [[#7721](https://github.com/apache/incubator-seata/pull/7721)] 优化common 模块
- [[#7809](https://github.com/apache/incubator-seata/pull/7809)] 优化 README.md
- [[#7813](https://github.com/apache/incubator-seata/pull/7813)] 增加解码buffer限制
+- [[#7822](https://github.com/apache/incubator-seata/pull/7822)] 在 HTTP
线程上下文中添加请求和响应对象
- [[#7829](https://github.com/apache/incubator-seata/pull/7829)] 优化lz4
compressor
- [[#7864](https://github.com/apache/incubator-seata/pull/7864)]
自动跳过JDK<25环境下的console和namingserver模块编译
- [[#7868](https://github.com/apache/incubator-seata/pull/7868)]
将build_arm64-binary的CI更改为JDK25版本,并运行于ubuntu-24.04-arm
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandler.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandler.java
index a8f9022f08..a9536c7278 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandler.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandler.java
@@ -75,7 +75,8 @@ public class HttpDispatchHandler extends
BaseHttpChannelHandler<HttpRequest> {
HttpInvocation httpInvocation =
ControllerManager.getHttpInvocation(path);
if (httpInvocation == null) {
- sendErrorResponse(ctx, HttpResponseStatus.NOT_FOUND, false);
+ FullHttpResponse errorResponse = addErrorResponse(context,
HttpResponseStatus.NOT_FOUND);
+ sendErrorResponse(ctx, errorResponse, false);
return;
}
@@ -95,7 +96,8 @@ public class HttpDispatchHandler extends
BaseHttpChannelHandler<HttpRequest> {
httpInvocation.getParamMetaData(),
httpInvocation.getMethod(), requestDataNode, context);
} catch (Exception e) {
LOGGER.error("Error parsing request arguments: {}",
e.getMessage(), e);
- sendErrorResponse(ctx, HttpResponseStatus.BAD_REQUEST, false);
+ FullHttpResponse errorResponse = addErrorResponse(context,
HttpResponseStatus.BAD_REQUEST);
+ sendErrorResponse(ctx, errorResponse, false);
return;
}
context.setAttribute("args", args);
@@ -103,14 +105,19 @@ public class HttpDispatchHandler extends
BaseHttpChannelHandler<HttpRequest> {
// Execute filter chain in HTTP thread pool
HttpRequestFilterChain filterChain =
HttpRequestFilterManager.getFilterChain(this::executeFinalAction);
HTTP_HANDLER_THREADS.execute(() -> {
+ HttpFilterContext.setCurrentContext(context);
try {
filterChain.doFilter(context);
} catch (HttpRequestFilterException e) {
LOGGER.warn("Request blocked by filter: {}", e.getMessage());
- sendErrorResponse(ctx, HttpResponseStatus.BAD_REQUEST, false);
+ FullHttpResponse errorResponse = addErrorResponse(context,
HttpResponseStatus.BAD_REQUEST);
+ sendErrorResponse(ctx, errorResponse, false);
} catch (Exception e) {
LOGGER.error("Unexpected error during request processing: {}",
e.getMessage(), e);
- sendErrorResponse(ctx,
HttpResponseStatus.INTERNAL_SERVER_ERROR, false);
+ FullHttpResponse errorResponse = addErrorResponse(context,
HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ sendErrorResponse(ctx, errorResponse, false);
+ } finally {
+ HttpFilterContext.clearCurrentContext();
}
});
}
@@ -127,17 +134,19 @@ public class HttpDispatchHandler extends
BaseHttpChannelHandler<HttpRequest> {
return;
}
- sendResponse(context.getContext(), context.isKeepAlive(), result);
+ sendResponse(context.getContext(), context.isKeepAlive(), result,
context);
} catch (IllegalArgumentException e) {
LOGGER.error("Illegal argument exception: {}", e.getMessage(), e);
- sendErrorResponse(context.getContext(),
HttpResponseStatus.BAD_REQUEST, false);
+ FullHttpResponse errorResponse = addErrorResponse(context,
HttpResponseStatus.BAD_REQUEST);
+ sendErrorResponse(context.getContext(), errorResponse, false);
} catch (Exception e) {
LOGGER.error("Exception occurred while processing HTTP request:
{}", e.getMessage(), e);
- sendErrorResponse(context.getContext(),
HttpResponseStatus.INTERNAL_SERVER_ERROR, false);
+ FullHttpResponse errorResponse = addErrorResponse(context,
HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ sendErrorResponse(context.getContext(), errorResponse, false);
}
}
- private void sendResponse(ChannelHandlerContext ctx, boolean keepAlive,
Object result)
+ private void sendResponse(ChannelHandlerContext ctx, boolean keepAlive,
Object result, HttpFilterContext<?> context)
throws JsonProcessingException {
FullHttpResponse response;
if (result != null) {
@@ -149,6 +158,7 @@ public class HttpDispatchHandler extends
BaseHttpChannelHandler<HttpRequest> {
response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
Unpooled.wrappedBuffer(Unpooled.EMPTY_BUFFER));
}
+ context.setResponse(response);
if (!keepAlive) {
ctx.writeAndFlush(response).addListeners(ChannelFutureListener.CLOSE);
} else {
@@ -156,9 +166,14 @@ public class HttpDispatchHandler extends
BaseHttpChannelHandler<HttpRequest> {
}
}
- private void sendErrorResponse(ChannelHandlerContext ctx,
HttpResponseStatus status, boolean keepAlive) {
+ private FullHttpResponse addErrorResponse(HttpFilterContext<?> context,
HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, status,
Unpooled.wrappedBuffer(Unpooled.EMPTY_BUFFER));
+ context.setResponse(response);
+ return response;
+ }
+
+ private void sendErrorResponse(ChannelHandlerContext ctx, FullHttpResponse
response, boolean keepAlive) {
if (!keepAlive) {
ctx.writeAndFlush(response).addListeners(ChannelFutureListener.CLOSE);
} else {
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/http/filter/HttpFilterContext.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/http/filter/HttpFilterContext.java
index e9eb2d9ab2..030d73ebab 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/http/filter/HttpFilterContext.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/http/filter/HttpFilterContext.java
@@ -27,6 +27,8 @@ public class HttpFilterContext<T> extends HttpContext<T> {
private final Supplier<HttpRequestParamWrapper> paramWrapperSupplier;
private volatile HttpRequestParamWrapper paramWrapper;
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
+ private static final ThreadLocal<HttpFilterContext<?>> CURRENT_CONTEXT =
new ThreadLocal<>();
+ private Object response;
public HttpFilterContext(
T request,
@@ -38,6 +40,26 @@ public class HttpFilterContext<T> extends HttpContext<T> {
this.paramWrapperSupplier = paramWrapperSupplier;
}
+ public static HttpFilterContext<?> getCurrentContext() {
+ return CURRENT_CONTEXT.get();
+ }
+
+ public static void setCurrentContext(HttpFilterContext<?> context) {
+ CURRENT_CONTEXT.set(context);
+ }
+
+ public static void clearCurrentContext() {
+ CURRENT_CONTEXT.remove();
+ }
+
+ public void setResponse(Object response) {
+ this.response = response;
+ }
+
+ public Object getResponse() {
+ return this.response;
+ }
+
public HttpRequestParamWrapper getParamWrapper() {
if (paramWrapper == null) {
synchronized (this) {
diff --git
a/core/src/test/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandlerTest.java
b/core/src/test/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandlerTest.java
index de5ef2f198..2549603558 100644
---
a/core/src/test/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandlerTest.java
+++
b/core/src/test/java/org/apache/seata/core/rpc/netty/http/HttpDispatchHandlerTest.java
@@ -16,33 +16,53 @@
*/
package org.apache.seata.core.rpc.netty.http;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
+import org.apache.seata.common.rpc.http.HttpContext;
import org.apache.seata.core.exception.HttpRequestFilterException;
+import org.apache.seata.core.rpc.netty.http.filter.HttpFilterContext;
+import org.apache.seata.core.rpc.netty.http.filter.HttpRequestFilter;
import org.apache.seata.core.rpc.netty.http.filter.HttpRequestFilterChain;
import org.apache.seata.core.rpc.netty.http.filter.HttpRequestFilterManager;
+import org.apache.seata.core.rpc.netty.http.filter.HttpRequestParamWrapper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
import org.mockito.MockedStatic;
+import org.mockito.MockitoAnnotations;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import static org.assertj.core.api.Fail.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
class HttpDispatchHandlerTest {
@@ -50,6 +70,13 @@ class HttpDispatchHandlerTest {
private EmbeddedChannel channel;
private TestController testController = new TestController();
+ @Mock
+ private ChannelHandlerContext mockCtx;
+
+ private FullHttpRequest testHttpRequest;
+ private ExecutorService testExecutor;
+ private EmbeddedChannel embeddedChannel;
+
class TestController {
public String handleRequest(String param) {
return "Processed: " + param;
@@ -58,6 +85,7 @@ class HttpDispatchHandlerTest {
@BeforeEach
void setUp() throws NoSuchMethodException {
+ MockitoAnnotations.openMocks(this);
handler = new HttpDispatchHandler();
channel = new EmbeddedChannel(handler);
Method method = TestController.class.getMethod("handleRequest",
String.class);
@@ -73,6 +101,16 @@ class HttpDispatchHandlerTest {
invocation.setParamMetaData(paramMetaDatas);
ControllerManager.addHttpInvocation(invocation);
+
+ testHttpRequest = new DefaultFullHttpRequest(
+ HttpVersion.HTTP_1_1,
+ HttpMethod.GET,
+ "/test",
+ Unpooled.copiedBuffer("{\"name\":\"test\"}",
StandardCharsets.UTF_8));
+ embeddedChannel = new EmbeddedChannel();
+ when(mockCtx.channel()).thenReturn(embeddedChannel);
+ when(mockCtx.pipeline()).thenReturn(embeddedChannel.pipeline());
+ testExecutor = Executors.newSingleThreadExecutor();
}
@AfterEach
@@ -81,6 +119,15 @@ class HttpDispatchHandlerTest {
Field field2 =
HttpRequestFilterManager.class.getDeclaredField("initialized");
field2.setAccessible(true);
field2.set(null, false);
+
+ if (testExecutor != null) {
+ testExecutor.shutdown();
+ testExecutor.awaitTermination(1, TimeUnit.SECONDS);
+ }
+ HttpFilterContext.clearCurrentContext();
+ if (embeddedChannel != null) {
+ embeddedChannel.close();
+ }
}
@Test
@@ -304,6 +351,85 @@ class HttpDispatchHandlerTest {
}
}
+ @Test
+ void testHttpFilterContextAvailableInThreadLocalDuringFilterExecution()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ class MockHttpAspect {
+ public void beforeFilter() {
+ HttpFilterContext<?> context =
HttpFilterContext.getCurrentContext();
+ assertNotNull(context, "get request and response");
+
+ HttpRequest request = (HttpRequest) context.getRequest();
+ assertNotNull(request);
+ assertEquals("/test", request.uri());
+ assertEquals(HttpMethod.GET, request.method());
+ assertEquals("test-invocation",
context.getAttribute("testKey"));
+ }
+
+ public void afterFilter() {
+ HttpFilterContext<?> context =
HttpFilterContext.getCurrentContext();
+ assertNotNull(context, "get request and response");
+
+ FullHttpResponse response = (FullHttpResponse)
context.getResponse();
+ assertNotNull(response);
+ assertEquals(HttpResponseStatus.OK, response.status());
+ }
+
+ public void afterFinally() {
+ assertNull(HttpFilterContext.getCurrentContext(), "clean the
request and response");
+ }
+ }
+
+ MockHttpAspect mockAspect = new MockHttpAspect();
+
+ HttpRequestFilter businessFilter = new HttpRequestFilter() {
+ @Override
+ public void doFilter(HttpFilterContext<?> ctx,
HttpRequestFilterChain chain)
+ throws HttpRequestFilterException {
+ mockAspect.beforeFilter();
+
+ chain.doFilter(ctx);
+
+ FullHttpResponse response =
+ new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER);
+ ctx.setResponse(response);
+
+ mockAspect.afterFilter();
+ }
+
+ @Override
+ public boolean shouldApply() {
+ return false;
+ }
+ };
+
+ HttpRequestFilterChain filterChain = new
HttpRequestFilterChain(Arrays.asList(businessFilter), ctx -> {});
+
+ HttpFilterContext<HttpRequest> context = new HttpFilterContext<>(
+ testHttpRequest,
+ mockCtx,
+ true,
+ HttpContext.HTTP_1_1,
+ () -> new HttpRequestParamWrapper(null, null, null, null));
+ context.setAttribute("testKey", "test-invocation");
+
+ testExecutor
+ .submit(() -> {
+ try {
+ HttpFilterContext.setCurrentContext(context);
+ filterChain.doFilter(context);
+ } catch (HttpRequestFilterException e) {
+ fail("Filter failure: " + e.getMessage());
+ } finally {
+ HttpFilterContext.clearCurrentContext();
+ mockAspect.afterFinally();
+ }
+ })
+ .get(1, TimeUnit.SECONDS);
+
+ assertNull(HttpFilterContext.getCurrentContext());
+ }
+
private FullHttpResponse waitForResponse(long timeoutMs) {
long startTime = System.currentTimeMillis();
FullHttpResponse response = null;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]