funky-eyes commented on code in PR #7586:
URL: https://github.com/apache/incubator-seata/pull/7586#discussion_r2434465583
##########
discovery/seata-discovery-raft/src/main/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImpl.java:
##########
@@ -418,36 +433,109 @@ private static boolean watch() throws RetryableException
{
Map<String, Long> groupTerms = METADATA.getClusterTerm(clusterName);
groupTerms.forEach((k, v) -> param.put(k, String.valueOf(v)));
for (String group : groupTerms.keySet()) {
- String tcAddress = queryHttpAddress(clusterName, group);
+ Node selectedNode = selectNodeForHttpAddress(clusterName, group);
+ String tcAddress = queryHttpAddress(clusterName, selectedNode);
+
if (isTokenExpired()) {
- refreshToken(tcAddress);
+ refreshToken(clusterName, selectedNode);
}
if (StringUtils.isNotBlank(jwtToken)) {
header.put(AUTHORIZATION_HEADER, jwtToken);
}
- try (CloseableHttpResponse response =
- HttpClientUtil.doPost("http://" + tcAddress +
"/metadata/v1/watch", param, header, 30000)) {
- if (response != null) {
- StatusLine statusLine = response.getStatusLine();
- if (statusLine != null && statusLine.getStatusCode() ==
HttpStatus.SC_UNAUTHORIZED) {
- if (StringUtils.isNotBlank(USERNAME) &&
StringUtils.isNotBlank(PASSWORD)) {
- throw new RetryableException("Authentication
failed!");
- } else {
- throw new AuthenticationFailedException(
- "Authentication failed! you should
configure the correct username and password.");
- }
+
+ ResponseProcessor processor = (responseBody, error) -> {
+ if (error != null) {
+ LOGGER.error("Watch request failed: {}",
error.getMessage(), error);
+ } else if (StringUtils.isNotBlank(responseBody)) {
+ try {
+ processWatchResponse(responseBody);
+ } catch (Exception e) {
+ LOGGER.error("Error processing watch response: {}",
e.getMessage(), e);
}
- return statusLine != null && statusLine.getStatusCode() ==
HttpStatus.SC_OK;
}
- } catch (IOException e) {
- LOGGER.error("watch cluster node: {}, fail: {}", tcAddress,
e.getMessage());
- throw new RetryableException(e.getMessage(), e);
+ };
+
+ if (selectedNode != null && selectedNode.isHttp2Supported()) {
+ executeHttp2WatchRequest(tcAddress, param, header, processor);
+ } else {
+ executeHttpWatchRequest(tcAddress, param, header, processor);
}
- break;
+ return true;
}
return false;
}
+ private static void executeHttpWatchRequest(
+ String tcAddress, Map<String, String> param, Map<String, String>
header, ResponseProcessor processor) {
+ try (CloseableHttpResponse response =
+ HttpClientUtil.doPost("http://" + tcAddress +
"/metadata/v1/watch", param, header, 30000)) {
+ if (response != null) {
+ StatusLine statusLine = response.getStatusLine();
+ if (statusLine != null && statusLine.getStatusCode() ==
HttpStatus.SC_UNAUTHORIZED) {
+ processor.process(null, new
RetryableException("Authentication failed!"));
+ } else if (statusLine != null && statusLine.getStatusCode() ==
HttpStatus.SC_OK) {
+ String responseBody =
EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
+ processor.process(responseBody, null);
+ } else {
+ processor.process(
+ null,
+ new RetryableException("Invalid response status: "
+ + (statusLine != null ?
statusLine.getStatusCode() : "unknown")));
+ }
+ } else {
+ processor.process(null, new RetryableException("Null
response"));
+ }
+ } catch (Exception e) {
+ processor.process(null, new RetryableException(e.getMessage(), e));
+ }
+ }
+
+ private static void executeHttp2WatchRequest(
+ String tcAddress, Map<String, String> param, Map<String, String>
header, ResponseProcessor processor) {
+ Http2ClientUtil.doPost(
Review Comment:
> Could I ask you to provide some more explanation? I'm not yet sure what
> exactly is causing concern.

With HTTP/2, configuration changes are delivered by the server via
asynchronous pushes. Our current `while (true)` loop is built around HTTP/1.1
long-polling: each iteration blocks synchronously until the server responds.
Because HTTP/2 uses asynchronous server push, the watch call no longer
provides that blocking behavior, so the `while (true)` loop will instead spin,
continuously issuing watch requests and metadata-fetch requests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]