This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c07b408bdf HDDS-11208. Change RatisBlockOutputStream to use
HDDS-11174. (#7072)
c07b408bdf is described below
commit c07b408bdfa315f3234f8bd32955ead8703d99ae
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Aug 26 14:16:26 2024 -0700
HDDS-11208. Change RatisBlockOutputStream to use HDDS-11174. (#7072)
---
.../hdds/scm/storage/AbstractCommitWatcher.java | 1 -
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 75 +++++++++++-----------
.../hdds/scm/storage/RatisBlockOutputStream.java | 4 +-
3 files changed, 40 insertions(+), 40 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
index 61bc73420e..7641de1274 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
@@ -124,7 +124,6 @@ abstract class AbstractCommitWatcher<BUFFER> {
*
* @param commitIndex log index to watch for
* @return minimum commit index replicated to all nodes
- * @throws IOException IOException in case watch gets timed out
*/
CompletableFuture<XceiverClientReply> watchForCommitAsync(long commitIndex) {
final MemoizedSupplier<CompletableFuture<XceiverClientReply>> supplier
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index e88b097c49..ca3e4e5374 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -375,10 +375,8 @@ public class BlockOutputStream extends OutputStream {
}
private void recordWatchForCommitAsync(CompletableFuture<PutBlockResult>
putBlockResultFuture) {
- recordFlushFuture(watchForCommitAsync(putBlockResultFuture));
- }
+ final CompletableFuture<Void> flushFuture =
putBlockResultFuture.thenCompose(x -> watchForCommit(x.commitIndex));
- private void recordFlushFuture(CompletableFuture<Void> flushFuture) {
Preconditions.checkState(Thread.holdsLock(this));
this.lastFlushFuture = flushFuture;
this.allPendingFlushFutures =
allPendingFlushFutures.thenCombine(flushFuture, (last, curr) -> null);
@@ -444,7 +442,8 @@ public class BlockOutputStream extends OutputStream {
writeChunk(buffer);
putBlockFuture = executePutBlock(false, false);
}
- CompletableFuture<Void> watchForCommitAsync =
watchForCommitAsync(putBlockFuture);
+ CompletableFuture<Void> watchForCommitAsync =
+ putBlockFuture.thenCompose(x -> watchForCommit(x.commitIndex));
try {
watchForCommitAsync.get();
} catch (InterruptedException e) {
@@ -477,33 +476,44 @@ public class BlockOutputStream extends OutputStream {
}
/**
- * Watch for a specific commit index.
+ * Send a watch request to wait until the given index became committed.
+ * When watch is not needed (e.g. EC), this is a NOOP.
+ *
+ * @param index the log index to wait for.
+ * @return the future of the reply.
*/
- XceiverClientReply sendWatchForCommit(long commitIndex)
- throws IOException {
- return null;
+ CompletableFuture<XceiverClientReply> sendWatchForCommit(long index) {
+ return CompletableFuture.completedFuture(null);
}
- private void watchForCommit(long commitIndex) throws IOException {
- checkOpen();
+ private CompletableFuture<Void> watchForCommit(long commitIndex) {
try {
- LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex);
- final XceiverClientReply reply = sendWatchForCommit(commitIndex);
- if (reply != null) {
- List<DatanodeDetails> dnList = reply.getDatanodes();
- if (!dnList.isEmpty()) {
- Pipeline pipe = xceiverClient.getPipeline();
-
- LOG.warn("Failed to commit BlockId {} on {}. Failed nodes: {}",
- blockID, pipe, dnList);
- failedServers.addAll(dnList);
- }
- }
- } catch (IOException ioe) {
- setIoException(ioe);
- throw getIoException();
+ checkOpen();
+ } catch (IOException e) {
+ throw new FlushRuntimeException(e);
+ }
+
+ LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex);
+ return sendWatchForCommit(commitIndex)
+ .thenAccept(this::checkReply)
+ .exceptionally(e -> {
+ throw new FlushRuntimeException(setIoException(e));
+ })
+ .whenComplete((r, e) -> LOG.debug("Leaving watchForCommit commitIndex
= {}", commitIndex));
+ }
+
+ private void checkReply(XceiverClientReply reply) {
+ if (reply == null) {
+ return;
+ }
+ final List<DatanodeDetails> dnList = reply.getDatanodes();
+ if (dnList.isEmpty()) {
+ return;
}
- LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex);
+
+ LOG.warn("Failed to commit BlockId {} on {}. Failed nodes: {}",
+ blockID, xceiverClient.getPipeline(), dnList);
+ failedServers.addAll(dnList);
}
void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
@@ -723,16 +733,6 @@ public class BlockOutputStream extends OutputStream {
return lastFlushFuture;
}
- private CompletableFuture<Void>
watchForCommitAsync(CompletableFuture<PutBlockResult> putBlockResultFuture) {
- return putBlockResultFuture.thenAccept(x -> {
- try {
- watchForCommit(x.commitIndex);
- } catch (IOException e) {
- throw new FlushRuntimeException(e);
- }
- });
- }
-
@Override
public void close() throws IOException {
if (xceiverClientFactory != null && xceiverClient != null) {
@@ -771,7 +771,7 @@ public class BlockOutputStream extends OutputStream {
}
- public void setIoException(Exception e) {
+ public IOException setIoException(Throwable e) {
IOException ioe = getIoException();
if (ioe == null) {
IOException exception = new IOException(EXCEPTION_MSG + e.toString(),
e);
@@ -782,6 +782,7 @@ public class BlockOutputStream extends OutputStream {
"so subsequent request also encounters " +
"Storage Container Exception {}", ioe, e);
}
+ return getIoException();
}
void cleanup() {
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
index d32c37eba6..0f95716bf9 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -102,8 +102,8 @@ public class RatisBlockOutputStream extends
BlockOutputStream
}
@Override
- XceiverClientReply sendWatchForCommit(long commitIndex) throws IOException {
- return commitWatcher.watchForCommit(commitIndex);
+ CompletableFuture<XceiverClientReply> sendWatchForCommit(long index) {
+ return commitWatcher.watchForCommitAsync(index);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]