This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new ea1a9e9  HBASE-25536 Remove 0 length wal file from logQueue if it belongs to old sources (#2908)
ea1a9e9 is described below

commit ea1a9e9f98fd3429960e7e1902781b96a2c4b70f
Author: shahrs87 <shahr...@gmail.com>
AuthorDate: Fri Jan 29 04:17:30 2021 -0800

    HBASE-25536 Remove 0 length wal file from logQueue if it belongs to old sources (#2908)
    
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
    Signed-off-by: Geoffrey Jacoby <gjac...@apache.org>
    Signed-off-by: Bharath Vissapragada <bhara...@apache.org>
    Signed-off-by: Viraj Jasani <vjas...@apache.org>
---
 .../regionserver/ReplicationSourceWALReader.java   |  4 ++-
 .../regionserver/TestWALEntryStream.java           | 30 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index cbc4922..273b3d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -243,8 +243,10 @@ class ReplicationSourceWALReader extends Thread {
   // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
   // enabled, then dump the log
   private void handleEofException(IOException e) {
+    // Dump the log even if logQueue size is 1 if the source is from recovered Source
+    // since we don't add current log to recovered source queue so it is safe to remove.
     if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
-      logQueue.size() > 1 && this.eofAutoRecovery) {
+      (source.isRecovered() || logQueue.size() > 1) && this.eofAutoRecovery) {
       try {
         if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
           LOG.warn("Forcing removal of 0 length log in queue: " + 
logQueue.peek());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 067ab63..1b450a5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -617,4 +618,33 @@ public class TestWALEntryStream {
       assertFalse(entryStream.hasNext());
     }
   }
+
+  /*
+    Test removal of 0 length log from logQueue if the source is a recovered source and
+    size of logQueue is only 1.
+   */
+  @Test
+  public void testEOFExceptionForRecoveredQueue() throws Exception {
+    PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
+    // Create a 0 length log.
+    Path emptyLog = new Path("emptyLog");
+    FSDataOutputStream fsdos = fs.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
+    queue.add(emptyLog);
+
+    Configuration conf = new Configuration(CONF);
+    // Override the max retries multiplier to fail fast.
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+    // Create a reader thread with source as recovered source.
+    ReplicationSource source = mockReplicationSource(true, conf);
+    when(source.isPeerEnabled()).thenReturn(true);
+    ReplicationSourceWALReader reader =
+      new ReplicationSourceWALReader(fs, conf, queue, 0, getDummyFilter(), 
source);
+    reader.run();
+    // ReplicationSourceWALReader#handleEofException method will
+    // remove empty log from logQueue.
+    assertEquals(0, queue.size());
+  }
 }

Reply via email to