This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c07b408bdf HDDS-11208. Change RatisBlockOutputStream to use 
HDDS-11174. (#7072)
c07b408bdf is described below

commit c07b408bdfa315f3234f8bd32955ead8703d99ae
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Aug 26 14:16:26 2024 -0700

    HDDS-11208. Change RatisBlockOutputStream to use HDDS-11174. (#7072)
---
 .../hdds/scm/storage/AbstractCommitWatcher.java    |  1 -
 .../hadoop/hdds/scm/storage/BlockOutputStream.java | 75 +++++++++++-----------
 .../hdds/scm/storage/RatisBlockOutputStream.java   |  4 +-
 3 files changed, 40 insertions(+), 40 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
index 61bc73420e..7641de1274 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
@@ -124,7 +124,6 @@ abstract class AbstractCommitWatcher<BUFFER> {
    *
    * @param commitIndex log index to watch for
    * @return minimum commit index replicated to all nodes
-   * @throws IOException IOException in case watch gets timed out
    */
   CompletableFuture<XceiverClientReply> watchForCommitAsync(long commitIndex) {
     final MemoizedSupplier<CompletableFuture<XceiverClientReply>> supplier
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index e88b097c49..ca3e4e5374 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -375,10 +375,8 @@ public class BlockOutputStream extends OutputStream {
   }
 
   private void recordWatchForCommitAsync(CompletableFuture<PutBlockResult> 
putBlockResultFuture) {
-    recordFlushFuture(watchForCommitAsync(putBlockResultFuture));
-  }
+    final CompletableFuture<Void> flushFuture = 
putBlockResultFuture.thenCompose(x -> watchForCommit(x.commitIndex));
 
-  private void recordFlushFuture(CompletableFuture<Void> flushFuture) {
     Preconditions.checkState(Thread.holdsLock(this));
     this.lastFlushFuture = flushFuture;
     this.allPendingFlushFutures = 
allPendingFlushFutures.thenCombine(flushFuture, (last, curr) -> null);
@@ -444,7 +442,8 @@ public class BlockOutputStream extends OutputStream {
         writeChunk(buffer);
         putBlockFuture = executePutBlock(false, false);
       }
-      CompletableFuture<Void> watchForCommitAsync = 
watchForCommitAsync(putBlockFuture);
+      CompletableFuture<Void> watchForCommitAsync =
+          putBlockFuture.thenCompose(x -> watchForCommit(x.commitIndex));
       try {
         watchForCommitAsync.get();
       } catch (InterruptedException e) {
@@ -477,33 +476,44 @@ public class BlockOutputStream extends OutputStream {
   }
 
   /**
-   * Watch for a specific commit index.
+   * Send a watch request to wait until the given index became committed.
+   * When watch is not needed (e.g. EC), this is a NOOP.
+   *
+   * @param index the log index to wait for.
+   * @return the future of the reply.
    */
-  XceiverClientReply sendWatchForCommit(long commitIndex)
-      throws IOException {
-    return null;
+  CompletableFuture<XceiverClientReply> sendWatchForCommit(long index) {
+    return CompletableFuture.completedFuture(null);
   }
 
-  private void watchForCommit(long commitIndex) throws IOException {
-    checkOpen();
+  private CompletableFuture<Void> watchForCommit(long commitIndex) {
     try {
-      LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex);
-      final XceiverClientReply reply = sendWatchForCommit(commitIndex);
-      if (reply != null) {
-        List<DatanodeDetails> dnList = reply.getDatanodes();
-        if (!dnList.isEmpty()) {
-          Pipeline pipe = xceiverClient.getPipeline();
-
-          LOG.warn("Failed to commit BlockId {} on {}. Failed nodes: {}",
-              blockID, pipe, dnList);
-          failedServers.addAll(dnList);
-        }
-      }
-    } catch (IOException ioe) {
-      setIoException(ioe);
-      throw getIoException();
+      checkOpen();
+    } catch (IOException e) {
+      throw new FlushRuntimeException(e);
+    }
+
+    LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex);
+    return sendWatchForCommit(commitIndex)
+        .thenAccept(this::checkReply)
+        .exceptionally(e -> {
+          throw new FlushRuntimeException(setIoException(e));
+        })
+        .whenComplete((r, e) -> LOG.debug("Leaving watchForCommit commitIndex 
= {}", commitIndex));
+  }
+
+  private void checkReply(XceiverClientReply reply) {
+    if (reply == null) {
+      return;
+    }
+    final List<DatanodeDetails> dnList = reply.getDatanodes();
+    if (dnList.isEmpty()) {
+      return;
     }
-    LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex);
+
+    LOG.warn("Failed to commit BlockId {} on {}. Failed nodes: {}",
+        blockID, xceiverClient.getPipeline(), dnList);
+    failedServers.addAll(dnList);
   }
 
   void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
@@ -723,16 +733,6 @@ public class BlockOutputStream extends OutputStream {
     return lastFlushFuture;
   }
 
-  private CompletableFuture<Void> 
watchForCommitAsync(CompletableFuture<PutBlockResult> putBlockResultFuture) {
-    return putBlockResultFuture.thenAccept(x -> {
-      try {
-        watchForCommit(x.commitIndex);
-      } catch (IOException e) {
-        throw new FlushRuntimeException(e);
-      }
-    });
-  }
-
   @Override
   public void close() throws IOException {
     if (xceiverClientFactory != null && xceiverClient != null) {
@@ -771,7 +771,7 @@ public class BlockOutputStream extends OutputStream {
   }
 
 
-  public void setIoException(Exception e) {
+  public IOException setIoException(Throwable e) {
     IOException ioe = getIoException();
     if (ioe == null) {
       IOException exception =  new IOException(EXCEPTION_MSG + e.toString(), 
e);
@@ -782,6 +782,7 @@ public class BlockOutputStream extends OutputStream {
               "so subsequent request also encounters " +
               "Storage Container Exception {}", ioe, e);
     }
+    return getIoException();
   }
 
   void cleanup() {
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
index d32c37eba6..0f95716bf9 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -102,8 +102,8 @@ public class RatisBlockOutputStream extends 
BlockOutputStream
   }
 
   @Override
-  XceiverClientReply sendWatchForCommit(long commitIndex) throws IOException {
-    return commitWatcher.watchForCommit(commitIndex);
+  CompletableFuture<XceiverClientReply> sendWatchForCommit(long index) {
+    return commitWatcher.watchForCommitAsync(index);
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to