duongkame commented on code in PR #6859:
URL: https://github.com/apache/ozone/pull/6859#discussion_r1677044023
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java:
##########
@@ -607,40 +621,77 @@ protected void handleFlush(boolean close) throws
IOException {
private void handleFlushInternal(boolean close)
throws IOException, InterruptedException, ExecutionException {
checkOpen();
- // flush the last chunk data residing on the currentBuffer
- if (totalDataFlushedLength < writtenDataLength) {
- refreshCurrentBuffer();
- Preconditions.checkArgument(currentBuffer.position() > 0);
-
- // This can be a partially filled chunk. Since we are flushing the buffer
- // here, we just limit this buffer to the current position. So that next
- // write will happen in new buffer
- if (currentBuffer.hasRemaining()) {
- if (allowPutBlockPiggybacking) {
- updateFlushLength();
- writeChunkAndPutBlock(currentBuffer, close);
+ LOG.debug("Start handleFlushInternal close={}", close);
+ CompletableFuture<Void> toWaitFor;
+ synchronized (this) {
+ CompletableFuture<PutBlockResult> putBlockResultFuture = null;
+ // flush the last chunk data residing on the currentBuffer
+ if (totalWriteChunkLength < writtenDataLength) {
+ Preconditions.checkArgument(currentBuffer.position() > 0);
+
+ // This can be a partially filled chunk. Since we are flushing the
buffer
+ // here, we just limit this buffer to the current position. So that
next
+ // write will happen in new buffer
+ updateWriteChunkLength();
+ updatePutBlockLength();
+ if (currentBuffer.hasRemaining()) {
+ if (allowPutBlockPiggybacking) {
+ putBlockResultFuture = writeChunkAndPutBlock(currentBuffer, close);
+ } else {
+ writeChunk(currentBuffer);
+ putBlockResultFuture = executePutBlock(close, false);
+ }
+ if (!close) {
+ // reset current buffer.
+ currentBuffer = null;
+ currentBufferRemaining = 0;
+ }
} else {
- writeChunk(currentBuffer);
- updateFlushLength();
- executePutBlock(close, false);
+ putBlockResultFuture = executePutBlock(close, false);
+ // set lastFuture.
}
+ } else if (totalPutBlockLength < totalWriteChunkLength) {
+ // There're no pending written data, but there're uncommitted data.
+ updatePutBlockLength();
+ putBlockResultFuture = executePutBlock(close, false);
+ } else if (close) {
+ // forcing an "empty" putBlock if stream is being closed without new
+ // data since latest flush - we need to send the "EOF" flag
+ updatePutBlockLength();
+ putBlockResultFuture = executePutBlock(true, true);
} else {
- updateFlushLength();
- executePutBlock(close, false);
+ LOG.debug("Flushing without data");
+ }
+
+ if (putBlockResultFuture != null) {
+ this.lastFlushFuture = watchForCommitAsync(putBlockResultFuture);
+ }
+ toWaitFor = this.lastFlushFuture;
+ } // End of synchronized block.
+
+ if (toWaitFor != null) {
+ LOG.debug("Waiting for flush");
+ try {
+ toWaitFor.get();
+ } catch (ExecutionException ex) {
+ if (ex.getCause() instanceof FlushRuntimeException) {
+ throw ((FlushRuntimeException) ex.getCause()).cause;
+ } else {
+ throw ex;
+ }
}
- } else if (close) {
- // forcing an "empty" putBlock if stream is being closed without new
- // data since latest flush - we need to send the "EOF" flag
- executePutBlock(true, true);
}
- waitOnFlushFutures();
- watchForCommit(false);
- // just check again if the exception is hit while waiting for the
- // futures to ensure flush has indeed succeeded
+ LOG.debug("Flush done.");
+ }
- // irrespective of whether the commitIndex2flushedDataMap is empty
- // or not, ensure there is no exception set
- checkOpen();
+ private CompletableFuture<Void>
watchForCommitAsync(CompletableFuture<PutBlockResult> putBlockResultFuture) {
Review Comment:
The waiting thread is named `client-write-TID-{}`. Which is from
`responseExecutor` in `BlockOutputStream`. It take over handling the future
return from ratis client via `thenApplyAsync`.
```
CompletableFuture<PutBlockResult> executePutBlock(boolean close,
asyncReply = putBlockAsync(xceiverClient, blockData, close,
tokenString);
CompletableFuture<ContainerCommandResponseProto> future =
asyncReply.getResponse();
flushFuture = future.thenApplyAsync(e -> {
try {
validateResponse(e);
} catch (IOException sce) {
throw new CompletionException(sce);
}
// if the ioException is not set, putBlock is successful
if (getIoException() == null && !force) {
handleSuccessfulPutBlock(e.getPutBlock().getCommittedBlockLength(),
asyncReply, flushPos, byteBufferList);
}
return e;
}, responseExecutor)
```
The thread we use to wait for `putBlock` result and then call
`watchForCommit` is from the designated for handling putBlock response. It's
not the thread from ratis-client internal. That should be fine.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]