[ 
https://issues.apache.org/jira/browse/HBASE-24625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenglei updated HBASE-24625:
-----------------------------
    Description: 
HBASE-14004 introduced the {{WALFileLengthProvider}} interface so that we keep 
track of the length of the WAL file currently being written ourselves. 
{{WALEntryStream}}, used by {{ReplicationSourceWALReader}}, may only read a WAL 
file up to {{WALFileLengthProvider.getLogFileSizeIfBeingWritten}} bytes if that 
WAL file is currently being written on the same RegionServer.
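
The read limit described above can be sketched roughly as follows. This is 
only an illustration, not HBase code: {{LengthProvider}} and {{withinLimit}} 
are hypothetical names, and the path is simplified to a String.

```java
import java.util.OptionalLong;

public class ReadLimitSketch {
  /** Hypothetical stand-in for WALFileLengthProvider; the path is simplified to a String. */
  interface LengthProvider {
    OptionalLong getLogFileSizeIfBeingWritten(String path);
  }

  /**
   * Returns true if a reader at readerPos has not gone past the length reported
   * by the provider. An empty result means the file is not being written on
   * this server, so the provider imposes no limit.
   */
  static boolean withinLimit(LengthProvider provider, String path, long readerPos) {
    OptionalLong length = provider.getLogFileSizeIfBeingWritten(path);
    return !length.isPresent() || readerPos <= length.getAsLong();
  }

  public static void main(String[] args) {
    // Assumption for the demo: "wal.1" is the file being written, 100 bytes so far.
    LengthProvider provider =
        p -> p.equals("wal.1") ? OptionalLong.of(100L) : OptionalLong.empty();
    System.out.println(withinLimit(provider, "wal.1", 80L));  // true
    System.out.println(withinLimit(provider, "wal.1", 120L)); // false
    System.out.println(withinLimit(provider, "wal.0", 120L)); // true
  }
}
```

The check is only as good as the length the provider reports, which is the 
point of this issue.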

{{AsyncFSWAL}} implements {{WALFileLengthProvider}} through 
{{AbstractFSWAL.getLogFileSizeIfBeingWritten}}, as follows:
{code:java}
  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
    rollWriterLock.lock();
    try {
      Path currentPath = getOldPath();
      if (path.equals(currentPath)) {
        W writer = this.writer;
        return writer != null ? OptionalLong.of(writer.getLength())
            : OptionalLong.empty();
      } else {
        return OptionalLong.empty();
      }
    } finally {
      rollWriterLock.unlock();
    }
  }
{code}

For {{AsyncFSWAL}}, the {{writer}} above is an {{AsyncProtobufLogWriter}}, 
and {{AsyncProtobufLogWriter.getLength}} is as follows:

{code:java}
    public long getLength() {
        return length.get();
    }
{code}

But for {{AsyncProtobufLogWriter}}, any append may increase the 
{{AsyncProtobufLogWriter.length}} above. In particular, the following 
{{append}} method on the {{AsyncFSWAL.append}} path only appends the 
{{WALEntry}} to {{FanOutOneBlockAsyncDFSOutput.buf}}:
{code:java}
  public void append(Entry entry) {
    int buffered = output.buffered();
    try {
      entry.getKey().getBuilder(compressor)
          .setFollowingKvCount(entry.getEdit().size()).build()
          .writeDelimitedTo(asyncOutputWrapper);
    } catch (IOException e) {
      throw new AssertionError("should not happen", e);
    }
    try {
      for (Cell cell : entry.getEdit().getCells()) {
        cellEncoder.write(cell);
      }
    } catch (IOException e) {
      throw new AssertionError("should not happen", e);
    }
    length.addAndGet(output.buffered() - buffered);
  }
{code}

In other words, because {{length}} is increased as soon as an entry is 
buffered, {{AsyncFSWAL.getLogFileSizeIfBeingWritten}} does not necessarily 
reflect the file length that has been successfully synced to the underlying 
HDFS, which is not what is expected.
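
To make the gap concrete, here is a minimal sketch with a toy writer. It is 
not HBase code and not a proposed patch: {{ToyWriter}}, {{syncedLength}} and 
{{getSyncedLength}} are hypothetical names, and {{sync()}} is assumed to always 
succeed for everything buffered so far.

```java
import java.util.concurrent.atomic.AtomicLong;

public class SyncedLengthSketch {
  /** Toy writer: append() only buffers, sync() makes the buffered bytes durable. */
  static class ToyWriter {
    // Grows on every append, like the length counter shown in the issue.
    private final AtomicLong length = new AtomicLong();
    // Hypothetical second counter that grows only when a sync completes.
    private final AtomicLong syncedLength = new AtomicLong();

    void append(byte[] entry) {
      // The bytes are only buffered at this point, yet length already grows.
      length.addAndGet(entry.length);
    }

    void sync() {
      // Assumption: the sync succeeded for everything buffered so far.
      syncedLength.set(length.get());
    }

    long getLength() {
      return length.get();
    }

    long getSyncedLength() {
      return syncedLength.get();
    }
  }

  public static void main(String[] args) {
    ToyWriter writer = new ToyWriter();
    writer.append(new byte[10]);
    // Before sync: the reported length is ahead of what is durable.
    System.out.println(writer.getLength() + " " + writer.getSyncedLength()); // 10 0
    writer.sync();
    System.out.println(writer.getLength() + " " + writer.getSyncedLength()); // 10 10
  }
}
```

Between {{append}} and {{sync}}, a reader bounded by {{getLength()}} would be 
allowed to read past the synced data, while one bounded by the hypothetical 
{{getSyncedLength()}} would not.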




> AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced 
> file length.
> ----------------------------------------------------------------------------------------
>
>                 Key: HBASE-24625
>                 URL: https://issues.apache.org/jira/browse/HBASE-24625
>             Project: HBase
>          Issue Type: Bug
>    Affects Versions: 2.3.0
>            Reporter: chenglei
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
