Repository: hbase
Updated Branches:
  refs/heads/branch-1.2 96e48c3df -> c1289960d


HBASE-18137 Replication gets stuck for empty WALs

Signed-off-by: Andrew Purtell <apurt...@apache.org>

Conflicts:
        
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c1289960
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c1289960
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c1289960

Branch: refs/heads/branch-1.2
Commit: c1289960dd7ebf94c615d00ef9c77c27737496e1
Parents: 96e48c3
Author: Vincent <vincentp...@gmail.com>
Authored: Wed Jun 7 14:48:45 2017 -0700
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Sat Jun 10 12:47:18 2017 -0700

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         | 16 ++--
 .../hbase/replication/TestReplicationBase.java  |  1 +
 .../replication/TestReplicationSmallTests.java  | 83 ++++++++++++++++++++
 3 files changed, 94 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c1289960/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2285a5e..a59c3c8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -512,9 +512,9 @@ public class ReplicationSource extends Thread
           terminate("Couldn't get the position of this recovered queue " + 
peerClusterZnode, e);
         }
       }
+      int sleepMultiplier = 1;
       // Loop until we close down
       while (isWorkerActive()) {
-        int sleepMultiplier = 1;
         // Sleep until replication is enabled again
         if (!isPeerEnabled()) {
           if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
@@ -591,7 +591,7 @@ public class ReplicationSource extends Thread
 
             if (considerDumping &&
                 sleepMultiplier == maxRetriesMultiplier &&
-                processEndOfFile()) {
+                processEndOfFile(false)) {
               continue;
             }
           }
@@ -717,7 +717,7 @@ public class ReplicationSource extends Thread
       }
       // If we didn't get anything and the queue has an object, it means we
       // hit the end of the file for sure
-      return seenEntries == 0 && processEndOfFile();
+      return seenEntries == 0 && processEndOfFile(false);
     }
 
     /**
@@ -846,11 +846,12 @@ public class ReplicationSource extends Thread
           // which throws a NPE if we open a file before any data node has the 
most recent block
           // Just sleep and retry. Will require re-reading compressed WALs for 
compressionContext.
           LOG.warn("Got NPE opening reader, will retry.");
-        } else if (sleepMultiplier >= maxRetriesMultiplier) {
+        } else if (sleepMultiplier >= maxRetriesMultiplier
+            && conf.getBoolean("replication.source.eof.autorecovery", false)) {
           // TODO Need a better way to determine if a file is really gone but
           // TODO without scanning all logs dir
           LOG.warn("Waited too long for this file, considering dumping");
-          return !processEndOfFile();
+          return !processEndOfFile(true);
         }
       }
       return true;
@@ -990,7 +991,7 @@ public class ReplicationSource extends Thread
      */
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"DE_MIGHT_IGNORE",
         justification = "Yeah, this is how it works")
-    protected boolean processEndOfFile() {
+    protected boolean processEndOfFile(boolean dumpOnlyIfZeroLength) {
       // We presume this means the file we're reading is closed.
       if (this.queue.size() != 0) {
         // -1 means the wal wasn't closed cleanly.
@@ -1025,6 +1026,9 @@ public class ReplicationSource extends Thread
           LOG.trace("Reached the end of log " + this.currentPath + ", stats: " 
+ getStats()
               + ", and the length of the file is " + (stat == null ? "N/A" : 
stat.getLen()));
         }
+        if (dumpOnlyIfZeroLength && stat.getLen() != 0) {
+          return false;
+        }
         this.currentPath = null;
         this.repLogReader.finishCurrentFile();
         this.reader = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1289960/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index e52a600..01d9335 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -101,6 +101,7 @@ public class TestReplicationBase {
     conf1.setLong("replication.sleep.before.failover", 2000);
     conf1.setInt("replication.source.maxretriesmultiplier", 10);
     conf1.setFloat("replication.source.ratio", 1.0f);
+    conf1.setBoolean("replication.source.eof.autorecovery", true);
 
     utility1 = new HBaseTestingUtility(conf1);
     utility1.startMiniZKCluster();

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1289960/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index bb78c24..366591f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -30,6 +30,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.ClusterStatus;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
@@ -53,9 +55,13 @@ import 
org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -715,4 +721,81 @@ public class TestReplicationSmallTests extends 
TestReplicationBase {
 
     hadmin.close();
   }
+
+  // "replication.source.eof.autorecovery" must be true for this to pass
+  @Test
+  public void testEmptyWALRecovery() throws Exception {
+    final int numRs = 
utility1.getHBaseCluster().getRegionServerThreads().size();
+
+    // for each RS, create an empty wal with same walGroupId
+    final List<Path> emptyWalPaths = new ArrayList<>();
+    long ts = System.currentTimeMillis();
+    for (int i = 0; i < numRs; i++) {
+      HRegionInfo regionInfo =
+          
utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+      WAL wal = 
utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      Path currentWalPath = DefaultWALProvider.getCurrentFileName(wal);
+      String walGroupId = 
DefaultWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+      Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." 
+ ts);
+      utility1.getTestFileSystem().create(emptyWalPath).close();
+      emptyWalPaths.add(emptyWalPath);
+    }
+
+    // inject our empty wal into the replication queue
+    for (int i = 0; i < numRs; i++) {
+      Replication replicationService =
+          (Replication) 
utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+      replicationService.preLogRoll(null, emptyWalPaths.get(i));
+      replicationService.postLogRoll(null, emptyWalPaths.get(i));
+    }
+
+    // wait for ReplicationSource to start reading from our empty wal
+    waitForLogAdvance(numRs, emptyWalPaths, false);
+
+    // roll the original wal, which enqueues a new wal behind our empty wal
+    for (int i = 0; i < numRs; i++) {
+      HRegionInfo regionInfo =
+          
utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+      WAL wal = 
utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      wal.rollWriter(true);
+    }
+
+    // ReplicationSource should advance past the empty wal, or else the test 
will fail
+    waitForLogAdvance(numRs, emptyWalPaths, true);
+
+    // we're now writing to the new wal
+    // if everything works, the source should've stopped reading from the 
empty wal, and start
+    // replicating from the new wal
+    testSimplePutDelete();
+  }
+
+  /**
+   * Waits for the ReplicationSource to start reading from the given paths
+   * @param numRs number of regionservers
+   * @param emptyWalPaths path for each regionserver
+   * @param invert if true, waits until ReplicationSource is NOT reading from 
the given paths
+   */
+  private void waitForLogAdvance(final int numRs, final List<Path> 
emptyWalPaths,
+      final boolean invert) throws Exception {
+    Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        for (int i = 0; i < numRs; i++) {
+          Replication replicationService = (Replication) 
utility1.getHBaseCluster()
+              .getRegionServer(i).getReplicationSourceService();
+          for (ReplicationSourceInterface rsi : 
replicationService.getReplicationManager()
+              .getSources()) {
+            ReplicationSource source = (ReplicationSource) rsi;
+            if (!invert && 
!emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+              return false;
+            }
+            if (invert && 
emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    });
+  }
 }

Reply via email to