[ https://issues.apache.org/jira/browse/HBASE-24625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

chenglei updated HBASE-24625:
-----------------------------
    Description: 
HBASE-1004 introduced the {{WALFileLengthProvider}} interface so that we track
the length of the WAL file currently being written ourselves. When a WAL file
is being written on the same RegionServer, the {{WALEntryStream}} used by
{{ReplicationSourceWALReader}} can read at most
{{WALFileLengthProvider.getLogFileSizeIfBeingWritten}} bytes of that file.
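
To make the contract above concrete, here is a minimal sketch of how a reader could bound its reads by the provider's length. {{LengthProvider}}, {{ReadLimit}} and {{readableUpTo}} are hypothetical names used only for illustration, and the fallback to the file system length for a file that is not being written is an assumption of the sketch, not a statement about the HBase code:

```java
import java.util.OptionalLong;

// Hypothetical stand-in for the WALFileLengthProvider contract described above.
interface LengthProvider {
  // Empty when the file is not currently being written on this server.
  OptionalLong getLogFileSizeIfBeingWritten(String path);
}

final class ReadLimit {
  private ReadLimit() {
  }

  // Returns the byte offset up to which a reader may consume the file.
  static long readableUpTo(LengthProvider provider, String path, long fileSystemLength) {
    OptionalLong beingWritten = provider.getLogFileSizeIfBeingWritten(path);
    // For a file under write, use the length reported by the writer.
    // Assumption of this sketch: otherwise fall back to the file system length.
    return beingWritten.isPresent() ? beingWritten.getAsLong() : fileSystemLength;
  }
}
```

The bound is only as trustworthy as the length the provider reports, which is why the value returned for the file being written matters.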

{{AsyncFSWAL}} implements {{WALFileLengthProvider}} through
{{AbstractFSWAL.getLogFileSizeIfBeingWritten}}, as follows:
{code:java}
  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
    rollWriterLock.lock();
    try {
      Path currentPath = getOldPath();
      if (path.equals(currentPath)) {
        W writer = this.writer;
        return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
      } else {
        return OptionalLong.empty();
      }
    } finally {
      rollWriterLock.unlock();
    }
  }
{code}

For {{AsyncFSWAL}}, the {{AsyncFSWAL.writer}} above is an
{{AsyncProtobufLogWriter}}, and {{AsyncProtobufLogWriter.getLength}} is as
follows:

{code:java}
  public long getLength() {
    return length.get();
  }
{code}

However, any append method of {{AsyncProtobufLogWriter}} may increase the
{{AsyncProtobufLogWriter.length}} shown above. In particular, the following
{{AsyncProtobufLogWriter.append}} method only appends the {{WALEntry}} to
{{FanOutOneBlockAsyncDFSOutput.buf}}:
{code:java}
  public void append(Entry entry) {
    int buffered = output.buffered();
    try {
      entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
          .writeDelimitedTo(asyncOutputWrapper);
    } catch (IOException e) {
      throw new AssertionError("should not happen", e);
    }
    try {
      for (Cell cell : entry.getEdit().getCells()) {
        cellEncoder.write(cell);
      }
    } catch (IOException e) {
      throw new AssertionError("should not happen", e);
    }
    length.addAndGet(output.buffered() - buffered);
  }
{code}

That is, {{AsyncFSWAL.getLogFileSizeIfBeingWritten}} does not reflect the file
length that has been successfully synced to the underlying HDFS, because it
also counts bytes that are only buffered. This is not the expected behavior.
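
The gap between the two lengths can be illustrated with a small sketch. {{SketchWriter}} is a hypothetical class, not the HBase writer, and tracking a separate synced length is only one possible direction, not necessarily the fix that will be adopted. The sketch assumes that {{sync}} returns once everything buffered so far is durable:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical writer, not the HBase class: it separates bytes buffered by
// append() from bytes acknowledged by a completed sync().
final class SketchWriter {
  private final AtomicLong bufferedLength = new AtomicLong(); // grows on append
  private final AtomicLong syncedLength = new AtomicLong(); // grows on sync only

  // Mirrors the behavior reported above: append only adds to an in-memory buffer.
  void append(byte[] entry) {
    bufferedLength.addAndGet(entry.length);
  }

  // Assumption: when this returns, everything buffered so far is durable.
  void sync() {
    syncedLength.set(bufferedLength.get());
  }

  // Length as reported today: includes bytes that are not yet synced.
  long getLength() {
    return bufferedLength.get();
  }

  // Length that a reader could safely rely on.
  long getSyncedLength() {
    return syncedLength.get();
  }
}
```

Between an {{append}} and the next {{sync}}, {{getLength}} is ahead of {{getSyncedLength}}, and a reader bounded by {{getLength}} could try to read bytes that have not reached HDFS yet.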



  was:
HBASE-1004 introduced the {{WALFileLengthProvider}} interface so that we track
the length of the WAL file currently being written ourselves. When a WAL file
is being written on the same RegionServer, the {{WALEntryStream}} used by
{{ReplicationSourceWALReader}} can read at most
{{WALFileLengthProvider.getLogFileSizeIfBeingWritten}} bytes of that file.

{{AsyncFSWAL}} implements {{WALFileLengthProvider}} through
{{AbstractFSWAL.getLogFileSizeIfBeingWritten}}, as follows:
{code:java}
  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
    rollWriterLock.lock();
    try {
      Path currentPath = getOldPath();
      if (path.equals(currentPath)) {
        W writer = this.writer;
        return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
      } else {
        return OptionalLong.empty();
      }
    } finally {
      rollWriterLock.unlock();
    }
  }
{code}

For {{AsyncFSWAL}}, the {{AsyncFSWAL.writer}} above is an
{{AsyncProtobufLogWriter}}, and {{AsyncProtobufLogWriter.getLength}} is as
follows:

{code:java}
    public long getLength() {
        return length.get();
    }
{code}

But for {{AsyncProtobufLogWriter}}, any append method may increase the above 
{{AsyncProtobufLogWriter.length}}, especial




> AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced 
> file length.
> ----------------------------------------------------------------------------------------
>
>                 Key: HBASE-24625
>                 URL: https://issues.apache.org/jira/browse/HBASE-24625
>             Project: HBase
>          Issue Type: Bug
>    Affects Versions: 2.3.0
>            Reporter: chenglei
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
