szetszwo commented on code in PR #699:
URL: https://github.com/apache/ratis/pull/699#discussion_r943039944
##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java:
##########
@@ -91,23 +98,26 @@ void flush() throws IOException {
}
}
- CompletableFuture<Void> asyncFlush(ExecutorService executor) throws
IOException {
+ CompletableFuture<Long> asyncFlush(ExecutorService executor, Long
writtenIndex) throws IOException {
Review Comment:
Let's don't pass `writtenIndex`, which is not used anywhere except for
returning back in the future. It makes the method complicated.
##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java:
##########
@@ -133,13 +143,19 @@ boolean isOpen() {
}
@Override
+ @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT")
public void close() throws IOException {
if (!isOpen()) {
return;
}
try {
fileChannel.truncate(fileChannel.position());
+ callback.get().get();
Review Comment:
We should wait for the flush `callback` first and then truncate. BTW, let's
rename `callback` to `flushFuture` or `flushFutureSupplier`? Otherwise, it is
hard to know why does `callback` mean.
##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java:
##########
@@ -91,23 +98,26 @@ void flush() throws IOException {
}
}
- CompletableFuture<Void> asyncFlush(ExecutorService executor) throws
IOException {
+ CompletableFuture<Long> asyncFlush(ExecutorService executor, Long
writtenIndex) throws IOException {
flushBuffer();
if (forced) {
return CompletableFuture.completedFuture(null);
}
- final CompletableFuture<Void> f =
CompletableFuture.supplyAsync(this::fileChannelForce, executor);
+ final CompletableFuture<Long> f = new CompletableFuture();
+ executor.execute(() -> fileChannelForce(writtenIndex, f));
forced = true;
+
return f;
}
- private Void fileChannelForce() {
+ private void fileChannelForce(Long lastWrittenIndex, CompletableFuture
future) {
Review Comment:
Similarly, let's don't pass `lastWrittenIndex` and `future`. A standard way
is to use ` CompletableFuture.supplyAsync(..)`.
##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java:
##########
@@ -45,16 +50,18 @@ static BufferedWriteChannel open(File file, boolean append,
ByteBuffer buffer) t
fc.truncate(0);
}
Preconditions.assertSame(fc.size(), fc.position(), "fc.position");
- return new BufferedWriteChannel(fc, buffer);
+ return new BufferedWriteChannel(fc, buffer, closeCallback);
}
private final FileChannel fileChannel;
private final ByteBuffer writeBuffer;
private boolean forced = true;
+ private final Supplier<CompletableFuture> callback;
Review Comment:
Please specify the type parameter. Otherwise, there is a "Raw use of
parameterized class 'CompletableFuture'" warning.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]