funky-eyes commented on code in PR #7903:
URL: https://github.com/apache/incubator-seata/pull/7903#discussion_r2659252630


##########
common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java:
##########
@@ -265,25 +215,101 @@ private static String buildUrlWithParams(String url, Map<String, String> params)
         return urlBuilder.toString();
     }
 
-    private static void executeAsync(OkHttpClient client, Request request, final HttpCallback<Response> callback) {
-        client.newCall(request).enqueue(new Callback() {
-            @Override
-            public void onResponse(Call call, Response response) {
-                try {
-                    callback.onSuccess(response);
-                } finally {
-                    response.close();
-                }
-            }
+    /**
+     * Create an HTTP/2 client for watch connections.
+     * This client is configured for long-lived connections to receive Server-Sent Events (SSE).
+     * The client instances are cached and reused based on the connection timeout to improve performance.
+     *
+     * @param connectTimeoutSeconds connection timeout in seconds (fast failure if server is unreachable)
+     * @return configured OkHttpClient instance (cached and reused)
+     */
+    private static OkHttpClient createHttp2WatchClient(int connectTimeoutSeconds) {
+        return HTTP2_CLIENT_MAP.computeIfAbsent(connectTimeoutSeconds, k -> new OkHttpClient.Builder()
+                .protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
+                // Fast failure during connection phase
+                .connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+                // Infinite read timeout to allow continuous listening for server push
+                .readTimeout(0, TimeUnit.SECONDS)

Review Comment:
   We should make the read timeout configurable instead of hardcoding it to 0. 
A readTimeout of 0 means no timeout at all, so a read on the underlying socket 
can block indefinitely. If the server disappears abruptly (e.g., a crash or 
network failure where no FIN or RST ever reaches the client), the watch request 
may block permanently.
   The connectTimeout should be capped at around 10 seconds, while the 
readTimeout should be adjustable based on the specific request type.
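
A minimal sketch of how the two timeouts might be carried together, assuming a hypothetical `WatchTimeouts` value class and `WatchClientCache` (neither exists in the PR). It caps the connect timeout at 10 seconds and uses the pair as the cache key, since a cache keyed by the connect timeout alone (as `HTTP2_CLIENT_MAP` is today) would hand back a client with the wrong read timeout once that value varies per request type. The type parameter `C` stands in for the real client type.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical value class: one cache key per (connect, read) timeout pair. */
final class WatchTimeouts {
    /** Assumed upper bound, taken from the "around 10 seconds" suggestion. */
    static final int MAX_CONNECT_TIMEOUT_SECONDS = 10;

    final int connectTimeoutSeconds;
    final int readTimeoutSeconds;

    WatchTimeouts(int connectTimeoutSeconds, int readTimeoutSeconds) {
        if (connectTimeoutSeconds <= 0 || readTimeoutSeconds < 0) {
            throw new IllegalArgumentException("connect must be > 0 and read must be >= 0");
        }
        // Cap the connect timeout so an unreachable server fails fast.
        this.connectTimeoutSeconds = Math.min(connectTimeoutSeconds, MAX_CONNECT_TIMEOUT_SECONDS);
        // 0 still means "no read timeout", but the caller now has to ask for it.
        this.readTimeoutSeconds = readTimeoutSeconds;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof WatchTimeouts)) {
            return false;
        }
        WatchTimeouts other = (WatchTimeouts) o;
        return connectTimeoutSeconds == other.connectTimeoutSeconds
                && readTimeoutSeconds == other.readTimeoutSeconds;
    }

    @Override
    public int hashCode() {
        return Objects.hash(connectTimeoutSeconds, readTimeoutSeconds);
    }
}

/** Hypothetical cache; C stands in for the real client type (e.g. OkHttpClient). */
final class WatchClientCache<C> {
    private final Map<WatchTimeouts, C> clients = new ConcurrentHashMap<>();
    private final Function<WatchTimeouts, C> factory;

    WatchClientCache(Function<WatchTimeouts, C> factory) {
        this.factory = factory;
    }

    /** Returns the cached client for this timeout pair, building it on first use. */
    C get(int connectTimeoutSeconds, int readTimeoutSeconds) {
        return clients.computeIfAbsent(
                new WatchTimeouts(connectTimeoutSeconds, readTimeoutSeconds), factory);
    }
}
```

In the PR's builder chain, the factory would pass `connectTimeoutSeconds` to `connectTimeout(...)` and `readTimeoutSeconds` to `readTimeout(...)` instead of the literal 0.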



##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -57,24 +59,35 @@ public class ClusterWatcherManager implements ClusterChangeListener {
 
     private static final Map<String, Long> GROUP_UPDATE_TERM = new ConcurrentHashMap<>();
 
+    private static final Map<Watcher<HttpContext>, Boolean> HTTP2_HEADERS_SENT = new ConcurrentHashMap<>();
+
     private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
             new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("long-polling", 1));
 
     @PostConstruct
     public void init() {
-        // Responds to monitors that time out
+        // Periodically check and respond to watchers that have timed out
         scheduledThreadPoolExecutor.scheduleAtFixedRate(
                 () -> {
                     for (String group : WATCHERS.keySet()) {
                         Optional.ofNullable(WATCHERS.remove(group))
                                 .ifPresent(watchers -> watchers.parallelStream().forEach(watcher -> {
-                                    if (System.currentTimeMillis() >= watcher.getTimeout()) {
-                                        watcher.setDone(true);
-                                        sendWatcherResponse(watcher, HttpResponseStatus.NOT_MODIFIED);
-                                    }
-                                    if (!watcher.isDone()) {
-                                        // Re-register
-                                        registryWatcher(watcher);
+                                    HttpContext context = watcher.getAsyncContext();
+                                    boolean isHttp2 = context instanceof HttpContext && context.isHttp2();
+                                    if (isHttp2) {
+                                        if (!context.getContext().channel().isActive()) {
+                                            watcher.setDone(true);
+                                            HTTP2_HEADERS_SENT.remove(watcher);
+                                        } else {
+                                            registryWatcher(watcher);

Review Comment:
   If the cluster state changes again in the window between removing the 
watchers from the subscription queue and re-registering them, is there a risk, 
in extreme cases, that the notification is lost?
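
One way to close such a window, sketched under assumptions: enqueue the watcher first, then re-check the group's term, and make delivery idempotent per term. `SketchWatcher` and `TermAwareRegistry` are hypothetical stand-ins written for this comment, not the Seata classes, and the term comparison is only modeled on the existing `GROUP_UPDATE_TERM` map.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a watcher; not the Seata Watcher class. */
final class SketchWatcher {
    final String group;
    final AtomicLong seenTerm;                        // last term this client has observed
    final AtomicInteger deliveries = new AtomicInteger();

    SketchWatcher(String group, long seenTerm) {
        this.group = group;
        this.seenTerm = new AtomicLong(seenTerm);
    }

    /** Delivers a term at most once; stale or repeated terms are ignored. */
    boolean deliver(long term) {
        long prev = seenTerm.get();
        while (term > prev) {
            if (seenTerm.compareAndSet(prev, term)) {
                deliveries.incrementAndGet();         // a real watcher would push the event here
                return true;
            }
            prev = seenTerm.get();
        }
        return false;
    }
}

/** Hypothetical registry showing the "enqueue, then re-check the term" ordering. */
final class TermAwareRegistry {
    private final Map<String, Long> groupTerm = new ConcurrentHashMap<>();
    private final Map<String, Queue<SketchWatcher>> waiting = new ConcurrentHashMap<>();

    /** Enqueue first, then compare terms, so a change that lands in between is still seen. */
    void register(SketchWatcher watcher) {
        waiting.computeIfAbsent(watcher.group, g -> new ConcurrentLinkedQueue<>()).add(watcher);
        Long current = groupTerm.get(watcher.group);
        if (current != null && current > watcher.seenTerm.get()) {
            watcher.deliver(current);
        }
    }

    /** Publish the new term before draining, so late registrations can detect it. */
    void onClusterChange(String group, long term) {
        long latest = groupTerm.merge(group, term, Long::max);
        Queue<SketchWatcher> drained = waiting.remove(group);
        if (drained != null) {
            for (SketchWatcher w : drained) {
                w.deliver(latest);
            }
        }
    }
}
```

Because the term is published before the queue is drained and `deliver` ignores terms the watcher has already seen, a watcher that races with a change is notified by whichever path runs last, and never twice for the same term.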



##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -128,27 +174,63 @@ private void sendWatcherResponse(Watcher<HttpContext> watcher, HttpResponseStatu
             } else {
                 ctx.writeAndFlush(response);
             }
-        } else {
-            // HTTP/2 response (h2c support)
-            // Send headers frame
+            return;
+        }
+
+        // For HTTP/2, headers must be sent first on the initial response
+        if (sendHeaders) {
             Http2Headers headers = new DefaultHttp2Headers().status(nettyStatus.codeAsText());
-            headers.set(HttpHeaderNames.CONTENT_LENGTH, "0");
+            headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream; charset=utf-8");
+            headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
+
             ctx.write(new DefaultHttp2HeadersFrame(headers));
+        }
+
+        String group = watcher.getGroup();
+        String sse = buildSSEFormat(nettyStatus, closeStream, sendHeaders, group);
+
+        ByteBuf content = Unpooled.copiedBuffer(sse, StandardCharsets.UTF_8);
 
-            // Send empty data frame with endStream=true to close the stream
-            ctx.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER, true))
-                    .addListener(f -> {
-                        if (!f.isSuccess()) {
-                            logger.warn("HTTP2 response send failed, group={}", group, f.cause());
-                        }
-                    });
+        // Send DATA frame (if closeStream is true, it will end the current stream)
+        ctx.write(new DefaultHttp2DataFrame(content, closeStream));
+        ctx.flush();
+    }
+
+    private String buildSSEFormat(
+            HttpResponseStatus nettyStatus, boolean closeStream, boolean sendHeaders, String group) {
+        // Determine event type (embedded in JSON, not in SSE event field)
+        String eventType;
+        if (sendHeaders) {
+            // Send keepalive event when stream is first established to confirm connection
+            eventType = "keepalive";
+        } else if (closeStream && nettyStatus == HttpResponseStatus.NOT_MODIFIED) {
+            // Timeout event, stream needs to be closed
+            eventType = "timeout";
+        } else {
+            // Normal cluster update event
+            eventType = "cluster-update";
         }
+
+        String json = String.format(

Review Comment:
   I would prefer that, once an HTTP/2 connection is established, the server 
pushes the cluster change results directly to the client, rather than requiring 
the client to issue a separate request to fetch the cluster information.
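
A rough sketch of what such an event might look like on the wire, assuming a hypothetical `SseFrames` helper and an assumed JSON shape (the `metadata` field and its contents are illustrative, not the PR's format). The framing itself follows the SSE rules: each payload line is prefixed with `data: ` and a blank line terminates the event.

```java
/** Hypothetical helper; not part of the PR. */
final class SseFrames {
    private SseFrames() {
    }

    /** Wraps a payload in one SSE event: a "data: " prefix per line, then a blank line. */
    static String dataEvent(String payload) {
        StringBuilder sb = new StringBuilder();
        // Split on any line break so a multi-line payload cannot end the event early.
        for (String line : payload.split("\r\n|\r|\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    /**
     * Assumed JSON shape: the event type plus the cluster metadata itself, so the
     * client does not need a follow-up request. The group is assumed to contain no
     * characters that require JSON escaping, and metadataJson must already be valid JSON.
     */
    static String clusterUpdate(String group, long term, String metadataJson) {
        String json = "{\"type\":\"cluster-update\",\"group\":\"" + group
                + "\",\"term\":" + term + ",\"metadata\":" + metadataJson + "}";
        return dataEvent(json);
    }
}
```

The resulting string could be written into the DATA frame in place of the current notification-only payload, keeping the event type inside the JSON as the existing code does.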


