funky-eyes commented on code in PR #7586:
URL: https://github.com/apache/incubator-seata/pull/7586#discussion_r2434465583


##########
discovery/seata-discovery-raft/src/main/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImpl.java:
##########
@@ -418,36 +433,109 @@ private static boolean watch() throws RetryableException 
{
         Map<String, Long> groupTerms = METADATA.getClusterTerm(clusterName);
         groupTerms.forEach((k, v) -> param.put(k, String.valueOf(v)));
         for (String group : groupTerms.keySet()) {
-            String tcAddress = queryHttpAddress(clusterName, group);
+            Node selectedNode = selectNodeForHttpAddress(clusterName, group);
+            String tcAddress = queryHttpAddress(clusterName, selectedNode);
+
             if (isTokenExpired()) {
-                refreshToken(tcAddress);
+                refreshToken(clusterName, selectedNode);
             }
             if (StringUtils.isNotBlank(jwtToken)) {
                 header.put(AUTHORIZATION_HEADER, jwtToken);
             }
-            try (CloseableHttpResponse response =
-                    HttpClientUtil.doPost("http://" + tcAddress + 
"/metadata/v1/watch", param, header, 30000)) {
-                if (response != null) {
-                    StatusLine statusLine = response.getStatusLine();
-                    if (statusLine != null && statusLine.getStatusCode() == 
HttpStatus.SC_UNAUTHORIZED) {
-                        if (StringUtils.isNotBlank(USERNAME) && 
StringUtils.isNotBlank(PASSWORD)) {
-                            throw new RetryableException("Authentication 
failed!");
-                        } else {
-                            throw new AuthenticationFailedException(
-                                    "Authentication failed! you should 
configure the correct username and password.");
-                        }
+
+            ResponseProcessor processor = (responseBody, error) -> {
+                if (error != null) {
+                    LOGGER.error("Watch request failed: {}", 
error.getMessage(), error);
+                } else if (StringUtils.isNotBlank(responseBody)) {
+                    try {
+                        processWatchResponse(responseBody);
+                    } catch (Exception e) {
+                        LOGGER.error("Error processing watch response: {}", 
e.getMessage(), e);
                     }
-                    return statusLine != null && statusLine.getStatusCode() == 
HttpStatus.SC_OK;
                 }
-            } catch (IOException e) {
-                LOGGER.error("watch cluster node: {}, fail: {}", tcAddress, 
e.getMessage());
-                throw new RetryableException(e.getMessage(), e);
+            };
+
+            if (selectedNode != null && selectedNode.isHttp2Supported()) {
+                executeHttp2WatchRequest(tcAddress, param, header, processor);
+            } else {
+                executeHttpWatchRequest(tcAddress, param, header, processor);
             }
-            break;
+            return true;
         }
         return false;
     }
 
+    private static void executeHttpWatchRequest(
+            String tcAddress, Map<String, String> param, Map<String, String> 
header, ResponseProcessor processor) {
+        try (CloseableHttpResponse response =
+                HttpClientUtil.doPost("http://" + tcAddress + 
"/metadata/v1/watch", param, header, 30000)) {
+            if (response != null) {
+                StatusLine statusLine = response.getStatusLine();
+                if (statusLine != null && statusLine.getStatusCode() == 
HttpStatus.SC_UNAUTHORIZED) {
+                    processor.process(null, new 
RetryableException("Authentication failed!"));
+                } else if (statusLine != null && statusLine.getStatusCode() == 
HttpStatus.SC_OK) {
+                    String responseBody = 
EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
+                    processor.process(responseBody, null);
+                } else {
+                    processor.process(
+                            null,
+                            new RetryableException("Invalid response status: "
+                                    + (statusLine != null ? 
statusLine.getStatusCode() : "unknown")));
+                }
+            } else {
+                processor.process(null, new RetryableException("Null 
response"));
+            }
+        } catch (Exception e) {
+            processor.process(null, new RetryableException(e.getMessage(), e));
+        }
+    }
+
+    private static void executeHttp2WatchRequest(
+            String tcAddress, Map<String, String> param, Map<String, String> 
header, ResponseProcessor processor) {
+        Http2ClientUtil.doPost(

Review Comment:
   > Could I ask you to provide some more explanation? I'm not yet sure what 
exactly is causing concern.
   
   With HTTP/2, the server delivers configuration changes through 
asynchronous pushes. Our current `while (true)` logic was written for 
HTTP/1.1 long-polling, where each iteration of the loop blocks synchronously 
until the server responds. Because HTTP/2 relies on asynchronous server push, 
the `while (true)` loop no longer provides that blocking behavior; instead, it 
will continuously issue watch requests and metadata-fetch requests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to