funky-eyes commented on code in PR #7586:
URL: https://github.com/apache/incubator-seata/pull/7586#discussion_r2373985805
##########
common/src/main/java/org/apache/seata/common/metadata/Node.java:
##########
@@ -99,6 +100,32 @@ public void setVersion(String version) {
this.version = version;
}
+ @JsonIgnore
+ public boolean isHttp2Supported() {
+ String baseVersion = "2.6.0";
Review Comment:
Could we use the Version class to compare versions instead?
##########
common/src/main/java/org/apache/seata/common/util/Http2ClientUtil.java:
##########
@@ -35,9 +35,9 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
-public class Http5ClientUtil {
+public class Http2ClientUtil {
Review Comment:
Could we name it HttpClientUtil? It handles more than just HTTP/2 requests.
##########
discovery/seata-discovery-raft/src/main/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImpl.java:
##########
@@ -418,36 +433,109 @@ private static boolean watch() throws RetryableException
{
Map<String, Long> groupTerms = METADATA.getClusterTerm(clusterName);
groupTerms.forEach((k, v) -> param.put(k, String.valueOf(v)));
for (String group : groupTerms.keySet()) {
- String tcAddress = queryHttpAddress(clusterName, group);
+ Node selectedNode = selectNodeForHttpAddress(clusterName, group);
+ String tcAddress = queryHttpAddress(clusterName, selectedNode);
+
if (isTokenExpired()) {
- refreshToken(tcAddress);
+ refreshToken(clusterName, selectedNode);
}
if (StringUtils.isNotBlank(jwtToken)) {
header.put(AUTHORIZATION_HEADER, jwtToken);
}
- try (CloseableHttpResponse response =
- HttpClientUtil.doPost("http://" + tcAddress +
"/metadata/v1/watch", param, header, 30000)) {
- if (response != null) {
- StatusLine statusLine = response.getStatusLine();
- if (statusLine != null && statusLine.getStatusCode() ==
HttpStatus.SC_UNAUTHORIZED) {
- if (StringUtils.isNotBlank(USERNAME) &&
StringUtils.isNotBlank(PASSWORD)) {
- throw new RetryableException("Authentication
failed!");
- } else {
- throw new AuthenticationFailedException(
- "Authentication failed! you should
configure the correct username and password.");
- }
+
+ ResponseProcessor processor = (responseBody, error) -> {
+ if (error != null) {
+ LOGGER.error("Watch request failed: {}",
error.getMessage(), error);
+ } else if (StringUtils.isNotBlank(responseBody)) {
+ try {
+ processWatchResponse(responseBody);
+ } catch (Exception e) {
+ LOGGER.error("Error processing watch response: {}",
e.getMessage(), e);
}
- return statusLine != null && statusLine.getStatusCode() ==
HttpStatus.SC_OK;
}
- } catch (IOException e) {
- LOGGER.error("watch cluster node: {}, fail: {}", tcAddress,
e.getMessage());
- throw new RetryableException(e.getMessage(), e);
+ };
+
+ if (selectedNode != null && selectedNode.isHttp2Supported()) {
+ executeHttp2WatchRequest(tcAddress, param, header, processor);
+ } else {
+ executeHttpWatchRequest(tcAddress, param, header, processor);
}
- break;
+ return true;
}
return false;
}
+ private static void executeHttpWatchRequest(
+ String tcAddress, Map<String, String> param, Map<String, String>
header, ResponseProcessor processor) {
+ try (CloseableHttpResponse response =
+ HttpClientUtil.doPost("http://" + tcAddress +
"/metadata/v1/watch", param, header, 30000)) {
+ if (response != null) {
+ StatusLine statusLine = response.getStatusLine();
+ if (statusLine != null && statusLine.getStatusCode() ==
HttpStatus.SC_UNAUTHORIZED) {
+ processor.process(null, new
RetryableException("Authentication failed!"));
+ } else if (statusLine != null && statusLine.getStatusCode() ==
HttpStatus.SC_OK) {
+ String responseBody =
EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
+ processor.process(responseBody, null);
+ } else {
+ processor.process(
+ null,
+ new RetryableException("Invalid response status: "
+ + (statusLine != null ?
statusLine.getStatusCode() : "unknown")));
+ }
+ } else {
+ processor.process(null, new RetryableException("Null
response"));
+ }
+ } catch (Exception e) {
+ processor.process(null, new RetryableException(e.getMessage(), e));
+ }
+ }
+
+ private static void executeHttp2WatchRequest(
+ String tcAddress, Map<String, String> param, Map<String, String>
header, ResponseProcessor processor) {
+ Http2ClientUtil.doPost(
Review Comment:
Since it was changed to HTTP/2 server push, will the watch thread started in
startQueryMetadata that runs a while(true) loop become an infinite loop?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]