ivandika3 commented on code in PR #6014:
URL: https://github.com/apache/ozone/pull/6014#discussion_r1639291830


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java:
##########
@@ -218,29 +226,71 @@ public ConcurrentMap<UUID, Long> getCommitInfoMap() {
     return commitInfoMap;
   }
 
-  private CompletableFuture<RaftClientReply> sendRequestAsync(
-      ContainerCommandRequestProto request) {
-    return TracingUtil.executeInNewSpan(
-        "XceiverClientRatis." + request.getCmdType().name(),
-        () -> {
-          final ContainerCommandRequestMessage message
-              = ContainerCommandRequestMessage.toMessage(
-              request, TracingUtil.exportCurrentSpan());
-          if (HddsUtils.isReadOnly(request)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("sendCommandAsync ReadOnly {}", message);
-            }
-            return getClient().async().sendReadOnly(message);
+  private CompletableFuture<RaftClientReply> sendRequestAsyncInternal(
+      ContainerCommandRequestProto request, ReplicationLevel writeReplicationLevel) {
+    final ContainerCommandRequestMessage message =
+        ContainerCommandRequestMessage.toMessage(request, TracingUtil.exportCurrentSpan());
+    if (HddsUtils.isReadOnly(request)) {
+      LOG.debug("sendCommandAsync ReadOnly message {}", message);
+      return getClient().async().sendReadOnly(message);
+    } else {
+      LOG.debug("sendCommandAsync Write {} message {}", writeReplicationLevel, message);
+      return getClient().async().send(message, writeReplicationLevel).handle((reply, e) -> {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("sendCommandAsync Write {} message {}: reply = {}, exception =",
+              writeReplicationLevel, message, reply, e);
+        }
+        if (reply != null) {
+          // If Raft server gives a reply, just return it.
+          return reply;
+        }
+        // reply == null implies exception != null with the current Raft server implementation
+        final Collection<CommitInfoProto> commitInfos;
+        final long maxCommitIndex;
+        if (e instanceof CompletionException) {
+          final CompletionException ce = (CompletionException) e;
+          if (e.getCause() instanceof NotReplicatedException) {
+            // Unwrap exception to get commitInfos if NotReplicatedException is thrown
+            final NotReplicatedException nre = (NotReplicatedException) ce.getCause();
+            commitInfos = nre.getCommitInfos();
+            maxCommitIndex = commitInfos.stream()
+                .map(CommitInfoProto::getCommitIndex)
+                .max(Long::compareTo).orElse(-1L);

Review Comment:
   Is `NotReplicatedException` always the direct cause of the `CompletionException`? If it can be nested deeper in the cause chain, the `e.getCause() instanceof NotReplicatedException` check would miss it.
   
   Maybe we need to use `HddsClientUtils#containsException`, similar to how `GroupMismatchException` is handled in HDDS-9826?
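
   To illustrate the concern, here is a minimal standalone sketch of a cause-chain lookup. It is not the `HddsClientUtils#containsException` implementation and does not assume its signature: `CauseChain`, `findCause`, and `FakeNotReplicatedException` are hypothetical names introduced only so the example compiles without Ratis on the classpath.

```java
import java.util.Optional;
import java.util.concurrent.CompletionException;

public class CauseChain {
  /** Hypothetical stand-in for the real NotReplicatedException (assumption, not the Ratis class). */
  static class FakeNotReplicatedException extends Exception {
    FakeNotReplicatedException(String msg) {
      super(msg);
    }
  }

  /** Returns the first throwable of the given type found in the cause chain, if any. */
  static <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> type) {
    // Bound the walk so a cyclic cause chain cannot loop forever.
    for (int depth = 0; t != null && depth < 32; depth++, t = t.getCause()) {
      if (type.isInstance(t)) {
        return Optional.of(type.cast(t));
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Case 1: the exception is the direct cause of the CompletionException.
    Throwable direct = new CompletionException(new FakeNotReplicatedException("direct"));
    // Case 2: the exception is wrapped one level deeper.
    Throwable nested = new CompletionException(
        new RuntimeException(new FakeNotReplicatedException("nested")));

    // A first-cause check alone misses the nested case.
    System.out.println(nested.getCause() instanceof FakeNotReplicatedException); // prints false
    // Walking the chain finds both.
    System.out.println(findCause(direct, FakeNotReplicatedException.class).isPresent()); // prints true
    System.out.println(findCause(nested, FakeNotReplicatedException.class).isPresent()); // prints true
  }
}
```

   Whether the nested case can actually occur depends on how the Ratis client completes the future, which this sketch does not establish.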



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

