YvCeung commented on code in PR #7586:
URL: https://github.com/apache/incubator-seata/pull/7586#discussion_r2384021716


##########
discovery/seata-discovery-raft/src/main/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImpl.java:
##########
@@ -418,36 +433,109 @@ private static boolean watch() throws RetryableException 
{
         Map<String, Long> groupTerms = METADATA.getClusterTerm(clusterName);
         groupTerms.forEach((k, v) -> param.put(k, String.valueOf(v)));
         for (String group : groupTerms.keySet()) {
-            String tcAddress = queryHttpAddress(clusterName, group);
+            Node selectedNode = selectNodeForHttpAddress(clusterName, group);
+            String tcAddress = queryHttpAddress(clusterName, selectedNode);
+
             if (isTokenExpired()) {
-                refreshToken(tcAddress);
+                refreshToken(clusterName, selectedNode);
             }
             if (StringUtils.isNotBlank(jwtToken)) {
                 header.put(AUTHORIZATION_HEADER, jwtToken);
             }
-            try (CloseableHttpResponse response =
-                    HttpClientUtil.doPost("http://" + tcAddress + 
"/metadata/v1/watch", param, header, 30000)) {
-                if (response != null) {
-                    StatusLine statusLine = response.getStatusLine();
-                    if (statusLine != null && statusLine.getStatusCode() == 
HttpStatus.SC_UNAUTHORIZED) {
-                        if (StringUtils.isNotBlank(USERNAME) && 
StringUtils.isNotBlank(PASSWORD)) {
-                            throw new RetryableException("Authentication 
failed!");
-                        } else {
-                            throw new AuthenticationFailedException(
-                                    "Authentication failed! you should 
configure the correct username and password.");
-                        }
+
+            ResponseProcessor processor = (responseBody, error) -> {
+                if (error != null) {
+                    LOGGER.error("Watch request failed: {}", 
error.getMessage(), error);
+                } else if (StringUtils.isNotBlank(responseBody)) {
+                    try {
+                        processWatchResponse(responseBody);
+                    } catch (Exception e) {
+                        LOGGER.error("Error processing watch response: {}", 
e.getMessage(), e);
                     }
-                    return statusLine != null && statusLine.getStatusCode() == 
HttpStatus.SC_OK;
                 }
-            } catch (IOException e) {
-                LOGGER.error("watch cluster node: {}, fail: {}", tcAddress, 
e.getMessage());
-                throw new RetryableException(e.getMessage(), e);
+            };
+
+            if (selectedNode != null && selectedNode.isHttp2Supported()) {
+                executeHttp2WatchRequest(tcAddress, param, header, processor);
+            } else {
+                executeHttpWatchRequest(tcAddress, param, header, processor);
             }
-            break;
+            return true;
         }
         return false;
     }
 
+    private static void executeHttpWatchRequest(
+            String tcAddress, Map<String, String> param, Map<String, String> 
header, ResponseProcessor processor) {
+        try (CloseableHttpResponse response =
+                HttpClientUtil.doPost("http://" + tcAddress + 
"/metadata/v1/watch", param, header, 30000)) {
+            if (response != null) {
+                StatusLine statusLine = response.getStatusLine();
+                if (statusLine != null && statusLine.getStatusCode() == 
HttpStatus.SC_UNAUTHORIZED) {
+                    processor.process(null, new 
RetryableException("Authentication failed!"));
+                } else if (statusLine != null && statusLine.getStatusCode() == 
HttpStatus.SC_OK) {
+                    String responseBody = 
EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
+                    processor.process(responseBody, null);
+                } else {
+                    processor.process(
+                            null,
+                            new RetryableException("Invalid response status: "
+                                    + (statusLine != null ? 
statusLine.getStatusCode() : "unknown")));
+                }
+            } else {
+                processor.process(null, new RetryableException("Null 
response"));
+            }
+        } catch (Exception e) {
+            processor.process(null, new RetryableException(e.getMessage(), e));
+        }
+    }
+
+    private static void executeHttp2WatchRequest(
+            String tcAddress, Map<String, String> param, Map<String, String> 
header, ResponseProcessor processor) {
+        Http2ClientUtil.doPost(
+                "http://" + tcAddress + "/metadata/v1/watch", param, header, 
new HttpCallback<Response>() {
+                    @Override
+                    public void onSuccess(Response response) {
+                        try {
+                            String responseBody = response.body().string();
+                            processor.process(responseBody, null);
+                        } catch (IOException e) {
+                            processor.process(null, e);
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable e) {
+                        processor.process(null, e);
+                    }
+
+                    @Override
+                    public void onCancelled() {
+                        processor.process(null, new 
RetryableException("Request cancelled"));

Review Comment:
   The `RetryableException` is created on the `okhttp` callback thread and is
   only passed to the processor rather than thrown. Since the `watch` method
   itself never throws this exception, won't the caller fail to catch it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to