Copilot commented on code in PR #7903:
URL: https://github.com/apache/incubator-seata/pull/7903#discussion_r2659250081
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -57,24 +59,35 @@ public class ClusterWatcherManager implements
ClusterChangeListener {
private static final Map<String, Long> GROUP_UPDATE_TERM = new
ConcurrentHashMap<>();
+ private static final Map<Watcher<HttpContext>, Boolean> HTTP2_HEADERS_SENT
= new ConcurrentHashMap<>();
+
private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
new ScheduledThreadPoolExecutor(1, new
NamedThreadFactory("long-polling", 1));
@PostConstruct
public void init() {
- // Responds to monitors that time out
+ // Periodically check and respond to watchers that have timed out
scheduledThreadPoolExecutor.scheduleAtFixedRate(
() -> {
for (String group : WATCHERS.keySet()) {
Optional.ofNullable(WATCHERS.remove(group))
.ifPresent(watchers ->
watchers.parallelStream().forEach(watcher -> {
- if (System.currentTimeMillis() >=
watcher.getTimeout()) {
- watcher.setDone(true);
- sendWatcherResponse(watcher,
HttpResponseStatus.NOT_MODIFIED);
- }
- if (!watcher.isDone()) {
- // Re-register
- registryWatcher(watcher);
+ HttpContext context =
watcher.getAsyncContext();
+ boolean isHttp2 = context instanceof
HttpContext && context.isHttp2();
+ if (isHttp2) {
+ if
(!context.getContext().channel().isActive()) {
+ watcher.setDone(true);
+ HTTP2_HEADERS_SENT.remove(watcher);
+ } else {
+ registryWatcher(watcher);
+ }
+ } else {
+ if (System.currentTimeMillis() >=
watcher.getTimeout()) {
+ watcher.setDone(true);
+ sendWatcherResponse(watcher,
HttpResponseStatus.NOT_MODIFIED, true, false);
+ } else if (!watcher.isDone()) {
+ registryWatcher(watcher);
+ }
}
}));
}
Review Comment:
The scheduled task removes all watchers from a group at once using
`WATCHERS.remove(group)`, then processes them. If new watchers are registered
for the same group during processing, they could be lost. Consider using a more
concurrent-safe approach, such as iterating over watchers without removing the
entire queue first, or using a copy-on-write approach.
##########
server/src/test/java/org/apache/seata/server/controller/ClusterControllerTest.java:
##########
@@ -81,43 +76,96 @@ void watchTimeoutTest() throws Exception {
@Test
@Order(2)
- void watchTimeoutTest_withHttp2() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
-
+ void watchTimeoutTest_http2() throws Exception {
Map<String, String> headers = new HashMap<>();
- headers.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
+ headers.put("Content-Type",
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
Map<String, String> params = new HashMap<>();
params.put("default-test", "1");
- HttpCallback<Response> callback = new HttpCallback<Response>() {
- @Override
- public void onSuccess(Response response) {
- Assertions.assertNotNull(response);
- Assertions.assertEquals(Protocol.H2_PRIOR_KNOWLEDGE,
response.protocol());
- Assertions.assertEquals(HttpStatus.SC_NOT_MODIFIED,
response.code());
- latch.countDown();
+ // For HTTP2, the connection should remain open and not timeout
+ // The test verifies that the connection stays alive beyond the
timeout period
+ try (SeataHttpWatch<ClusterWatchEvent> watch =
HttpClientUtil.watchPost(
+ "http://127.0.0.1:" + port + "/metadata/v1/watch?timeout=3000",
+ params,
+ headers,
+ ClusterWatchEvent.class)) {
+
+ boolean keepaliveReceived = false;
+ long startTime = System.currentTimeMillis();
+ // Verify KEEPALIVE is received immediately
+ boolean hasNext = watch.hasNext();
+ if (!hasNext) {
+ Assertions.fail("Expect KEEPALIVE Event Received...");
}
-
- @Override
- public void onFailure(Throwable t) {
- Assertions.fail("Should not fail");
+ SeataHttpWatch.Response<ClusterWatchEvent> firstResponse =
watch.next();
+ Assertions.assertEquals(
+ SeataHttpWatch.Response.Type.KEEPALIVE,
firstResponse.type, "First event should be KEEPALIVE");
+ Assertions.assertNotNull(firstResponse.object, "Event data should
not be null");
+ ClusterWatchEvent keepaliveEvent = firstResponse.object;
+ Assertions.assertEquals("keepalive", keepaliveEvent.getType(),
"Event type should be 'keepalive'");
+ Assertions.assertEquals("default-test", keepaliveEvent.getGroup(),
"Group should match");
+ Assertions.assertNotNull(keepaliveEvent.getTimestamp(), "Timestamp
should not be null");
+ keepaliveReceived = true;
+
+ long elapsed = System.currentTimeMillis() - startTime;
+ Assertions.assertTrue(
+ elapsed < 1000,
+ "KEEPALIVE should be received immediately after
connection, elapsed: " + elapsed + "ms");
+ Assertions.assertTrue(keepaliveReceived, "Keepalive should be
received after connection");
+
+ // Verify connection remains alive beyond timeout period (3000ms)
+ // Note: hasNext() blocks when no data is available, so we cannot
use it to check connection status
+ // Instead, we wait beyond the timeout period and then trigger a
cluster change event
+ // If we can receive the event, it proves the connection is still
alive
+ long timeoutPeriod = 3000; // Timeout period from query parameter
+ long waitTime = timeoutPeriod + 1000; // Wait timeout period + 1
second
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ Assertions.fail("Test interrupted while waiting for connection
verification");
}
- @Override
- public void onCancelled() {
- Assertions.fail("Should not be cancelled");
+ // Trigger a cluster change event to verify connection is still
active
+ Thread triggerThread = new Thread(() -> {
+ try {
+ Thread.sleep(2000); // Small delay to ensure we're past
timeout
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ ((ApplicationEventPublisher)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
+ .publishEvent(new ClusterChangeEvent(this,
"default-test", 2, true));
+ });
+ triggerThread.start();
+ boolean clusterUpdateReceived = false;
+ if (!watch.hasNext()) {
+ Assertions.fail("Expect cluster-update Event Received...");
+ }
Review Comment:
The test starts a trigger thread that sleeps for 2000ms and, immediately
after starting it, calls `watch.hasNext()`, which blocks until data arrives.
This makes the test timing-dependent with no upper bound on the wait: if the
trigger thread fails or the event is never delivered, `hasNext()` blocks
indefinitely and the test hangs instead of failing. The thread should be
started before the main-thread `Thread.sleep(waitTime)` (the sleep on line
124), or the wait logic should be restructured to ensure the trigger thread
completes before checking for the next event.
##########
common/src/main/java/org/apache/seata/common/util/SeataHttpWatch.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.common.util;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.Call;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.ResponseBody;
+import okio.BufferedSource;
+import org.apache.seata.common.exception.FrameworkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Seata HTTP/2 Watch implementation.
+ * Consumes server-pushed SSE data frames via an iterator-style API.
+ *
+ * @param <T> event data type
+ */
+public class SeataHttpWatch<T>
+ implements Iterator<SeataHttpWatch.Response<T>>,
Iterable<SeataHttpWatch.Response<T>>, AutoCloseable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SeataHttpWatch.class);
+
+ private final ResponseBody responseBody;
+ private final BufferedSource source;
+ private final Call call;
+ private final ObjectMapper objectMapper;
+ private final Class<T> eventType;
+
+ /**
+ * Response wrapper containing event type and data
+ */
+ public static class Response<T> {
+ /**
+ * Event type enum - matches SSE event field values
+ */
+ public enum Type {
+ /**
+ * Cluster update event
+ */
+ CLUSTER_UPDATE,
+ /**
+ * Keep-alive event (connection established)
+ */
+ KEEPALIVE,
+ /**
+ * Timeout event (stream closed due to timeout)
+ */
+ TIMEOUT,
+ /**
+ * Error event
+ */
+ ERROR
+ }
+
+ public final Type type;
+
+ public final T object;
+
+ public Response(Type type, T object) {
+ this.type = type;
+ this.object = object;
+ }
+ }
+
+ /**
+ * Create a Watch instance from an OkHttp call
+ *
+ * @param call the prepared HTTP call
+ * @param eventType the class type for deserializing event data
+ * @param <T> the event data type
+ * @return a Watch instance
+ * @throws IOException if the request fails
+ */
+ public static <T> SeataHttpWatch<T> createWatch(Call call, Class<T>
eventType) throws IOException {
+
+ okhttp3.Response response = call.execute();
+
+ if (!response.isSuccessful()) {
+ String respBody = null;
+ try (ResponseBody body = response.body()) {
+ if (body != null) {
+ respBody = body.string();
+ }
+ } catch (IOException e) {
+ throw new FrameworkException(e, "Watch request failed: " +
response.message());
+ }
+ throw new FrameworkException(
+ String.format("Watch request failed with code %d: %s",
response.code(), respBody));
+ }
+
+ // Verify Content-Type is SSE
+ String contentType = response.header("Content-Type");
+ if (contentType == null || !contentType.contains("text/event-stream"))
{
+ LOGGER.warn("Expected Content-Type: text/event-stream, got: {}",
contentType);
+ }
+
+ return new SeataHttpWatch<>(response.body(), call, eventType);
+ }
+
+ /**
+ * Create a Watch instance with a prepared request
+ *
+ * @param client the OkHttpClient instance
+ * @param request the HTTP request
+ * @param eventType the class type for deserializing event data
+ * @param <T> the event data type
+ * @return a Watch instance
+ * @throws IOException if the request fails
+ */
+ public static <T> SeataHttpWatch<T> createWatch(OkHttpClient client,
Request request, Class<T> eventType)
+ throws IOException {
+
+ Call call = client.newCall(request);
+ return createWatch(call, eventType);
+ }
+
+ private SeataHttpWatch(ResponseBody responseBody, Call call, Class<T>
eventType) {
+ this.responseBody = responseBody;
+ this.source = responseBody.source();
+ this.call = call;
+ this.objectMapper = new ObjectMapper();
+ this.eventType = eventType;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ // Check if source is exhausted (stream closed)
+ return !source.exhausted();
+ } catch (IOException e) {
+ LOGGER.error("Error checking if stream has more data", e);
+ return false;
+ }
+ }
+
+ @Override
+ public Response<T> next() {
+ try {
+ // Read complete SSE event in a loop (not recursive to avoid stack
overflow)
+ // SSE format: "data: {json}\n\n"
+ // Each event is separated by double newline
+ // Event type is included in the JSON data, not in a separate
"event:" field
+
+ StringBuilder dataBuffer = new StringBuilder();
+
+ // Read lines until we get a complete event (ending with empty
line)
+ // This loop reads all lines of a single event atomically
+ while (true) {
+ String line = source.readUtf8Line();
+
+ if (line == null) {
+ // Stream closed
+ if (dataBuffer.length() > 0) {
+ // We have partial data, try to parse it
+ String eventJson = dataBuffer.toString();
+ return parseEvent(eventJson);
+ }
+ throw new RuntimeException("Stream closed unexpectedly");
+ }
+
+ if (line.startsWith("data: ")) {
+ String jsonData = line.substring(6);
+ if (dataBuffer.length() > 0) {
+ dataBuffer.append('\n');
+ }
+ dataBuffer.append(jsonData);
+ } else if (line.isEmpty()) {
+ // Empty line indicates end of event
+ if (dataBuffer.length() > 0) {
+ String eventJson = dataBuffer.toString();
+ return parseEvent(eventJson);
+ }
+ // Empty line but no data, continue reading (skip blank
lines between events)
+ } else {
+ LOGGER.debug("Unknown SSE line format, ignoring: {}",
line);
+ }
+ }
+
+ } catch (IOException e) {
+ throw new RuntimeException("IO Exception during next()", e);
+ }
+ }
+
+ /**
+ * Parse SSE event JSON into Response object
+ *
+ * @param json the JSON string to parse (contains type field)
+ * @return the parsed Response object
+ * @throws IOException if parsing fails
+ */
+ private Response<T> parseEvent(String json) throws IOException {
+ try {
+ // Parse JSON once to get both event type and full data
+ // First, parse to extract event type
+ EventMetadata metadata = objectMapper.readValue(json,
EventMetadata.class);
+
+ // Map event type from JSON to Response.Type
+ Response.Type responseType =
mapEventTypeToResponseType(metadata.type);
+
+ // Deserialize the full event data (parse once, reuse if possible)
+ T eventData = objectMapper.readValue(json, eventType);
Review Comment:
The JSON is being parsed twice - once to extract the event type (line 216)
and again to deserialize the full event data (line 222). This is inefficient
and could impact performance, especially with high-frequency events. Consider
parsing once and extracting the type from the deserialized object, or using a
single pass with a custom deserializer.
##########
server/src/test/java/org/apache/seata/server/controller/ClusterControllerTest.java:
##########
@@ -138,63 +186,176 @@ public void run() {
try (Response response =
HttpClientUtil.doPost("http://127.0.0.1:" + port +
"/metadata/v1/watch", param, header, 30000)) {
if (response != null) {
- Assertions.assertEquals(HttpStatus.SC_OK, response.code());
+ Assertions.assertEquals(200, response.code());
return;
}
}
Assertions.fail();
}
@Test
- @Order(5)
- void watch_withHttp2() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
-
- Map<String, String> headers = new HashMap<>();
- headers.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
-
- Map<String, String> params = new HashMap<>();
- params.put("default-test", "1");
-
- Thread thread = new Thread(() -> {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ @Order(4)
+ void watchStream_http2() throws Exception {
+ Map<String, String> header = new HashMap<>();
+ header.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
+ Map<String, String> param = new HashMap<>();
+ param.put("default-test", "1");
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ ((ApplicationEventPublisher)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
+ .publishEvent(new ClusterChangeEvent(this,
"default-test", 2, true));
}
- ((ApplicationEventPublisher)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
- .publishEvent(new ClusterChangeEvent(this, "default-test",
2, true));
});
thread.start();
- HttpCallback<Response> callback = new HttpCallback<Response>() {
- @Override
- public void onSuccess(Response response) {
- Assertions.assertNotNull(response);
- Assertions.assertEquals(Protocol.H2_PRIOR_KNOWLEDGE,
response.protocol());
- Assertions.assertEquals(HttpStatus.SC_OK, response.code());
- latch.countDown();
- }
+ boolean keepaliveReceived = false;
+ boolean clusterUpdateReceived = false;
+ long startTime = System.currentTimeMillis();
+ long maxWaitTime = 10000; // Maximum wait time: 10 seconds
- @Override
- public void onFailure(Throwable t) {
- Assertions.fail("Should not fail: " + t.getMessage());
+ try (SeataHttpWatch<ClusterWatchEvent> watch =
HttpClientUtil.watchPost(
+ "http://127.0.0.1:" + port + "/metadata/v1/watch", param,
header, ClusterWatchEvent.class)) {
+ // For HTTP2, connection will remain open, so we need to break
after receiving expected events
+ // Note: hasNext() blocks when no data is available, so we use
next() directly with timeout protection
+ while (System.currentTimeMillis() - startTime < maxWaitTime) {
+ try {
+ SeataHttpWatch.Response<ClusterWatchEvent> response =
watch.next();
Review Comment:
The `watch.next()` call blocks indefinitely if no data is available. The
timeout protection using `System.currentTimeMillis() - startTime < maxWaitTime`
only checks the elapsed time but doesn't interrupt a blocking `next()` call. If
the server never sends an event, this test could hang for much longer than 10
seconds. Consider using a separate thread with timeout or a non-blocking
approach to read events.
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -57,24 +59,35 @@ public class ClusterWatcherManager implements
ClusterChangeListener {
private static final Map<String, Long> GROUP_UPDATE_TERM = new
ConcurrentHashMap<>();
+ private static final Map<Watcher<HttpContext>, Boolean> HTTP2_HEADERS_SENT
= new ConcurrentHashMap<>();
+
private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
new ScheduledThreadPoolExecutor(1, new
NamedThreadFactory("long-polling", 1));
@PostConstruct
public void init() {
- // Responds to monitors that time out
+ // Periodically check and respond to watchers that have timed out
scheduledThreadPoolExecutor.scheduleAtFixedRate(
() -> {
for (String group : WATCHERS.keySet()) {
Optional.ofNullable(WATCHERS.remove(group))
.ifPresent(watchers ->
watchers.parallelStream().forEach(watcher -> {
- if (System.currentTimeMillis() >=
watcher.getTimeout()) {
- watcher.setDone(true);
- sendWatcherResponse(watcher,
HttpResponseStatus.NOT_MODIFIED);
- }
- if (!watcher.isDone()) {
- // Re-register
- registryWatcher(watcher);
+ HttpContext context =
watcher.getAsyncContext();
+ boolean isHttp2 = context instanceof
HttpContext && context.isHttp2();
Review Comment:
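The check `context instanceof HttpContext` is redundant as a type check, since
`context` is a local variable already declared as `HttpContext`. It evaluates
to true for every non-null value, so its only effect is an implicit null guard.
If `context` can never be null here, the check should be removed; otherwise
replace it with an explicit `context != null` test.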
```suggestion
boolean isHttp2 = context.isHttp2();
```
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -128,27 +174,63 @@ private void sendWatcherResponse(Watcher<HttpContext>
watcher, HttpResponseStatu
} else {
ctx.writeAndFlush(response);
}
- } else {
- // HTTP/2 response (h2c support)
- // Send headers frame
+ return;
+ }
+
+ // For HTTP/2, headers must be sent first on the initial response
+ if (sendHeaders) {
Http2Headers headers = new
DefaultHttp2Headers().status(nettyStatus.codeAsText());
- headers.set(HttpHeaderNames.CONTENT_LENGTH, "0");
+ headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream;
charset=utf-8");
+ headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
+
ctx.write(new DefaultHttp2HeadersFrame(headers));
+ }
+
+ String group = watcher.getGroup();
+ String sse = buildSSEFormat(nettyStatus, closeStream, sendHeaders,
group);
+
+ ByteBuf content = Unpooled.copiedBuffer(sse, StandardCharsets.UTF_8);
- // Send empty data frame with endStream=true to close the stream
- ctx.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER,
true))
- .addListener(f -> {
- if (!f.isSuccess()) {
- logger.warn("HTTP2 response send failed,
group={}", group, f.cause());
- }
- });
+ // Send DATA frame (if closeStream is true, it will end the current
stream)
+ ctx.write(new DefaultHttp2DataFrame(content, closeStream));
+ ctx.flush();
+ }
+
+ private String buildSSEFormat(
+ HttpResponseStatus nettyStatus, boolean closeStream, boolean
sendHeaders, String group) {
+ // Determine event type (embedded in JSON, not in SSE event field)
+ String eventType;
+ if (sendHeaders) {
+ // Send keepalive event when stream is first established to
confirm connection
+ eventType = "keepalive";
+ } else if (closeStream && nettyStatus ==
HttpResponseStatus.NOT_MODIFIED) {
+ // Timeout event, stream needs to be closed
+ eventType = "timeout";
+ } else {
+ // Normal cluster update event
+ eventType = "cluster-update";
}
+
+ String json = String.format(
+
"{\"type\":\"%s\",\"group\":\"%s\",\"term\":%d,\"timestamp\":%d}",
+ eventType, group, GROUP_UPDATE_TERM.getOrDefault(group, 0L),
System.currentTimeMillis());
+ logger.debug("Sending watch event: {}", json);
+
+ // SSE format: only send data: field, event type is embedded in JSON
+ return "data: " + json + "\n\n";
}
public void registryWatcher(Watcher<HttpContext> watcher) {
String group = watcher.getGroup();
Long term = GROUP_UPDATE_TERM.get(group);
+ HttpContext context = watcher.getAsyncContext();
+ boolean isHttp2 = context instanceof HttpContext && context.isHttp2();
Review Comment:
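The check `context instanceof HttpContext` is redundant as a type check, since
`context` is a local variable already declared as `HttpContext`. It evaluates
to true for every non-null value, so its only effect is an implicit null guard.
If `context` can never be null here, the check should be removed; otherwise
replace it with an explicit `context != null` test.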
```suggestion
boolean isHttp2 = context.isHttp2();
```
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -90,35 +103,68 @@ public void init() {
public void onChangeEvent(ClusterChangeEvent event) {
if (event.getTerm() > 0) {
GROUP_UPDATE_TERM.put(event.getGroup(), event.getTerm());
- // Notifications are made of changes in cluster information
+ // Notify all watchers of cluster information changes
Optional.ofNullable(WATCHERS.remove(event.getGroup()))
.ifPresent(watchers ->
watchers.parallelStream().forEach(this::notifyWatcher));
}
}
private void notifyWatcher(Watcher<HttpContext> watcher) {
- watcher.setDone(true);
- sendWatcherResponse(watcher, HttpResponseStatus.OK);
- }
+ HttpContext context = watcher.getAsyncContext();
+ boolean isHttp2 = context instanceof HttpContext && context.isHttp2();
Review Comment:
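The check `context instanceof HttpContext` is redundant as a type check, since
`context` is a local variable already declared as `HttpContext`. It evaluates
to true for every non-null value, so its only effect is an implicit null guard.
If `context` can never be null here, the check should be removed; otherwise
replace it with an explicit `context != null` test.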
##########
server/src/test/java/org/apache/seata/server/controller/ClusterControllerTest.java:
##########
@@ -138,63 +186,176 @@ public void run() {
try (Response response =
HttpClientUtil.doPost("http://127.0.0.1:" + port +
"/metadata/v1/watch", param, header, 30000)) {
if (response != null) {
- Assertions.assertEquals(HttpStatus.SC_OK, response.code());
+ Assertions.assertEquals(200, response.code());
return;
}
}
Assertions.fail();
}
@Test
- @Order(5)
- void watch_withHttp2() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
-
- Map<String, String> headers = new HashMap<>();
- headers.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
-
- Map<String, String> params = new HashMap<>();
- params.put("default-test", "1");
-
- Thread thread = new Thread(() -> {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ @Order(4)
+ void watchStream_http2() throws Exception {
+ Map<String, String> header = new HashMap<>();
+ header.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
+ Map<String, String> param = new HashMap<>();
+ param.put("default-test", "1");
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ ((ApplicationEventPublisher)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
+ .publishEvent(new ClusterChangeEvent(this,
"default-test", 2, true));
}
- ((ApplicationEventPublisher)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
- .publishEvent(new ClusterChangeEvent(this, "default-test",
2, true));
});
thread.start();
- HttpCallback<Response> callback = new HttpCallback<Response>() {
- @Override
- public void onSuccess(Response response) {
- Assertions.assertNotNull(response);
- Assertions.assertEquals(Protocol.H2_PRIOR_KNOWLEDGE,
response.protocol());
- Assertions.assertEquals(HttpStatus.SC_OK, response.code());
- latch.countDown();
- }
+ boolean keepaliveReceived = false;
+ boolean clusterUpdateReceived = false;
+ long startTime = System.currentTimeMillis();
+ long maxWaitTime = 10000; // Maximum wait time: 10 seconds
- @Override
- public void onFailure(Throwable t) {
- Assertions.fail("Should not fail: " + t.getMessage());
+ try (SeataHttpWatch<ClusterWatchEvent> watch =
HttpClientUtil.watchPost(
+ "http://127.0.0.1:" + port + "/metadata/v1/watch", param,
header, ClusterWatchEvent.class)) {
+ // For HTTP2, connection will remain open, so we need to break
after receiving expected events
+ // Note: hasNext() blocks when no data is available, so we use
next() directly with timeout protection
+ while (System.currentTimeMillis() - startTime < maxWaitTime) {
+ try {
+ SeataHttpWatch.Response<ClusterWatchEvent> response =
watch.next();
+ SeataHttpWatch.Response.Type type = response.type;
+
+ if (type == SeataHttpWatch.Response.Type.KEEPALIVE) {
+ keepaliveReceived = true;
+ Assertions.assertNotNull(response.object, "KEEPALIVE
event data should not be null");
+ Assertions.assertEquals(
+ "keepalive", response.object.getType(), "Event
type should be 'keepalive'");
+ } else if (type ==
SeataHttpWatch.Response.Type.CLUSTER_UPDATE) {
+ clusterUpdateReceived = true;
+ Assertions.assertNotNull(response.object,
"CLUSTER_UPDATE event data should not be null");
+ Assertions.assertEquals(
+ "cluster-update", response.object.getType(),
"Event type should be 'cluster-update'");
+ Assertions.assertEquals("default-test",
response.object.getGroup(), "Group should match");
+ Assertions.assertNotNull(response.object.getTerm(),
"Term should not be null");
+ Assertions.assertEquals(2L,
response.object.getTerm().longValue(), "Term should be 2");
+ // After receiving cluster update, exit the loop
+ break;
+ }
+ } catch (Exception e) {
+ // If connection was closed or timeout, break the loop
+ if (keepaliveReceived && clusterUpdateReceived) {
+ // We already received expected events, break normally
+ break;
+ }
+ throw new RuntimeException("Unexpected exception while
waiting for events", e);
+ }
}
- @Override
- public void onCancelled() {
- Assertions.fail("Should not be cancelled");
+ // Verify both events were received
+ Assertions.assertTrue(keepaliveReceived, "KEEPALIVE event should
be received");
+ Assertions.assertTrue(
+ clusterUpdateReceived, "CLUSTER_UPDATE event should be
received after cluster change");
+ } catch (IOException e) {
+ throw new RuntimeException("Watch failed", e);
+ }
+ }
+
+ @Test
+ @Order(5)
+ void watchMultipleClusterUpdates_http2() throws Exception {
+ Map<String, String> header = new HashMap<>();
+ header.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
+ Map<String, String> param = new HashMap<>();
+ param.put("default-test", "1");
+
+ // Trigger multiple cluster change events with different terms
+ Thread triggerThread = new Thread(() -> {
+ try {
+ Thread.sleep(3000); // Wait for connection to be established
+ ApplicationEventPublisher publisher =
(ApplicationEventPublisher)
+
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT);
+
+ // Trigger first cluster change event (term = 2)
+ publisher.publishEvent(new ClusterChangeEvent(this,
"default-test", 2, true));
+ Thread.sleep(500);
+
+ // Trigger second cluster change event (term = 3)
+ publisher.publishEvent(new ClusterChangeEvent(this,
"default-test", 3, true));
+ Thread.sleep(500);
+
+ // Trigger third cluster change event (term = 4)
+ publisher.publishEvent(new ClusterChangeEvent(this,
"default-test", 4, true));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ triggerThread.start();
+
+ boolean keepaliveReceived = false;
+ int clusterUpdateCount = 0;
+ long startTime = System.currentTimeMillis();
+ long maxWaitTime = 10000; // Maximum wait time: 10 seconds
+ long[] expectedTerms = {2L, 3L, 4L};
+
+ try (SeataHttpWatch<ClusterWatchEvent> watch =
HttpClientUtil.watchPost(
+ "http://127.0.0.1:" + port + "/metadata/v1/watch", param,
header, ClusterWatchEvent.class)) {
+ // Receive events until we get all expected cluster updates
+ while (System.currentTimeMillis() - startTime < maxWaitTime &&
clusterUpdateCount < expectedTerms.length) {
+ try {
+ if (watch.hasNext()) {
+ SeataHttpWatch.Response<ClusterWatchEvent> response =
watch.next();
+ SeataHttpWatch.Response.Type type = response.type;
+ if (type == SeataHttpWatch.Response.Type.KEEPALIVE) {
+ keepaliveReceived = true;
+ Assertions.assertNotNull(response.object,
"KEEPALIVE event data should not be null");
+ Assertions.assertEquals(
+ "keepalive", response.object.getType(),
"Event type should be 'keepalive'");
+ } else if (type ==
SeataHttpWatch.Response.Type.CLUSTER_UPDATE) {
+ clusterUpdateCount++;
+ Assertions.assertNotNull(response.object,
"CLUSTER_UPDATE event data should not be null");
+ Assertions.assertEquals(
+ "cluster-update",
+ response.object.getType(),
+ "Event type should be 'cluster-update'");
+ Assertions.assertEquals("default-test",
response.object.getGroup(), "Group should match");
+
Assertions.assertNotNull(response.object.getTerm(), "Term should not be null");
+
+ // Verify term matches expected value
+ long expectedTerm =
expectedTerms[clusterUpdateCount - 1];
+ long actualTerm =
response.object.getTerm().longValue();
+ Assertions.assertEquals(
+ expectedTerm,
+ actualTerm,
+ "Term should be " + expectedTerm + "But
actualTerm is " + actualTerm
Review Comment:
The error message has a typo: "But actualTerm is" should be "but actualTerm
is" (lowercase 'b'). Additionally, a space is missing before "But" in the
concatenated string.
```suggestion
"Term should be " + expectedTerm + " but
actualTerm is " + actualTerm
```
##########
common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java:
##########
@@ -265,25 +215,101 @@ private static String buildUrlWithParams(String url,
Map<String, String> params)
return urlBuilder.toString();
}
- private static void executeAsync(OkHttpClient client, Request request,
final HttpCallback<Response> callback) {
- client.newCall(request).enqueue(new Callback() {
- @Override
- public void onResponse(Call call, Response response) {
- try {
- callback.onSuccess(response);
- } finally {
- response.close();
- }
- }
+ /**
+ * Create an HTTP/2 client for watch connections.
+ * This client is configured for long-lived connections to receive
Server-Sent Events (SSE).
+ * The client instances are cached and reused based on the connection
timeout to improve performance.
+ *
+ * @param connectTimeoutSeconds connection timeout in seconds (fast
failure if server is unreachable)
+ * @return configured OkHttpClient instance (cached and reused)
+ */
+ private static OkHttpClient createHttp2WatchClient(int
connectTimeoutSeconds) {
+ return HTTP2_CLIENT_MAP.computeIfAbsent(connectTimeoutSeconds, k ->
new OkHttpClient.Builder()
+
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
+ // Fast failure during connection phase
+ .connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+ // Infinite read timeout to allow continuous listening for
server push
+ .readTimeout(0, TimeUnit.SECONDS)
+ .writeTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+ .build());
+ }
- @Override
- public void onFailure(Call call, IOException e) {
- if (call.isCanceled()) {
- callback.onCancelled();
- } else {
- callback.onFailure(e);
- }
- }
- });
+ public static <T> SeataHttpWatch<T> watch(String url, Map<String, String>
headers, Class<T> eventType)
+ throws IOException {
+ return watch(url, headers, null, "GET", eventType);
+ }
+
+ public static <T> SeataHttpWatch<T> watch(String url, Class<T> eventType)
throws IOException {
+ return watch(url, null, null, "GET", eventType);
+ }
+
+ /**
+ * Execute a watch request with specified HTTP method and return a Watch
iterator.
+ * This method creates a long-lived HTTP/2 connection to receive
Server-Sent Events (SSE).
+ *
+ * @param url the URL to watch (must not be null or blank)
+ * @param headers HTTP headers
+ * @param requestBody request body (optional)
+ * @param method HTTP method (GET, POST, PUT)
+ * @param eventType the class type for deserializing event data
+ * @param <T> the event data type
+ * @return a Watch instance for receiving SSE events
+ * @throws IOException if the request fails
+ * @throws IllegalArgumentException if the URL is null or blank
+ */
+ private static <T> SeataHttpWatch<T> watch(
+ String url, Map<String, String> headers, RequestBody requestBody,
String method, Class<T> eventType)
+ throws IOException {
+
+ if (StringUtils.isBlank(url)) {
+ throw new IllegalArgumentException("URL must not be null or
blank");
+ }
+
+ OkHttpClient client =
createHttp2WatchClient(HTTP2_WATCH_CONNECT_TIMEOUT_SECONDS);
+ Request request = buildHttp2WatchRequest(url, headers, requestBody,
method);
+ return SeataHttpWatch.createWatch(client, request, eventType);
+ }
+
+ public static <T> SeataHttpWatch<T> watchPost(
+ String url, Map<String, String> params, Map<String, String>
headers, Class<T> eventType)
+ throws IOException {
+ try {
+ String contentType = headers != null ? headers.get("Content-Type")
: "";
+ RequestBody requestBody = createRequestBody(params, contentType);
+ return watch(url, headers, requestBody, "POST", eventType);
+ } catch (JsonProcessingException e) {
+ LOGGER.error("Failed to create request body", e);
+ throw new IOException("Failed to create request body", e);
+ }
+ }
+
+ public static <T> SeataHttpWatch<T> watchPost(String url, Map<String,
String> params, Class<T> eventType)
+ throws IOException {
+ return watchPost(url, params, null, eventType);
+ }
+
+ private static Request buildHttp2WatchRequest(
+ String url, Map<String, String> headers, RequestBody requestBody,
String method) {
+ Headers.Builder headerBuilder = new Headers.Builder();
+ if (headers != null) {
+ headers.forEach(headerBuilder::add);
+ }
+ // Always add Accept header for SSE
+ headerBuilder.add("Accept", "text/event-stream");
+
+ Request.Builder requestBuilder = new
Request.Builder().url(url).headers(headerBuilder.build());
+
+ if ("POST".equals(method) && requestBody != null) {
+ requestBuilder.post(requestBody);
+ } else if ("PUT".equals(method) && requestBody != null) {
+ requestBuilder.put(requestBody);
+ } else if ("GET".equals(method)) {
+ requestBuilder.get();
+ } else {
+ // Default to GET if method is not specified or not supported
+ requestBuilder.get();
Review Comment:
The method allows POST and PUT with requestBody, but silently defaults to
GET for any unsupported method (including DELETE, PATCH, etc.), and also for
POST or PUT when requestBody is null. This could mask errors when an incorrect
method is specified. Consider throwing an IllegalArgumentException for
unsupported methods instead of silently defaulting to GET.
```suggestion
throw new IllegalArgumentException("Unsupported HTTP method for
watch request: " + method);
```
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -90,35 +103,68 @@ public void init() {
public void onChangeEvent(ClusterChangeEvent event) {
if (event.getTerm() > 0) {
GROUP_UPDATE_TERM.put(event.getGroup(), event.getTerm());
- // Notifications are made of changes in cluster information
+ // Notify all watchers of cluster information changes
Optional.ofNullable(WATCHERS.remove(event.getGroup()))
.ifPresent(watchers ->
watchers.parallelStream().forEach(this::notifyWatcher));
}
}
private void notifyWatcher(Watcher<HttpContext> watcher) {
- watcher.setDone(true);
- sendWatcherResponse(watcher, HttpResponseStatus.OK);
- }
+ HttpContext context = watcher.getAsyncContext();
+ boolean isHttp2 = context instanceof HttpContext && context.isHttp2();
+
+ if (!isHttp2) {
+ watcher.setDone(true);
+ }
+
+ boolean isFirstResponse = !HTTP2_HEADERS_SENT.getOrDefault(watcher,
false);
+ sendWatcherResponse(watcher, HttpResponseStatus.OK, false,
isFirstResponse);
+ if (isFirstResponse && isHttp2) {
+ HTTP2_HEADERS_SENT.put(watcher, true);
+ }
- private void sendWatcherResponse(Watcher<HttpContext> watcher,
HttpResponseStatus nettyStatus) {
+ // Update watcher's term to the latest term to prevent infinite loop
+ // This ensures that when registryWatcher is called, it won't trigger
notifyWatcher again
String group = watcher.getGroup();
+ Long latestTerm = GROUP_UPDATE_TERM.get(group);
+ if (latestTerm != null && latestTerm > watcher.getTerm()) {
+ watcher.setTerm(latestTerm);
+ }
+
+ // For HTTP/2, re-register the watcher to continue listening for
future updates
+ if (isHttp2 && !watcher.isDone()) {
+ registryWatcher(watcher);
+ }
+ }
+ /**
+ * Send watcher response to the client.
+ *
+ * @param watcher the watcher instance
+ * @param nettyStatus the HTTP status code
+ * @param closeStream whether to close the HTTP/2 stream (endStream=true)
+ * @param sendHeaders whether to send HTTP/2 headers frame (only needed
for first response)
+ */
+ private void sendWatcherResponse(
+ Watcher<HttpContext> watcher, HttpResponseStatus nettyStatus,
boolean closeStream, boolean sendHeaders) {
+
HttpContext context = watcher.getAsyncContext();
if (!(context instanceof HttpContext)) {
logger.warn(
"Unsupported context type for watcher on group {}: {}",
- group,
+ watcher.getGroup(),
context != null ? context.getClass().getName() : "null");
return;
}
Review Comment:
The check `context instanceof HttpContext` is redundant as a type check:
`context` is already declared as `HttpContext`, so the condition can only be
false when `context` is null. As a result, the warning message at line 155 is
reached only for a null context and will always report the type as "null".
Consider replacing this with an explicit null check, or adjusting the logic if
this is meant to check for a more specific subtype.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]