HBASE-18137 Replication gets stuck for empty WALs Signed-off-by: Andrew Purtell <apurt...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6782dfca Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6782dfca Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6782dfca Branch: refs/heads/branch-1.3 Commit: 6782dfca4f3a2f5e02cc60a7c04d8d5d95ebc36e Parents: 6a216c7 Author: Vincent <vincentp...@gmail.com> Authored: Wed Jun 7 14:48:45 2017 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Sat Jun 10 12:26:12 2017 -0700 ---------------------------------------------------------------------- .../regionserver/ReplicationSource.java | 16 ++-- .../hbase/replication/TestReplicationBase.java | 1 + .../replication/TestReplicationSmallTests.java | 83 ++++++++++++++++++++ 3 files changed, 94 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6782dfca/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 65f581a..2285292 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -542,9 +542,9 @@ public class ReplicationSource extends Thread terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e); } } + int sleepMultiplier = 1; // Loop until we close down while (isWorkerActive()) { - int sleepMultiplier = 1; // Sleep until replication is enabled again if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { @@ -622,7 +622,7 @@ 
public class ReplicationSource extends Thread if (considerDumping && sleepMultiplier == maxRetriesMultiplier && - processEndOfFile()) { + processEndOfFile(false)) { continue; } } @@ -749,7 +749,7 @@ public class ReplicationSource extends Thread } // If we didn't get anything and the queue has an object, it means we // hit the end of the file for sure - return seenEntries == 0 && processEndOfFile(); + return seenEntries == 0 && processEndOfFile(false); } /** @@ -930,11 +930,12 @@ public class ReplicationSource extends Thread // which throws a NPE if we open a file before any data node has the most recent block // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. LOG.warn("Got NPE opening reader, will retry."); - } else if (sleepMultiplier >= maxRetriesMultiplier) { + } else if (sleepMultiplier >= maxRetriesMultiplier + && conf.getBoolean("replication.source.eof.autorecovery", false)) { // TODO Need a better way to determine if a file is really gone but // TODO without scanning all logs dir LOG.warn("Waited too long for this file, considering dumping"); - return !processEndOfFile(); + return !processEndOfFile(true); } } return true; @@ -1100,7 +1101,7 @@ public class ReplicationSource extends Thread */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE", justification = "Yeah, this is how it works") - protected boolean processEndOfFile() { + protected boolean processEndOfFile(boolean dumpOnlyIfZeroLength) { // We presume this means the file we're reading is closed. if (this.queue.size() != 0) { // -1 means the wal wasn't closed cleanly. @@ -1135,6 +1136,9 @@ public class ReplicationSource extends Thread LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats() + ", and the length of the file is " + (stat == null ? 
"N/A" : stat.getLen())); } + if (dumpOnlyIfZeroLength && (stat == null || stat.getLen() != 0)) { + return false; + } this.currentPath = null; this.repLogReader.finishCurrentFile(); this.reader = null; http://git-wip-us.apache.org/repos/asf/hbase/blob/6782dfca/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index e52a600..01d9335 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -101,6 +101,7 @@ public class TestReplicationBase { conf1.setLong("replication.sleep.before.failover", 2000); conf1.setInt("replication.source.maxretriesmultiplier", 10); conf1.setFloat("replication.source.ratio", 1.0f); + conf1.setBoolean("replication.source.eof.autorecovery", true); utility1 = new HBaseTestingUtility(conf1); utility1.startMiniZKCluster(); http://git-wip-us.apache.org/repos/asf/hbase/blob/6782dfca/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 42a127f..e1f9f01 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -30,6 +30,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; import
org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.ClusterStatus; @@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Delete; @@ -53,9 +55,13 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVMClusterUtil; @@ -758,4 +764,81 @@ public class TestReplicationSmallTests extends TestReplicationBase { } } } + + // "replication.source.eof.autorecovery" must be true for this to pass + @Test + public void testEmptyWALRecovery() throws Exception { + final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size(); + + // for each RS, create an empty wal with same walGroupId + final List<Path> emptyWalPaths = new ArrayList<>(); + long ts = System.currentTimeMillis(); + for (int i = 0; i < numRs; i++) { + HRegionInfo regionInfo = + utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); + WAL wal = 
utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); + Path currentWalPath = DefaultWALProvider.getCurrentFileName(wal); + String walGroupId = DefaultWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); + Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts); + utility1.getTestFileSystem().create(emptyWalPath).close(); + emptyWalPaths.add(emptyWalPath); + } + + // inject our empty wal into the replication queue + for (int i = 0; i < numRs; i++) { + Replication replicationService = + (Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService(); + replicationService.preLogRoll(null, emptyWalPaths.get(i)); + replicationService.postLogRoll(null, emptyWalPaths.get(i)); + } + + // wait for ReplicationSource to start reading from our empty wal + waitForLogAdvance(numRs, emptyWalPaths, false); + + // roll the original wal, which enqueues a new wal behind our empty wal + for (int i = 0; i < numRs; i++) { + HRegionInfo regionInfo = + utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); + WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); + wal.rollWriter(true); + } + + // ReplicationSource should advance past the empty wal, or else the test will fail + waitForLogAdvance(numRs, emptyWalPaths, true); + + // we're now writing to the new wal + // if everything works, the source should've stopped reading from the empty wal, and start + // replicating from the new wal + testSimplePutDelete(); + } + + /** + * Waits for the ReplicationSource to start reading from the given paths + * @param numRs number of regionservers + * @param emptyWalPaths path for each regionserver + * @param invert if true, waits until ReplicationSource is NOT reading from the given paths + */ + private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths, + final boolean invert) throws Exception { + Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() { 
+ @Override + public boolean evaluate() throws Exception { + for (int i = 0; i < numRs; i++) { + Replication replicationService = (Replication) utility1.getHBaseCluster() + .getRegionServer(i).getReplicationSourceService(); + for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() + .getSources()) { + ReplicationSource source = (ReplicationSource) rsi; + if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) { + return false; + } + if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) { + return false; + } + } + } + return true; + } + }); + } }