[ https://issues.apache.org/jira/browse/HBASE-24625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
chenglei updated HBASE-24625:
-----------------------------
    Description: 
By HBASE-1004, we introduced the {{WALFileLengthProvider}} interface so that we keep track of the length of the WAL file currently being written ourselves. {{WALEntryStream}}, used by {{ReplicationSourceWALReader}}, can only read a WAL file up to a byte size <= {{WALFileLengthProvider.getLogFileSizeIfBeingWritten}} if the WAL file is currently being written on the same RegionServer.

{{AsyncFSWAL}} implements {{WALFileLengthProvider}} through {{AbstractFSWAL.getLogFileSizeIfBeingWritten}}, as follows:
{code:java}
  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
    rollWriterLock.lock();
    try {
      Path currentPath = getOldPath();
      // Only the WAL file currently being written reports a length.
      if (path.equals(currentPath)) {
        W writer = this.writer;
        return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
      } else {
        return OptionalLong.empty();
      }
    } finally {
      rollWriterLock.unlock();
    }
  }
{code}

For {{AsyncFSWAL}}, the {{AsyncFSWAL.writer}} above is an {{AsyncProtobufLogWriter}}, and {{AsyncProtobufLogWriter.getLength}} is as follows:
{code:java}
  public long getLength() {
    return length.get();
  }
{code}

But for {{AsyncProtobufLogWriter}}, any append may increase {{AsyncProtobufLogWriter.length}}. In particular, the following {{AsyncProtobufLogWriter.append}} method only appends the {{WALEntry}} to {{FanOutOneBlockAsyncDFSOutput.buf}}:
{code:java}
  public void append(Entry entry) {
    int buffered = output.buffered();
    try {
      entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
        .writeDelimitedTo(asyncOutputWrapper);
    } catch (IOException e) {
      throw new AssertionError("should not happen", e);
    }
    try {
      for (Cell cell : entry.getEdit().getCells()) {
        cellEncoder.write(cell);
      }
    } catch (IOException e) {
      throw new AssertionError("should not happen", e);
    }
    // length grows by the bytes just buffered, before any sync to HDFS.
    length.addAndGet(output.buffered() - buffered);
  }
{code}

That is to say, {{AsyncFSWAL.getLogFileSizeIfBeingWritten}} does not reflect the file length that has been successfully synced to the underlying HDFS, which is not what is expected.


> AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length.
> ----------------------------------------------------------------------------------------
>
>                 Key: HBASE-24625
>                 URL: https://issues.apache.org/jira/browse/HBASE-24625
>             Project: HBase
>          Issue Type: Bug
>    Affects Versions: 2.3.0
>            Reporter: chenglei
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
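
To make the gap described in this issue concrete, here is a minimal, self-contained sketch. It is not HBase code and not necessarily how the issue was or will be fixed: {{SketchWalWriter}}, {{getSyncedLength}} and {{readableLength}} are hypothetical names invented for illustration. The sketch assumes a writer that bumps one counter on every append (as {{AsyncProtobufLogWriter.length}} does in the snippet quoted in the description) and a second counter only when a sync completes, and it shows that a reader bounded by the first counter could be told about bytes that were never synced.

```java
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;

class SyncedLengthSketch {

  /** Hypothetical stand-in for a WAL writer; not an HBase class. */
  static final class SketchWalWriter {
    // Bytes handed to the in-memory buffer by append(); may run ahead of storage.
    private final AtomicLong length = new AtomicLong();
    // Bytes covered by the most recent completed sync().
    private volatile long syncedLength = 0L;

    void append(byte[] entry) {
      // Mirrors the reported behaviour: the counter grows at buffering time.
      length.addAndGet(entry.length);
    }

    void sync() {
      // A real writer would flush and wait for acknowledgements here;
      // the sketch only records how far the sync reached.
      syncedLength = length.get();
    }

    long getLength() {
      return length.get();
    }

    long getSyncedLength() {
      return syncedLength;
    }
  }

  /** Hypothetical length provider that exposes only the synced length. */
  static OptionalLong readableLength(SketchWalWriter writer) {
    return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
  }

  public static void main(String[] args) {
    SketchWalWriter writer = new SketchWalWriter();
    writer.append(new byte[100]);
    writer.sync();
    writer.append(new byte[40]); // buffered only, not yet synced

    System.out.println("buffered length = " + writer.getLength());
    System.out.println("synced length   = " + readableLength(writer).getAsLong());
  }
}
```

In this sketch the two counters diverge as soon as an append happens after the last sync, so a reader that is only allowed to go as far as {{readableLength}} never sees the unsynced tail.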