Repository: hbase Updated Branches: refs/heads/branch-2 1cb05a18b -> f73986d11
HBASE-20417 Do not read wal entries when peer is disabled Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f73986d1 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f73986d1 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f73986d1 Branch: refs/heads/branch-2 Commit: f73986d11045902e82668408b09d0488aef9d1c0 Parents: 1cb05a1 Author: zhangduo <zhang...@apache.org> Authored: Sat Apr 14 22:00:15 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Mon Apr 16 22:07:48 2018 +0800 ---------------------------------------------------------------------- .../regionserver/ReplicationSourceShipper.java | 15 ++--- .../ReplicationSourceWALReader.java | 4 ++ .../regionserver/TestWALEntryStream.java | 59 +++++++++++++++++++- 3 files changed, 68 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/f73986d1/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 2097d00..11fd660 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -87,12 +87,11 @@ public class ReplicationSourceShipper extends Thread { setWorkerState(WorkerState.RUNNING); // Loop until we close down while (isActive()) { - int sleepMultiplier = 1; // Sleep until replication is enabled again if (!source.isPeerEnabled()) { - if (sleepForRetries("Replication is disabled", sleepMultiplier)) { - sleepMultiplier++; - } + // The peer enabled check is in memory, not expensive, so do not need to increase the + // sleep interval as it may cause a long lag when we enable the peer. + sleepForRetries("Replication is disabled", 1); continue; } try { @@ -188,8 +187,8 @@ public class ReplicationSourceShipper extends Thread { } break; } catch (Exception ex) { - LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:" - + org.apache.hadoop.util.StringUtils.stringifyException(ex)); + LOG.warn("{} threw unknown exception:", + source.getReplicationEndpoint().getClass().getName(), ex); if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) { sleepMultiplier++; } @@ -292,9 +291,7 @@ public class ReplicationSourceShipper extends Thread { */ public boolean sleepForRetries(String msg, int sleepMultiplier) { try { - if (LOG.isTraceEnabled()) { - LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier); - } + LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier); Thread.sleep(this.sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { LOG.debug("Interrupted while sleeping between retries"); http://git-wip-us.apache.org/repos/asf/hbase/blob/f73986d1/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 7ba347f..64fd48d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -126,6 +126,10 @@ class ReplicationSourceWALReader extends Thread { source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), source.getSourceMetrics())) { while (isReaderRunning()) { // loop here to keep reusing stream while we can + if (!source.isPeerEnabled()) { + Threads.sleep(sleepForRetries); + continue; + } if (!checkQuota()) { continue; } http://git-wip-us.apache.org/repos/asf/hbase/blob/f73986d1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 2670756..35e4f82 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -30,7 +30,12 @@ import java.io.IOException; import java.util.NavigableMap; import java.util.OptionalLong; import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -328,7 +333,7 @@ public class TestWALEntryStream { } } - private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) { + private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) { ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); Server mockServer = Mockito.mock(Server.class); @@ -338,6 +343,12 @@ public class TestWALEntryStream { when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getServer()).thenReturn(mockServer); when(source.isRecovered()).thenReturn(recovered); + return source; + } + + private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) { + ReplicationSource source = mockReplicationSource(recovered, conf); + when(source.isPeerEnabled()).thenReturn(true); ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source); reader.start(); @@ -460,6 +471,52 @@ public class TestWALEntryStream { assertFalse(entryBatch.isEndOfFile()); } + @Test + public void testReplicationSourceWALReaderDisabled() + throws IOException, InterruptedException, ExecutionException { + appendEntriesToLogAndSync(3); + // get ending position + long position; + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + entryStream.next(); + entryStream.next(); + entryStream.next(); + position = entryStream.getPosition(); + } + + // start up a reader + Path walPath = walQueue.peek(); + ReplicationSource source = mockReplicationSource(false, CONF); + AtomicInteger invokeCount = new AtomicInteger(0); + AtomicBoolean enabled = new AtomicBoolean(false); + when(source.isPeerEnabled()).then(i -> { + invokeCount.incrementAndGet(); + return enabled.get(); + }); + + ReplicationSourceWALReader reader = + new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source); + reader.start(); + Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> { + return reader.take(); + }); + // make sure that the isPeerEnabled has been called several times + TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5); + // confirm that we can read nothing if the peer is disabled + assertFalse(future.isDone()); + // then enable the peer, we should get the batch + enabled.set(true); + WALEntryBatch entryBatch = future.get(); + + // should've batched up our entries + assertNotNull(entryBatch); + assertEquals(3, entryBatch.getWalEntries().size()); + assertEquals(position, entryBatch.getLastWalPosition()); + assertEquals(walPath, entryBatch.getLastWalPath()); + assertEquals(3, entryBatch.getNbRowKeys()); + } + private String getRow(WAL.Entry entry) { Cell cell = entry.getEdit().getCells().get(0); return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());