Copilot commented on code in PR #7903:
URL: https://github.com/apache/incubator-seata/pull/7903#discussion_r2652265823
##########
server/src/test/java/org/apache/seata/server/controller/ClusterControllerTest.java:
##########
@@ -138,62 +139,84 @@ public void run() {
try (Response response =
HttpClientUtil.doPost("http://127.0.0.1:" + port +
"/metadata/v1/watch", param, header, 30000)) {
if (response != null) {
- Assertions.assertEquals(HttpStatus.SC_OK, response.code());
+ Assertions.assertEquals(200, response.code());
return;
}
}
Assertions.fail();
}
@Test
- @Order(5)
- void watch_withHttp2() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
-
- Map<String, String> headers = new HashMap<>();
- headers.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
-
- Map<String, String> params = new HashMap<>();
- params.put("default-test", "1");
-
- Thread thread = new Thread(() -> {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ @Order(3)
+ void watch_stream() throws Exception {
+ Map<String, String> header = new HashMap<>();
+ header.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
+ Map<String, String> param = new HashMap<>();
+ param.put("default-test", "1");
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ ((ApplicationEventPublisher)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
+ .publishEvent(new ClusterChangeEvent(this,
"default-test", 2, true));
}
- ((ApplicationEventPublisher)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
- .publishEvent(new ClusterChangeEvent(this, "default-test",
2, true));
});
thread.start();
- HttpCallback<Response> callback = new HttpCallback<Response>() {
- @Override
- public void onSuccess(Response response) {
- Assertions.assertNotNull(response);
- Assertions.assertEquals(Protocol.H2_PRIOR_KNOWLEDGE,
response.protocol());
- Assertions.assertEquals(HttpStatus.SC_OK, response.code());
- latch.countDown();
- }
-
- @Override
- public void onFailure(Throwable t) {
- Assertions.fail("Should not fail: " + t.getMessage());
+ boolean keepaliveReceived = false;
+ boolean clusterUpdateReceived = false;
+ long startTime = System.currentTimeMillis();
+ long maxWaitTime = 10000; // Maximum wait time: 10 seconds
+
+ try (SeataHttpWatch<ClusterWatchEvent> watch =
HttpClientUtil.watchPost(
+ "http://127.0.0.1:" + port + "/metadata/v1/watch", param,
header, ClusterWatchEvent.class)) {
+ // For HTTP2, connection will remain open, so we need to break
after receiving expected events
+ while (watch.hasNext() && (System.currentTimeMillis() - startTime
< maxWaitTime)) {
+ SeataHttpWatch.Response<ClusterWatchEvent> response =
watch.next();
+ SeataHttpWatch.Response.Type type = response.type;
+
+ // 执行业务逻辑
Review Comment:
Chinese comment detected in test code. Comments should be in English for
consistency with the rest of the codebase and to maintain accessibility for all
contributors.
```suggestion
// Execute business logic
```
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -128,27 +180,64 @@ private void sendWatcherResponse(Watcher<HttpContext>
watcher, HttpResponseStatu
} else {
ctx.writeAndFlush(response);
}
- } else {
- // HTTP/2 response (h2c support)
- // Send headers frame
+ return;
+ }
+
+ // 第一次响应,必须先发送 headers
+ if (sendHeaders) {
Http2Headers headers = new
DefaultHttp2Headers().status(nettyStatus.codeAsText());
- headers.set(HttpHeaderNames.CONTENT_LENGTH, "0");
+ headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream;
charset=utf-8");
+ headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
+
ctx.write(new DefaultHttp2HeadersFrame(headers));
+ }
+
+ String group = watcher.getGroup();
+ String sse = buildSSEFormat(nettyStatus, closeStream, sendHeaders,
group);
+
+ ByteBuf content = Unpooled.copiedBuffer(sse, StandardCharsets.UTF_8);
- // Send empty data frame with endStream=true to close the stream
- ctx.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER,
true))
- .addListener(f -> {
- if (!f.isSuccess()) {
- logger.warn("HTTP2 response send failed,
group={}", group, f.cause());
- }
- });
+ // 发送 DATA 帧(closeStream = true 则结束本次 stream)
+ ctx.write(new DefaultHttp2DataFrame(content, closeStream));
+ ctx.flush();
+ }
+
+ private String buildSSEFormat(
+ HttpResponseStatus nettyStatus, boolean closeStream, boolean
sendHeaders, String group) {
+ // 决定事件类型(放在 JSON 中,而不是 SSE event 字段)
+ String eventType;
+ if (sendHeaders) {
+ // 第一次建立 stream 时发送 keepalive 事件,确认连接建立
+ eventType = "keepalive";
+ } else if (closeStream && nettyStatus ==
HttpResponseStatus.NOT_MODIFIED) {
+ // 超时事件,需要关闭流
+ eventType = "timeout";
+ } else {
+ // 正常集群变更事件
+ eventType = "cluster-update";
}
+
+ // 构造 JSON 格式事件数据(包含 type 字段)
+ String json = String.format(
+
"{\"type\":\"%s\",\"group\":\"%s\",\"term\":%d,\"timestamp\":%d}",
+ eventType, group, GROUP_UPDATE_TERM.getOrDefault(group, 0L),
System.currentTimeMillis());
+ logger.debug("Sending watch event: {}", json);
+
+ // SSE 格式:只发送 data: 字段,事件类型包含在 JSON 中
Review Comment:
Chinese comments detected in production code. Comments should be in English
for consistency with the rest of the codebase and to maintain accessibility for
all contributors.
```suggestion
// Determine the event type (encoded inside the JSON, not via the
SSE "event" field)
String eventType;
if (sendHeaders) {
// On initial stream establishment, send a keepalive event to
confirm the connection
eventType = "keepalive";
} else if (closeStream && nettyStatus ==
HttpResponseStatus.NOT_MODIFIED) {
// Timeout event: the stream should be closed
eventType = "timeout";
} else {
// Normal cluster change event
eventType = "cluster-update";
}
// Build JSON event payload (including the "type" field)
String json = String.format(
"{\"type\":\"%s\",\"group\":\"%s\",\"term\":%d,\"timestamp\":%d}",
eventType, group, GROUP_UPDATE_TERM.getOrDefault(group, 0L),
System.currentTimeMillis());
logger.debug("Sending watch event: {}", json);
// SSE format: send only the data: field; the event type is carried
inside the JSON
```
##########
common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java:
##########
@@ -265,25 +188,77 @@ private static String buildUrlWithParams(String url,
Map<String, String> params)
return urlBuilder.toString();
}
- private static void executeAsync(OkHttpClient client, Request request,
final HttpCallback<Response> callback) {
- client.newCall(request).enqueue(new Callback() {
- @Override
- public void onResponse(Call call, Response response) {
- try {
- callback.onSuccess(response);
- } finally {
- response.close();
- }
- }
+ private static OkHttpClient createHttp2WatchClient(int
connectTimeoutSeconds) {
+ return new OkHttpClient.Builder()
+
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
+ .connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS) //
连接阶段快速失败
+ .readTimeout(0, TimeUnit.SECONDS) // 等待TC推送数据(建立连接后持续监听服务器推送)
+ .writeTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+ .build();
+ }
- @Override
- public void onFailure(Call call, IOException e) {
- if (call.isCanceled()) {
- callback.onCancelled();
- } else {
- callback.onFailure(e);
- }
- }
- });
+ public static <T> SeataHttpWatch<T> watch(String url, Map<String, String>
headers, Class<T> eventType)
+ throws IOException {
+ return watch(url, headers, null, "GET", eventType);
+ }
+
+ public static <T> SeataHttpWatch<T> watch(String url, Class<T> eventType)
throws IOException {
+ return watch(url, null, null, "GET", eventType);
+ }
+
+ /**
+ * Execute a watch request with specified HTTP method and return a Watch
iterator.
+ * This method creates a long-lived HTTP/2 connection to receive
Server-Sent Events (SSE).
+ */
+ private static <T> SeataHttpWatch<T> watch(
+ String url, Map<String, String> headers, RequestBody requestBody,
String method, Class<T> eventType)
+ throws IOException {
+
+ OkHttpClient client = createHttp2WatchClient(30);
Review Comment:
Potential resource leak. The HTTP/2 client created by createHttp2WatchClient
is not cached or reused, which could lead to resource exhaustion under high
load. Consider implementing a client pool or caching mechanism similar to
HTTP_CLIENT_MAP used for HTTP/1 clients.
##########
common/src/test/java/org/apache/seata/common/util/SeataHttpWatchTest.java:
##########
@@ -0,0 +1,790 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.common.util;
+
+import okhttp3.Call;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import okio.Buffer;
+import okio.BufferedSource;
+import org.apache.seata.common.exception.FrameworkException;
+import org.apache.seata.common.metadata.ClusterWatchEvent;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SeataHttpWatchTest {
+
+ private OkHttpClient mockClient;
+ private Call mockCall;
+ private Response mockResponse;
+ private ResponseBody mockResponseBody;
+ private BufferedSource mockSource;
+
+ @BeforeEach
+ public void setUp() {
+ mockClient = mock(OkHttpClient.class);
+ mockCall = mock(Call.class);
+ mockResponse = mock(Response.class);
+ mockResponseBody = mock(ResponseBody.class);
+ mockSource = mock(BufferedSource.class);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ // Clean up if needed
+ }
+
+ @Test
+ public void testCreateWatch_WithSuccessfulResponse() throws IOException {
+ // Setup
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(mockSource);
+
+ // Execute
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Verify
+ assertNotNull(watch);
+ verify(mockClient).newCall(request);
+ verify(mockCall).execute();
+ }
+
+ @Test
+ public void testCreateWatch_WithUnsuccessfulResponse() throws IOException {
+ // Setup
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(false);
+ when(mockResponse.code()).thenReturn(404);
+ when(mockResponse.message()).thenReturn("Not Found");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.string()).thenReturn("Error message");
+
+ // Execute & Verify
+ FrameworkException exception = assertThrows(FrameworkException.class,
() -> {
+ SeataHttpWatch.createWatch(mockClient, request,
ClusterWatchEvent.class);
+ });
+
+ assertTrue(exception.getMessage().contains("404"));
+ assertTrue(exception.getMessage().contains("Error message"));
+ }
+
+ @Test
+ public void
testCreateWatch_WithUnsuccessfulResponse_IOExceptionOnBodyRead() throws
IOException {
+ // Setup
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(false);
+ when(mockResponse.code()).thenReturn(500);
+ when(mockResponse.message()).thenReturn("Internal Server Error");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.string()).thenThrow(new IOException("Read
error"));
+
+ // Execute & Verify
+ FrameworkException exception = assertThrows(FrameworkException.class,
() -> {
+ SeataHttpWatch.createWatch(mockClient, request,
ClusterWatchEvent.class);
+ });
+
+ assertTrue(exception.getMessage().contains("Watch request failed"));
+ }
+
+ @Test
+ public void testCreateWatch_WithNullContentType() throws IOException {
+ // Setup
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+ when(mockResponse.header("Content-Type")).thenReturn(null);
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(mockSource);
+
+ // Execute
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Verify - should still create watch but log warning
+ assertNotNull(watch);
+ }
+
+ @Test
+ public void testHasNext_WithAvailableData() throws IOException {
+ // Setup
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(mockSource);
+ when(mockSource.exhausted()).thenReturn(false);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ boolean hasNext = watch.hasNext();
+
+ // Verify
+ assertTrue(hasNext);
+ verify(mockSource).exhausted();
+ }
+
+ @Test
+ public void testHasNext_WithNoMoreData() throws IOException {
+ // Setup
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(mockSource);
+ when(mockSource.exhausted()).thenReturn(true);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ boolean hasNext = watch.hasNext();
+
+ // Verify
+ assertFalse(hasNext);
+ }
+
+ @Test
+ public void testHasNext_WithIOException() throws IOException {
+ // Setup
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(mockSource);
+ when(mockSource.exhausted()).thenThrow(new IOException("Stream
error"));
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ boolean hasNext = watch.hasNext();
+
+ // Verify - should return false on IOException
+ assertFalse(hasNext);
+ }
+
+ @Test
+ public void testNext_WithKeepaliveEvent() throws IOException {
+ // Setup
+ String sseData = "data:
{\"type\":\"keepalive\",\"group\":\"default-test\",\"timestamp\":1234567890}\n\n";
+ Buffer buffer = new Buffer();
+ buffer.writeString(sseData, StandardCharsets.UTF_8);
+
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(buffer);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ SeataHttpWatch.Response<ClusterWatchEvent> response = watch.next();
+
+ // Verify
+ assertNotNull(response);
+ assertEquals(SeataHttpWatch.Response.Type.KEEPALIVE, response.type);
+ assertNotNull(response.object);
+ assertEquals("keepalive", response.object.getType());
+ assertEquals("default-test", response.object.getGroup());
+ assertEquals(1234567890L, response.object.getTimestamp());
+ }
+
+ @Test
+ public void testNext_WithClusterUpdateEvent() throws IOException {
+ // Setup
+ String sseData = "data:
{\"type\":\"cluster-update\",\"group\":\"default-test\",\"term\":2,\"timestamp\":1234567890}\n\n";
+ Buffer buffer = new Buffer();
+ buffer.writeString(sseData, StandardCharsets.UTF_8);
+
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(buffer);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ SeataHttpWatch.Response<ClusterWatchEvent> response = watch.next();
+
+ // Verify
+ assertNotNull(response);
+ assertEquals(SeataHttpWatch.Response.Type.CLUSTER_UPDATE,
response.type);
+ assertNotNull(response.object);
+ assertEquals("cluster-update", response.object.getType());
+ assertEquals("default-test", response.object.getGroup());
+ assertEquals(2L, response.object.getTerm());
+ assertEquals(1234567890L, response.object.getTimestamp());
+ }
+
+ @Test
+ public void testNext_WithTimeoutEvent() throws IOException {
+ // Setup
+ String sseData = "data:
{\"type\":\"timeout\",\"group\":\"default-test\",\"timestamp\":1234567890}\n\n";
+ Buffer buffer = new Buffer();
+ buffer.writeString(sseData, StandardCharsets.UTF_8);
+
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(buffer);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ SeataHttpWatch.Response<ClusterWatchEvent> response = watch.next();
+
+ // Verify
+ assertNotNull(response);
+ assertEquals(SeataHttpWatch.Response.Type.TIMEOUT, response.type);
+ assertNotNull(response.object);
+ assertEquals("timeout", response.object.getType());
+ }
+
+ @Test
+ public void testNext_WithUnknownEventType() throws IOException {
+ // Setup
+ String sseData = "data:
{\"type\":\"unknown-type\",\"group\":\"default-test\",\"timestamp\":1234567890}\n\n";
+ Buffer buffer = new Buffer();
+ buffer.writeString(sseData, StandardCharsets.UTF_8);
+
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(buffer);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ SeataHttpWatch.Response<ClusterWatchEvent> response = watch.next();
+
+ // Verify - should default to CLUSTER_UPDATE for unknown types
+ assertNotNull(response);
+ assertEquals(SeataHttpWatch.Response.Type.CLUSTER_UPDATE,
response.type);
+ }
+
+ @Test
+ public void testNext_WithNullEventType() throws IOException {
+ // Setup
+ String sseData = "data:
{\"group\":\"default-test\",\"timestamp\":1234567890}\n\n";
+ Buffer buffer = new Buffer();
+ buffer.writeString(sseData, StandardCharsets.UTF_8);
+
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(buffer);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ SeataHttpWatch.Response<ClusterWatchEvent> response = watch.next();
+
+ // Verify - should default to CLUSTER_UPDATE when type is null
+ assertNotNull(response);
+ assertEquals(SeataHttpWatch.Response.Type.CLUSTER_UPDATE,
response.type);
+ }
+
+ @Test
+ public void testNext_WithInvalidJson() throws IOException {
+ // Setup
+ String sseData = "data: {invalid json}\n\n";
+ Buffer buffer = new Buffer();
+ buffer.writeString(sseData, StandardCharsets.UTF_8);
+
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(buffer);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ SeataHttpWatch.Response<ClusterWatchEvent> response = watch.next();
+
+ // Verify - should return ERROR type with null object
+ assertNotNull(response);
+ assertEquals(SeataHttpWatch.Response.Type.ERROR, response.type);
+ assertNull(response.object);
+ }
+
+ // TC发送SSE的固定格式\n\n 一次事件只发送一行,客户端每次读一行
+ @Test
+ public void testNext_WithMultipleEvents() throws IOException {
+ // 模拟两个事件
Review Comment:
Chinese comment detected in test code. Comments should be in English for
consistency with the rest of the codebase and to maintain accessibility for all
contributors.
```suggestion
// TC sends each SSE event as a single line terminated by "\n\n", and the
client reads one line at a time
@Test
public void testNext_WithMultipleEvents() throws IOException {
// Simulate two events
```
##########
common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java:
##########
@@ -265,25 +188,77 @@ private static String buildUrlWithParams(String url,
Map<String, String> params)
return urlBuilder.toString();
}
- private static void executeAsync(OkHttpClient client, Request request,
final HttpCallback<Response> callback) {
- client.newCall(request).enqueue(new Callback() {
- @Override
- public void onResponse(Call call, Response response) {
- try {
- callback.onSuccess(response);
- } finally {
- response.close();
- }
- }
+ private static OkHttpClient createHttp2WatchClient(int
connectTimeoutSeconds) {
+ return new OkHttpClient.Builder()
+
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
+ .connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS) //
连接阶段快速失败
+ .readTimeout(0, TimeUnit.SECONDS) // 等待TC推送数据(建立连接后持续监听服务器推送)
+ .writeTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+ .build();
+ }
- @Override
- public void onFailure(Call call, IOException e) {
- if (call.isCanceled()) {
- callback.onCancelled();
- } else {
- callback.onFailure(e);
- }
- }
- });
+ public static <T> SeataHttpWatch<T> watch(String url, Map<String, String>
headers, Class<T> eventType)
+ throws IOException {
+ return watch(url, headers, null, "GET", eventType);
+ }
+
+ public static <T> SeataHttpWatch<T> watch(String url, Class<T> eventType)
throws IOException {
+ return watch(url, null, null, "GET", eventType);
+ }
+
+ /**
+ * Execute a watch request with specified HTTP method and return a Watch
iterator.
+ * This method creates a long-lived HTTP/2 connection to receive
Server-Sent Events (SSE).
+ */
+ private static <T> SeataHttpWatch<T> watch(
+ String url, Map<String, String> headers, RequestBody requestBody,
String method, Class<T> eventType)
+ throws IOException {
+
+ OkHttpClient client = createHttp2WatchClient(30);
Review Comment:
Magic number detected. The connection timeout value of 30 seconds should be
extracted to a named constant with clear documentation explaining why this
specific timeout is appropriate for watch connections.
##########
common/src/main/java/org/apache/seata/common/util/SeataHttpWatch.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.common.util;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.Call;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.ResponseBody;
+import okio.BufferedSource;
+import org.apache.seata.common.exception.FrameworkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Seata HTTP/2 Watch implementation.
+ * Consumes server-pushed SSE data frames via an iterator-style API.
+ *
+ * @param <T> event data type
+ */
+public class SeataHttpWatch<T>
+ implements Iterator<SeataHttpWatch.Response<T>>,
Iterable<SeataHttpWatch.Response<T>>, AutoCloseable {
Review Comment:
This Iterable is its own Iterator, but does not guard against multiple
iterations.
```suggestion
implements Iterator<SeataHttpWatch.Response<T>>, AutoCloseable {
```
##########
common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java:
##########
@@ -265,25 +188,77 @@ private static String buildUrlWithParams(String url,
Map<String, String> params)
return urlBuilder.toString();
}
- private static void executeAsync(OkHttpClient client, Request request,
final HttpCallback<Response> callback) {
- client.newCall(request).enqueue(new Callback() {
- @Override
- public void onResponse(Call call, Response response) {
- try {
- callback.onSuccess(response);
- } finally {
- response.close();
- }
- }
+ private static OkHttpClient createHttp2WatchClient(int
connectTimeoutSeconds) {
+ return new OkHttpClient.Builder()
+
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
+ .connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS) //
连接阶段快速失败
+ .readTimeout(0, TimeUnit.SECONDS) // 等待TC推送数据(建立连接后持续监听服务器推送)
+ .writeTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+ .build();
+ }
- @Override
- public void onFailure(Call call, IOException e) {
- if (call.isCanceled()) {
- callback.onCancelled();
- } else {
- callback.onFailure(e);
- }
- }
- });
+ public static <T> SeataHttpWatch<T> watch(String url, Map<String, String>
headers, Class<T> eventType)
+ throws IOException {
+ return watch(url, headers, null, "GET", eventType);
+ }
+
+ public static <T> SeataHttpWatch<T> watch(String url, Class<T> eventType)
throws IOException {
+ return watch(url, null, null, "GET", eventType);
+ }
+
+ /**
+ * Execute a watch request with specified HTTP method and return a Watch
iterator.
+ * This method creates a long-lived HTTP/2 connection to receive
Server-Sent Events (SSE).
+ */
+ private static <T> SeataHttpWatch<T> watch(
+ String url, Map<String, String> headers, RequestBody requestBody,
String method, Class<T> eventType)
+ throws IOException {
+
+ OkHttpClient client = createHttp2WatchClient(30);
+ Request request = buildHttp2WatchRequest(url, headers, requestBody,
method);
+ return SeataHttpWatch.createWatch(client, request, eventType);
+ }
+
+ public static <T> SeataHttpWatch<T> watchPost(
+ String url, Map<String, String> params, Map<String, String>
headers, Class<T> eventType)
+ throws IOException {
+ try {
+ String contentType = headers != null ? headers.get("Content-Type")
: "";
+ RequestBody requestBody = createRequestBody(params, contentType);
+ return watch(url, headers, requestBody, "POST", eventType);
+ } catch (JsonProcessingException e) {
+ LOGGER.error("Failed to create request body: {}", e.getMessage(),
e);
Review Comment:
Inconsistent error handling. The method wraps JsonProcessingException in an
IOException with a descriptive message, but the logging statement on line 230
formats `e.getMessage()` into the log text even though the exception itself is
already passed as the last argument, and the code then throws with the same
message, so the failure is reported twice. Consider either logging once with
the full exception context or removing the redundant log statement.
```suggestion
LOGGER.error("Failed to create request body", e);
```
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -97,28 +115,62 @@ public void onChangeEvent(ClusterChangeEvent event) {
}
private void notifyWatcher(Watcher<HttpContext> watcher) {
- watcher.setDone(true);
- sendWatcherResponse(watcher, HttpResponseStatus.OK);
- }
+ HttpContext context = watcher.getAsyncContext();
+ boolean isHttp2 = context instanceof HttpContext && context.isHttp2();
+
+ if (!isHttp2) {
+ watcher.setDone(true);
+ }
+
+ boolean isFirstResponse = !HTTP2_HEADERS_SENT.getOrDefault(watcher,
false);
+ sendWatcherResponse(watcher, HttpResponseStatus.OK, false,
isFirstResponse);
+ if (isFirstResponse && isHttp2) {
+ HTTP2_HEADERS_SENT.put(watcher, true);
+ }
- private void sendWatcherResponse(Watcher<HttpContext> watcher,
HttpResponseStatus nettyStatus) {
+ // Update watcher's term to the latest term to prevent infinite loop
+ // This ensures that when registryWatcher is called, it won't trigger
notifyWatcher again
String group = watcher.getGroup();
+ Long latestTerm = GROUP_UPDATE_TERM.get(group);
+ if (latestTerm != null && latestTerm > watcher.getTerm()) {
+ watcher.setTerm(latestTerm);
+ }
+
+ // For HTTP/2, re-register the watcher to continue listening for
future updates
+ if (isHttp2 && !watcher.isDone()) {
+ registryWatcher(watcher);
+ }
+ }
+ /**
+ * Send watcher response to the client.
+ *
+ * @param watcher the watcher instance
+ * @param nettyStatus the HTTP status code
+ * @param closeStream whether to close the HTTP/2 stream (endStream=true)
+ * @param sendHeaders whether to send HTTP/2 headers frame (only needed
for first response)
+ */
+ private void sendWatcherResponse(
+ Watcher<HttpContext> watcher, HttpResponseStatus nettyStatus,
boolean closeStream, boolean sendHeaders) {
+
HttpContext context = watcher.getAsyncContext();
if (!(context instanceof HttpContext)) {
logger.warn(
"Unsupported context type for watcher on group {}: {}",
- group,
+ watcher.getGroup(),
context != null ? context.getClass().getName() : "null");
return;
}
ChannelHandlerContext ctx = context.getContext();
+
if (!ctx.channel().isActive()) {
- logger.warn("Netty channel is not active for watcher on group {},
cannot send response.", group);
+ HTTP2_HEADERS_SENT.remove(watcher);
+ logger.warn(
+ "Netty channel is not active for watcher on group {},
cannot send response.", watcher.getGroup());
return;
}
+ // HTTP/1 长轮询保持原逻辑
Review Comment:
Chinese comments detected in production code. Comments should be in English
for consistency with the rest of the codebase and to maintain accessibility for
all contributors.
##########
common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java:
##########
@@ -265,25 +188,77 @@ private static String buildUrlWithParams(String url,
Map<String, String> params)
return urlBuilder.toString();
}
- private static void executeAsync(OkHttpClient client, Request request,
final HttpCallback<Response> callback) {
- client.newCall(request).enqueue(new Callback() {
- @Override
- public void onResponse(Call call, Response response) {
- try {
- callback.onSuccess(response);
- } finally {
- response.close();
- }
- }
+ private static OkHttpClient createHttp2WatchClient(int
connectTimeoutSeconds) {
+ return new OkHttpClient.Builder()
+
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
+ .connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS) //
连接阶段快速失败
+ .readTimeout(0, TimeUnit.SECONDS) // 等待TC推送数据(建立连接后持续监听服务器推送)
+ .writeTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+ .build();
+ }
- @Override
- public void onFailure(Call call, IOException e) {
- if (call.isCanceled()) {
- callback.onCancelled();
- } else {
- callback.onFailure(e);
- }
- }
- });
+ public static <T> SeataHttpWatch<T> watch(String url, Map<String, String>
headers, Class<T> eventType)
+ throws IOException {
+ return watch(url, headers, null, "GET", eventType);
+ }
Review Comment:
Missing input validation. The method should validate that the 'url'
parameter is not null or empty before proceeding with the request. This would
provide clearer error messages when called incorrectly.
##########
common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java:
##########
@@ -265,25 +188,77 @@ private static String buildUrlWithParams(String url,
Map<String, String> params)
return urlBuilder.toString();
}
- private static void executeAsync(OkHttpClient client, Request request,
final HttpCallback<Response> callback) {
- client.newCall(request).enqueue(new Callback() {
- @Override
- public void onResponse(Call call, Response response) {
- try {
- callback.onSuccess(response);
- } finally {
- response.close();
- }
- }
+ private static OkHttpClient createHttp2WatchClient(int
connectTimeoutSeconds) {
+ return new OkHttpClient.Builder()
+
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
+ .connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS) //
连接阶段快速失败
+ .readTimeout(0, TimeUnit.SECONDS) // 等待TC推送数据(建立连接后持续监听服务器推送)
+ .writeTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+ .build();
+ }
- @Override
- public void onFailure(Call call, IOException e) {
- if (call.isCanceled()) {
- callback.onCancelled();
- } else {
- callback.onFailure(e);
- }
- }
- });
+ public static <T> SeataHttpWatch<T> watch(String url, Map<String, String>
headers, Class<T> eventType)
+ throws IOException {
+ return watch(url, headers, null, "GET", eventType);
+ }
+
+ public static <T> SeataHttpWatch<T> watch(String url, Class<T> eventType)
throws IOException {
+ return watch(url, null, null, "GET", eventType);
+ }
+
+ /**
+ * Execute a watch request with specified HTTP method and return a Watch
iterator.
+ * This method creates a long-lived HTTP/2 connection to receive
Server-Sent Events (SSE).
+ */
+ private static <T> SeataHttpWatch<T> watch(
+ String url, Map<String, String> headers, RequestBody requestBody,
String method, Class<T> eventType)
+ throws IOException {
Review Comment:
Incomplete documentation. The method comment states "Execute a watch request
with specified HTTP method and return a Watch iterator" but doesn't document
the parameters or return value. Consider adding complete JavaDoc with @param
tags for all parameters and @return tag describing the returned SeataHttpWatch
instance, along with @throws IOException explaining when this exception is
thrown.
##########
common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java:
##########
@@ -265,25 +188,77 @@ private static String buildUrlWithParams(String url,
Map<String, String> params)
return urlBuilder.toString();
}
- private static void executeAsync(OkHttpClient client, Request request,
final HttpCallback<Response> callback) {
- client.newCall(request).enqueue(new Callback() {
- @Override
- public void onResponse(Call call, Response response) {
- try {
- callback.onSuccess(response);
- } finally {
- response.close();
- }
- }
+ private static OkHttpClient createHttp2WatchClient(int
connectTimeoutSeconds) {
+ return new OkHttpClient.Builder()
+
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
+ .connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS) //
连接阶段快速失败
+ .readTimeout(0, TimeUnit.SECONDS) // 等待TC推送数据(建立连接后持续监听服务器推送)
Review Comment:
Chinese comment detected in production code. Comments should be in English
for consistency with the rest of the codebase and to maintain accessibility for
all contributors.
```suggestion
.connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS) //
Fail fast during connection phase
.readTimeout(0, TimeUnit.SECONDS) // Wait indefinitely for
TC push data (keep listening for server push after connection is established)
```
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -128,27 +180,64 @@ private void sendWatcherResponse(Watcher<HttpContext>
watcher, HttpResponseStatu
} else {
ctx.writeAndFlush(response);
}
- } else {
- // HTTP/2 response (h2c support)
- // Send headers frame
+ return;
+ }
+
+ // 第一次响应,必须先发送 headers
+ if (sendHeaders) {
Http2Headers headers = new
DefaultHttp2Headers().status(nettyStatus.codeAsText());
- headers.set(HttpHeaderNames.CONTENT_LENGTH, "0");
+ headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream;
charset=utf-8");
+ headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
+
ctx.write(new DefaultHttp2HeadersFrame(headers));
+ }
+
+ String group = watcher.getGroup();
+ String sse = buildSSEFormat(nettyStatus, closeStream, sendHeaders,
group);
+
+ ByteBuf content = Unpooled.copiedBuffer(sse, StandardCharsets.UTF_8);
- // Send empty data frame with endStream=true to close the stream
- ctx.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER,
true))
- .addListener(f -> {
- if (!f.isSuccess()) {
- logger.warn("HTTP2 response send failed,
group={}", group, f.cause());
- }
- });
+ // 发送 DATA 帧(closeStream = true 则结束本次 stream)
+ ctx.write(new DefaultHttp2DataFrame(content, closeStream));
+ ctx.flush();
+ }
+
+ private String buildSSEFormat(
+ HttpResponseStatus nettyStatus, boolean closeStream, boolean
sendHeaders, String group) {
+ // 决定事件类型(放在 JSON 中,而不是 SSE event 字段)
+ String eventType;
+ if (sendHeaders) {
+ // 第一次建立 stream 时发送 keepalive 事件,确认连接建立
+ eventType = "keepalive";
+ } else if (closeStream && nettyStatus ==
HttpResponseStatus.NOT_MODIFIED) {
+ // 超时事件,需要关闭流
+ eventType = "timeout";
+ } else {
+ // 正常集群变更事件
+ eventType = "cluster-update";
}
+
+ // 构造 JSON 格式事件数据(包含 type 字段)
+ String json = String.format(
+
"{\"type\":\"%s\",\"group\":\"%s\",\"term\":%d,\"timestamp\":%d}",
+ eventType, group, GROUP_UPDATE_TERM.getOrDefault(group, 0L),
System.currentTimeMillis());
+ logger.debug("Sending watch event: {}", json);
+
+ // SSE 格式:只发送 data: 字段,事件类型包含在 JSON 中
Review Comment:
Chinese comments detected in production code. Comments should be in English
for consistency with the rest of the codebase and to maintain accessibility for
all contributors.
```suggestion
// Send DATA frame (closeStream = true will end this stream)
ctx.write(new DefaultHttp2DataFrame(content, closeStream));
ctx.flush();
}
private String buildSSEFormat(
HttpResponseStatus nettyStatus, boolean closeStream, boolean
sendHeaders, String group) {
// Determine event type (embedded in JSON instead of the SSE event
field)
String eventType;
if (sendHeaders) {
// When the stream is first established, send a keepalive event
to confirm the connection
eventType = "keepalive";
} else if (closeStream && nettyStatus ==
HttpResponseStatus.NOT_MODIFIED) {
// Timeout event; the stream should be closed
eventType = "timeout";
} else {
// Normal cluster change event
eventType = "cluster-update";
}
// Build JSON-formatted event data (including the type field)
String json = String.format(
"{\"type\":\"%s\",\"group\":\"%s\",\"term\":%d,\"timestamp\":%d}",
eventType, group, GROUP_UPDATE_TERM.getOrDefault(group, 0L),
System.currentTimeMillis());
logger.debug("Sending watch event: {}", json);
// SSE format: only send the data: field, with the event type
contained in the JSON
```
##########
common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java:
##########
@@ -265,25 +188,77 @@ private static String buildUrlWithParams(String url,
Map<String, String> params)
return urlBuilder.toString();
}
- private static void executeAsync(OkHttpClient client, Request request,
final HttpCallback<Response> callback) {
- client.newCall(request).enqueue(new Callback() {
- @Override
- public void onResponse(Call call, Response response) {
- try {
- callback.onSuccess(response);
- } finally {
- response.close();
- }
- }
+ private static OkHttpClient createHttp2WatchClient(int
connectTimeoutSeconds) {
+ return new OkHttpClient.Builder()
+
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
+ .connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS) //
连接阶段快速失败
+ .readTimeout(0, TimeUnit.SECONDS) // 等待TC推送数据(建立连接后持续监听服务器推送)
+ .writeTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+ .build();
+ }
- @Override
- public void onFailure(Call call, IOException e) {
- if (call.isCanceled()) {
- callback.onCancelled();
- } else {
- callback.onFailure(e);
- }
- }
- });
+ public static <T> SeataHttpWatch<T> watch(String url, Map<String, String>
headers, Class<T> eventType)
+ throws IOException {
+ return watch(url, headers, null, "GET", eventType);
+ }
+
+ public static <T> SeataHttpWatch<T> watch(String url, Class<T> eventType)
throws IOException {
+ return watch(url, null, null, "GET", eventType);
+ }
Review Comment:
Missing input validation. The method should validate that the 'url'
parameter is not null or empty before proceeding with the request. This would
provide clearer error messages when called incorrectly.
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -68,13 +72,27 @@ public void init() {
for (String group : WATCHERS.keySet()) {
Optional.ofNullable(WATCHERS.remove(group))
.ifPresent(watchers ->
watchers.parallelStream().forEach(watcher -> {
- if (System.currentTimeMillis() >=
watcher.getTimeout()) {
- watcher.setDone(true);
- sendWatcherResponse(watcher,
HttpResponseStatus.NOT_MODIFIED);
- }
- if (!watcher.isDone()) {
- // Re-register
- registryWatcher(watcher);
+ HttpContext context =
watcher.getAsyncContext();
+ boolean isHttp2 = context instanceof
HttpContext && context.isHttp2();
+ // 对于 HTTP2,不做超时处理,保持长连接
+ if (isHttp2) {
+ // 只检查连接是否还活跃
+ if
(!context.getContext().channel().isActive()) {
+ // 连接已断开,清理资源
+ watcher.setDone(true);
+ HTTP2_HEADERS_SENT.remove(watcher);
+ } else {
+ // 连接活跃,重新注册继续监听
+ registryWatcher(watcher);
+ }
+ } else {
+ // HTTP1 保持原有超时逻辑
Review Comment:
Chinese comments detected in production code. Comments should be in English
for consistency with the rest of the codebase and to maintain accessibility for
all contributors.
```suggestion
// For HTTP/2, do not perform timeout
handling; keep the long-lived connection
if (isHttp2) {
// Only check whether the connection
is still active
if
(!context.getContext().channel().isActive()) {
// Connection is closed; clean
up resources
watcher.setDone(true);
HTTP2_HEADERS_SENT.remove(watcher);
} else {
// Connection is active;
re-register the watcher to continue listening
registryWatcher(watcher);
}
} else {
// For HTTP/1, keep the original
timeout logic
```
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -97,28 +115,62 @@ public void onChangeEvent(ClusterChangeEvent event) {
}
private void notifyWatcher(Watcher<HttpContext> watcher) {
- watcher.setDone(true);
- sendWatcherResponse(watcher, HttpResponseStatus.OK);
- }
+ HttpContext context = watcher.getAsyncContext();
+ boolean isHttp2 = context instanceof HttpContext && context.isHttp2();
+
+ if (!isHttp2) {
+ watcher.setDone(true);
+ }
+
+ boolean isFirstResponse = !HTTP2_HEADERS_SENT.getOrDefault(watcher,
false);
+ sendWatcherResponse(watcher, HttpResponseStatus.OK, false,
isFirstResponse);
+ if (isFirstResponse && isHttp2) {
+ HTTP2_HEADERS_SENT.put(watcher, true);
+ }
- private void sendWatcherResponse(Watcher<HttpContext> watcher,
HttpResponseStatus nettyStatus) {
+ // Update watcher's term to the latest term to prevent infinite loop
+ // This ensures that when registryWatcher is called, it won't trigger
notifyWatcher again
String group = watcher.getGroup();
+ Long latestTerm = GROUP_UPDATE_TERM.get(group);
+ if (latestTerm != null && latestTerm > watcher.getTerm()) {
+ watcher.setTerm(latestTerm);
+ }
+
+ // For HTTP/2, re-register the watcher to continue listening for
future updates
+ if (isHttp2 && !watcher.isDone()) {
+ registryWatcher(watcher);
+ }
+ }
+ /**
+ * Send watcher response to the client.
+ *
+ * @param watcher the watcher instance
+ * @param nettyStatus the HTTP status code
+ * @param closeStream whether to close the HTTP/2 stream (endStream=true)
+ * @param sendHeaders whether to send HTTP/2 headers frame (only needed
for first response)
Review Comment:
Incomplete JavaDoc. The @param tags for the four parameters (watcher,
nettyStatus, closeStream, sendHeaders) are too terse: they do not explain how
each parameter behaves for HTTP/1.1 versus HTTP/2 or when it should be used.
This makes the API harder to understand and maintain.
```suggestion
* Send a watcher response back to the client over HTTP/1.1 long polling
or HTTP/2 streaming.
* <p>
* For HTTP/1.1, this method writes a normal {@link HttpResponse} and
relies on
* {@link HttpContext#isKeepAlive()} to decide whether to close the TCP
connection.
* For HTTP/2, it optionally sends a {@code HEADERS} frame for the first
response on the
* stream and always sends a {@code DATA} frame containing the
SSE-formatted payload.
*
* @param watcher the watcher instance whose {@link HttpContext} is
used to obtain the
* underlying {@link ChannelHandlerContext} and
identify the logical
* watcher (group, term) being notified
* @param nettyStatus the HTTP status code to send to the client; for
HTTP/1.1 this is used
* as the status of the {@link HttpResponse}, and for
HTTP/2 it is
* encoded into the {@code :status} header and the
SSE event data
* @param closeStream for HTTP/2, {@code true} to end the current HTTP/2
stream with this
* {@code DATA} frame (i.e. {@code endStream=true});
ignored for HTTP/1.1
* where connection lifetime is controlled by {@link
HttpContext#isKeepAlive()}
* @param sendHeaders for HTTP/2, {@code true} if a {@code HEADERS}
frame (including
* {@code :status}, {@code Content-Type}, and
cache-control headers)
* should be sent before the {@code DATA} frame; this
should typically be
* {@code true} for the first response on a stream
and {@code false} for
* subsequent incremental updates. It is ignored for
HTTP/1.1.
```
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -57,6 +59,8 @@ public class ClusterWatcherManager implements
ClusterChangeListener {
private static final Map<String, Long> GROUP_UPDATE_TERM = new
ConcurrentHashMap<>();
+ private static final Map<Watcher<HttpContext>, Boolean> HTTP2_HEADERS_SENT
= new ConcurrentHashMap<>();
Review Comment:
Potential memory leak. The HTTP2_HEADERS_SENT map is populated but only
cleared when channels become inactive. For long-running servers with many watch
connections, this could lead to unbounded memory growth. Consider implementing
a cleanup mechanism or using weak references.
##########
common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java:
##########
@@ -265,25 +188,77 @@ private static String buildUrlWithParams(String url,
Map<String, String> params)
return urlBuilder.toString();
}
- private static void executeAsync(OkHttpClient client, Request request,
final HttpCallback<Response> callback) {
- client.newCall(request).enqueue(new Callback() {
- @Override
- public void onResponse(Call call, Response response) {
- try {
- callback.onSuccess(response);
- } finally {
- response.close();
- }
- }
+ private static OkHttpClient createHttp2WatchClient(int
connectTimeoutSeconds) {
+ return new OkHttpClient.Builder()
+
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
+ .connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS) //
连接阶段快速失败
+ .readTimeout(0, TimeUnit.SECONDS) // 等待TC推送数据(建立连接后持续监听服务器推送)
+ .writeTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+ .build();
+ }
- @Override
- public void onFailure(Call call, IOException e) {
- if (call.isCanceled()) {
- callback.onCancelled();
- } else {
- callback.onFailure(e);
- }
- }
- });
+ public static <T> SeataHttpWatch<T> watch(String url, Map<String, String>
headers, Class<T> eventType)
+ throws IOException {
+ return watch(url, headers, null, "GET", eventType);
+ }
+
+ public static <T> SeataHttpWatch<T> watch(String url, Class<T> eventType)
throws IOException {
+ return watch(url, null, null, "GET", eventType);
+ }
+
+ /**
+ * Execute a watch request with specified HTTP method and return a Watch
iterator.
+ * This method creates a long-lived HTTP/2 connection to receive
Server-Sent Events (SSE).
+ */
+ private static <T> SeataHttpWatch<T> watch(
+ String url, Map<String, String> headers, RequestBody requestBody,
String method, Class<T> eventType)
+ throws IOException {
+
+ OkHttpClient client = createHttp2WatchClient(30);
+ Request request = buildHttp2WatchRequest(url, headers, requestBody,
method);
+ return SeataHttpWatch.createWatch(client, request, eventType);
+ }
+
+ public static <T> SeataHttpWatch<T> watchPost(
+ String url, Map<String, String> params, Map<String, String>
headers, Class<T> eventType)
+ throws IOException {
+ try {
+ String contentType = headers != null ? headers.get("Content-Type")
: "";
+ RequestBody requestBody = createRequestBody(params, contentType);
+ return watch(url, headers, requestBody, "POST", eventType);
+ } catch (JsonProcessingException e) {
+ LOGGER.error("Failed to create request body: {}", e.getMessage(),
e);
+ throw new IOException("Failed to create request body", e);
+ }
+ }
+
+ public static <T> SeataHttpWatch<T> watchPost(String url, Map<String,
String> params, Class<T> eventType)
+ throws IOException {
+ return watchPost(url, params, null, eventType);
Review Comment:
Missing input validation. The method should validate that the 'url'
parameter is not null or empty before proceeding with the request. This would
provide clearer error messages when called incorrectly.
##########
server/src/test/java/org/apache/seata/server/controller/ClusterControllerTest.java:
##########
@@ -82,37 +77,43 @@ void watchTimeoutTest() throws Exception {
@Test
@Order(2)
void watchTimeoutTest_withHttp2() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
-
Map<String, String> headers = new HashMap<>();
- headers.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
+ headers.put("Content-Type",
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
Map<String, String> params = new HashMap<>();
params.put("default-test", "1");
- HttpCallback<Response> callback = new HttpCallback<Response>() {
- @Override
- public void onSuccess(Response response) {
- Assertions.assertNotNull(response);
- Assertions.assertEquals(Protocol.H2_PRIOR_KNOWLEDGE,
response.protocol());
- Assertions.assertEquals(HttpStatus.SC_NOT_MODIFIED,
response.code());
- latch.countDown();
- }
-
- @Override
- public void onFailure(Throwable t) {
- Assertions.fail("Should not fail");
- }
-
- @Override
- public void onCancelled() {
- Assertions.fail("Should not be cancelled");
- }
- };
-
- HttpClientUtil.doPostWithHttp2(
- "http://127.0.0.1:" + port +
"/metadata/v1/watch?timeout=3000", params, headers, callback);
- Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ // For HTTP2, the connection should remain open and not timeout
+ // The test verifies that the connection stays alive beyond the
timeout period
+ try (SeataHttpWatch<ClusterWatchEvent> watch =
HttpClientUtil.watchPost(
+ "http://127.0.0.1:" + port + "/metadata/v1/watch?timeout=3000",
+ params,
+ headers,
+ ClusterWatchEvent.class)) {
+
+ boolean keepaliveReceived = false;
+ long startTime = System.currentTimeMillis();
+ long testDuration = 5000; // Test for 5 seconds to verify
connection stays alive
+ long endTime = startTime + testDuration;
Review Comment:
Several unused variables declared: 'keepaliveReceived' (line 94),
'startTime' (line 95), 'testDuration' (line 96), and 'endTime' (line 97). These
appear to be remnants from previous test logic and should be removed to improve
code clarity.
```suggestion
```
##########
common/src/test/java/org/apache/seata/common/util/HttpClientUtilTest.java:
##########
@@ -1122,4 +604,355 @@ void testHttpSendRes() throws IOException {
assertNotNull(postResponse2);
assertNotNull(nonjsonResponse);
}
+
+ // ========== Watch 方法测试 ==========
Review Comment:
Chinese comment detected in test code. Comments should be in English for
consistency with the rest of the codebase and to maintain accessibility for
all contributors.
```suggestion
// ========== Watch method tests ==========
```
##########
common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java:
##########
@@ -265,25 +188,77 @@ private static String buildUrlWithParams(String url,
Map<String, String> params)
return urlBuilder.toString();
}
- private static void executeAsync(OkHttpClient client, Request request,
final HttpCallback<Response> callback) {
- client.newCall(request).enqueue(new Callback() {
- @Override
- public void onResponse(Call call, Response response) {
- try {
- callback.onSuccess(response);
- } finally {
- response.close();
- }
- }
+ private static OkHttpClient createHttp2WatchClient(int
connectTimeoutSeconds) {
+ return new OkHttpClient.Builder()
+
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
+ .connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS) //
连接阶段快速失败
+ .readTimeout(0, TimeUnit.SECONDS) // 等待TC推送数据(建立连接后持续监听服务器推送)
+ .writeTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+ .build();
+ }
- @Override
- public void onFailure(Call call, IOException e) {
- if (call.isCanceled()) {
- callback.onCancelled();
- } else {
- callback.onFailure(e);
- }
- }
- });
+ public static <T> SeataHttpWatch<T> watch(String url, Map<String, String>
headers, Class<T> eventType)
+ throws IOException {
+ return watch(url, headers, null, "GET", eventType);
+ }
+
+ public static <T> SeataHttpWatch<T> watch(String url, Class<T> eventType)
throws IOException {
+ return watch(url, null, null, "GET", eventType);
+ }
+
+ /**
+ * Execute a watch request with specified HTTP method and return a Watch
iterator.
+ * This method creates a long-lived HTTP/2 connection to receive
Server-Sent Events (SSE).
+ */
+ private static <T> SeataHttpWatch<T> watch(
+ String url, Map<String, String> headers, RequestBody requestBody,
String method, Class<T> eventType)
+ throws IOException {
+
+ OkHttpClient client = createHttp2WatchClient(30);
+ Request request = buildHttp2WatchRequest(url, headers, requestBody,
method);
+ return SeataHttpWatch.createWatch(client, request, eventType);
+ }
+
+ public static <T> SeataHttpWatch<T> watchPost(
+ String url, Map<String, String> params, Map<String, String>
headers, Class<T> eventType)
+ throws IOException {
Review Comment:
Missing input validation. The method should validate that the 'url'
parameter is not null or empty before proceeding with the request. This would
provide clearer error messages when called incorrectly.
##########
common/src/test/java/org/apache/seata/common/util/SeataHttpWatchTest.java:
##########
@@ -0,0 +1,790 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.common.util;
+
+import okhttp3.Call;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import okio.Buffer;
+import okio.BufferedSource;
+import org.apache.seata.common.exception.FrameworkException;
+import org.apache.seata.common.metadata.ClusterWatchEvent;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SeataHttpWatchTest {
+
+ private OkHttpClient mockClient;
+ private Call mockCall;
+ private Response mockResponse;
+ private ResponseBody mockResponseBody;
+ private BufferedSource mockSource;
+
+ @BeforeEach
+ public void setUp() {
+ mockClient = mock(OkHttpClient.class);
+ mockCall = mock(Call.class);
+ mockResponse = mock(Response.class);
+ mockResponseBody = mock(ResponseBody.class);
+ mockSource = mock(BufferedSource.class);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ // Clean up if needed
+ }
+
+ @Test
+ public void testCreateWatch_WithSuccessfulResponse() throws IOException {
+ // Setup
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(mockSource);
+
+ // Execute
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Verify
+ assertNotNull(watch);
+ verify(mockClient).newCall(request);
+ verify(mockCall).execute();
+ }
+
+ @Test
+ public void testCreateWatch_WithUnsuccessfulResponse() throws IOException {
+ // Setup
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(false);
+ when(mockResponse.code()).thenReturn(404);
+ when(mockResponse.message()).thenReturn("Not Found");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.string()).thenReturn("Error message");
+
+ // Execute & Verify
+ FrameworkException exception = assertThrows(FrameworkException.class,
() -> {
+ SeataHttpWatch.createWatch(mockClient, request,
ClusterWatchEvent.class);
+ });
+
+ assertTrue(exception.getMessage().contains("404"));
+ assertTrue(exception.getMessage().contains("Error message"));
+ }
+
+ @Test
+ public void
testCreateWatch_WithUnsuccessfulResponse_IOExceptionOnBodyRead() throws
IOException {
+ // Setup
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(false);
+ when(mockResponse.code()).thenReturn(500);
+ when(mockResponse.message()).thenReturn("Internal Server Error");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.string()).thenThrow(new IOException("Read
error"));
+
+ // Execute & Verify
+ FrameworkException exception = assertThrows(FrameworkException.class,
() -> {
+ SeataHttpWatch.createWatch(mockClient, request,
ClusterWatchEvent.class);
+ });
+
+ assertTrue(exception.getMessage().contains("Watch request failed"));
+ }
+
+ @Test
+ public void testCreateWatch_WithNullContentType() throws IOException {
+ // Setup
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+ when(mockResponse.header("Content-Type")).thenReturn(null);
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(mockSource);
+
+ // Execute
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Verify - should still create watch but log warning
+ assertNotNull(watch);
+ }
+
+ @Test
+ public void testHasNext_WithAvailableData() throws IOException {
+ // Setup
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(mockSource);
+ when(mockSource.exhausted()).thenReturn(false);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ boolean hasNext = watch.hasNext();
+
+ // Verify
+ assertTrue(hasNext);
+ verify(mockSource).exhausted();
+ }
+
+ @Test
+ public void testHasNext_WithNoMoreData() throws IOException {
+ // Setup
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(mockSource);
+ when(mockSource.exhausted()).thenReturn(true);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ boolean hasNext = watch.hasNext();
+
+ // Verify
+ assertFalse(hasNext);
+ }
+
+ @Test
+ public void testHasNext_WithIOException() throws IOException {
+ // Setup
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(mockSource);
+ when(mockSource.exhausted()).thenThrow(new IOException("Stream
error"));
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ boolean hasNext = watch.hasNext();
+
+ // Verify - should return false on IOException
+ assertFalse(hasNext);
+ }
+
+ @Test
+ public void testNext_WithKeepaliveEvent() throws IOException {
+ // Setup
+ String sseData = "data:
{\"type\":\"keepalive\",\"group\":\"default-test\",\"timestamp\":1234567890}\n\n";
+ Buffer buffer = new Buffer();
+ buffer.writeString(sseData, StandardCharsets.UTF_8);
+
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(buffer);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ SeataHttpWatch.Response<ClusterWatchEvent> response = watch.next();
+
+ // Verify
+ assertNotNull(response);
+ assertEquals(SeataHttpWatch.Response.Type.KEEPALIVE, response.type);
+ assertNotNull(response.object);
+ assertEquals("keepalive", response.object.getType());
+ assertEquals("default-test", response.object.getGroup());
+ assertEquals(1234567890L, response.object.getTimestamp());
+ }
+
+ @Test
+ public void testNext_WithClusterUpdateEvent() throws IOException {
+ // Setup
+ String sseData = "data:
{\"type\":\"cluster-update\",\"group\":\"default-test\",\"term\":2,\"timestamp\":1234567890}\n\n";
+ Buffer buffer = new Buffer();
+ buffer.writeString(sseData, StandardCharsets.UTF_8);
+
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(buffer);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ SeataHttpWatch.Response<ClusterWatchEvent> response = watch.next();
+
+ // Verify
+ assertNotNull(response);
+ assertEquals(SeataHttpWatch.Response.Type.CLUSTER_UPDATE,
response.type);
+ assertNotNull(response.object);
+ assertEquals("cluster-update", response.object.getType());
+ assertEquals("default-test", response.object.getGroup());
+ assertEquals(2L, response.object.getTerm());
+ assertEquals(1234567890L, response.object.getTimestamp());
+ }
+
+ @Test
+ public void testNext_WithTimeoutEvent() throws IOException {
+ // Setup
+ String sseData = "data:
{\"type\":\"timeout\",\"group\":\"default-test\",\"timestamp\":1234567890}\n\n";
+ Buffer buffer = new Buffer();
+ buffer.writeString(sseData, StandardCharsets.UTF_8);
+
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(buffer);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ SeataHttpWatch.Response<ClusterWatchEvent> response = watch.next();
+
+ // Verify
+ assertNotNull(response);
+ assertEquals(SeataHttpWatch.Response.Type.TIMEOUT, response.type);
+ assertNotNull(response.object);
+ assertEquals("timeout", response.object.getType());
+ }
+
+ @Test
+ public void testNext_WithUnknownEventType() throws IOException {
+ // Setup
+ String sseData = "data:
{\"type\":\"unknown-type\",\"group\":\"default-test\",\"timestamp\":1234567890}\n\n";
+ Buffer buffer = new Buffer();
+ buffer.writeString(sseData, StandardCharsets.UTF_8);
+
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(buffer);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ SeataHttpWatch.Response<ClusterWatchEvent> response = watch.next();
+
+ // Verify - should default to CLUSTER_UPDATE for unknown types
+ assertNotNull(response);
+ assertEquals(SeataHttpWatch.Response.Type.CLUSTER_UPDATE,
response.type);
+ }
+
+ @Test
+ public void testNext_WithNullEventType() throws IOException {
+ // Setup
+ String sseData = "data:
{\"group\":\"default-test\",\"timestamp\":1234567890}\n\n";
+ Buffer buffer = new Buffer();
+ buffer.writeString(sseData, StandardCharsets.UTF_8);
+
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(buffer);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ SeataHttpWatch.Response<ClusterWatchEvent> response = watch.next();
+
+ // Verify - should default to CLUSTER_UPDATE when type is null
+ assertNotNull(response);
+ assertEquals(SeataHttpWatch.Response.Type.CLUSTER_UPDATE,
response.type);
+ }
+
+ @Test
+ public void testNext_WithInvalidJson() throws IOException {
+ // Setup
+ String sseData = "data: {invalid json}\n\n";
+ Buffer buffer = new Buffer();
+ buffer.writeString(sseData, StandardCharsets.UTF_8);
+
+ Request request = new Request.Builder()
+ .url("http://localhost:8080/test")
+ .get()
+ .build();
+
+ when(mockClient.newCall(any(Request.class))).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(mockResponse);
+ when(mockResponse.isSuccessful()).thenReturn(true);
+
when(mockResponse.header("Content-Type")).thenReturn("text/event-stream");
+ when(mockResponse.body()).thenReturn(mockResponseBody);
+ when(mockResponseBody.source()).thenReturn(buffer);
+
+ SeataHttpWatch<ClusterWatchEvent> watch =
SeataHttpWatch.createWatch(mockClient, request, ClusterWatchEvent.class);
+
+ // Execute
+ SeataHttpWatch.Response<ClusterWatchEvent> response = watch.next();
+
+ // Verify - should return ERROR type with null object
+ assertNotNull(response);
+ assertEquals(SeataHttpWatch.Response.Type.ERROR, response.type);
+ assertNull(response.object);
+ }
+
+ // TC发送SSE的固定格式\n\n 一次事件只发送一行,客户端每次读一行
+ @Test
+ public void testNext_WithMultipleEvents() throws IOException {
+ // 模拟两个事件
Review Comment:
Chinese comment detected in test code. Comments should be in English for
consistency with the rest of the codebase and to maintain accessibility for all
contributors.
```suggestion
// TC sends SSE in a fixed format terminated by \n\n: each event is sent on a single line,
// and the client reads one line at a time
@Test
public void testNext_WithMultipleEvents() throws IOException {
// Simulate two events
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]