This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push: new 457234c HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length(addendum) (#2055) 457234c is described below commit 457234c695a4950fea5b88c3e9666028f9b255d6 Author: chenglei <cheng...@apache.org> AuthorDate: Mon Aug 10 09:57:26 2020 +0800 HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length(addendum) (#2055) Signed-off-by: Duo Zhang <zhang...@apache.org> --- .../hbase/io/asyncfs/WrapperAsyncFSOutput.java | 8 ++++--- .../regionserver/wal/AsyncProtobufLogWriter.java | 26 ++++++++++++++++++++-- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java index 39f1f71..c7cc1fc 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java @@ -94,9 +94,11 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput { } } long pos = out.getPos(); - if(pos > this.syncedLength) { - this.syncedLength = pos; - } + /** + * This flush0 method could only be called by single thread, so here we could + * safely overwrite without any synchronization. + */ + this.syncedLength = pos; future.complete(pos); } catch (IOException e) { future.completeExceptionally(e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index 8c944b1..e834d65 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -59,7 +59,11 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter private final Class<? extends Channel> channelClass; - private AsyncFSOutput output; + private volatile AsyncFSOutput output; + /** + * Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed. + */ + private volatile long finalSyncedLength = -1; private static final class OutputStreamWrapper extends OutputStream implements ByteBufferWriter { @@ -156,6 +160,13 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter LOG.warn("normal close failed, try recover", e); output.recoverAndClose(null); } + /** + * We have to call {@link AsyncFSOutput#getSyncedLength()} + * after {@link AsyncFSOutput#close()} to get the final length + * synced to underlying filesystem because {@link AsyncFSOutput#close()} + * may also flush some data to underlying filesystem. + */ + this.finalSyncedLength = this.output.getSyncedLength(); this.output = null; } @@ -234,6 +245,17 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter @Override public long getSyncedLength() { - return this.output.getSyncedLength(); + /** + * The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close} + * is a sync point, if output is null, then finalSyncedLength must set, + * so we can return finalSyncedLength, else we return output.getSyncedLength + */ + AsyncFSOutput outputToUse = this.output; + if(outputToUse == null) { + long finalSyncedLengthToUse = this.finalSyncedLength; + assert finalSyncedLengthToUse >= 0; + return finalSyncedLengthToUse; + } + return outputToUse.getSyncedLength(); } }