Repository: hbase Updated Branches: refs/heads/branch-2 922894c96 -> 1e4f8491f
HBASE-18092: Removing a peer does not properly clean up the ReplicationSourceManager state and metrics Signed-off-by: tedyu <yuzhih...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1e4f8491 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1e4f8491 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1e4f8491 Branch: refs/heads/branch-2 Commit: 1e4f8491f7bd10c5507082a0e5ece54175969d8e Parents: 922894c Author: Ashu Pachauri <ashu210...@gmail.com> Authored: Thu May 25 18:24:38 2017 -0700 Committer: tedyu <yuzhih...@gmail.com> Committed: Fri Jun 9 08:23:41 2017 -0700 ---------------------------------------------------------------------- .../replication/regionserver/MetricsSource.java | 6 +- .../replication/regionserver/Replication.java | 4 +- .../regionserver/ReplicationSource.java | 4 - .../regionserver/ReplicationSourceManager.java | 23 ++-- .../replication/ReplicationSourceDummy.java | 5 +- .../TestReplicationSourceManager.java | 116 +++++++++++++++---- 6 files changed, 115 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/1e4f8491/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 7a9ef9f..e39defd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -40,7 +40,6 @@ public class MetricsSource implements BaseSource { // tracks last shipped timestamp for each wal group private Map<String, Long> lastTimeStamps = new HashMap<>(); - private int lastQueueSize = 0; private long lastHFileRefsQueueSize = 0; private String id; @@ -181,11 +180,12 @@ public class MetricsSource implements BaseSource { /** Removes all metrics about this Source. */ public void clear() { - singleSourceSource.clear(); + int lastQueueSize = singleSourceSource.getSizeOfLogQueue(); globalSourceSource.decrSizeOfLogQueue(lastQueueSize); + singleSourceSource.decrSizeOfLogQueue(lastQueueSize); + singleSourceSource.clear(); globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); lastTimeStamps.clear(); - lastQueueSize = 0; lastHFileRefsQueueSize = 0; } http://git-wip-us.apache.org/repos/asf/hbase/blob/1e4f8491/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 9cc9c7c..6ff666e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -368,9 +368,7 @@ public class Replication extends WALActionsListener.Base implements // get source List<ReplicationSourceInterface> sources = this.replicationManager.getSources(); for (ReplicationSourceInterface source : sources) { - if (source instanceof ReplicationSource) { - sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics()); - } + sourceMetricsList.add(source.getSourceMetrics()); } // get old source http://git-wip-us.apache.org/repos/asf/hbase/blob/1e4f8491/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index b86f35f..d098fd9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -540,10 +540,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf return sb.toString(); } - /** - * Get Replication Source Metrics - * @return sourceMetrics - */ @Override public MetricsSource getSourceMetrics() { return this.metrics; http://git-wip-us.apache.org/repos/asf/hbase/blob/1e4f8491/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index cb631c1..5b5e207 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -358,6 +358,20 @@ public class ReplicationSourceManager implements ReplicationListener { return this.oldsources; } + /** + * Get the normal source for a given peer + * @param peerId + * @return the normal source for the give peer if it exists, otherwise null. + */ + public ReplicationSourceInterface getSource(String peerId) { + for (ReplicationSourceInterface source: getSources()) { + if (source.getPeerId().equals(peerId)) { + return source; + } + } + return null; + } + @VisibleForTesting List<String> getAllQueues() { return replicationQueues.getAllQueues(); @@ -542,9 +556,7 @@ public class ReplicationSourceManager implements ReplicationListener { */ public void closeQueue(ReplicationSourceInterface src) { LOG.info("Done with the queue " + src.getPeerClusterZnode()); - if (src instanceof ReplicationSource) { - ((ReplicationSource) src).getSourceMetrics().clear(); - } + src.getSourceMetrics().clear(); this.sources.remove(src); deleteSource(src.getPeerClusterZnode(), true); this.walsById.remove(src.getPeerClusterZnode()); @@ -593,10 +605,7 @@ public class ReplicationSourceManager implements ReplicationListener { } for (ReplicationSourceInterface toRemove : srcToRemove) { toRemove.terminate(terminateMessage); - if (toRemove instanceof ReplicationSource) { - ((ReplicationSource) toRemove).getSourceMetrics().clear(); - } - this.sources.remove(toRemove); + closeQueue(toRemove); } deleteSource(id, true); } http://git-wip-us.apache.org/repos/asf/hbase/blob/1e4f8491/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 3a7f77b..e23e15b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -41,6 +41,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { ReplicationSourceManager manager; String peerClusterId; Path currentPath; + MetricsSource metrics; @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, @@ -50,11 +51,13 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { this.manager = manager; this.peerClusterId = peerClusterId; + this.metrics = metrics; } @Override public void enqueueLog(Path log) { this.currentPath = log; + metrics.incrSizeOfLogQueue(); } @Override @@ -112,7 +115,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { @Override public MetricsSource getSourceMetrics() { - return null; + return metrics; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/1e4f8491/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 26aee6d..e1b3756 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.lang.reflect.Field; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Collection; @@ -438,6 +438,9 @@ public abstract class TestReplicationSourceManager { @Test public void testPeerRemovalCleanup() throws Exception{ String replicationSourceImplName = conf.get("replication.replicationsource.implementation"); + final String peerId = "FakePeer"; + final ReplicationPeerConfig peerConfig = + new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"); try { DummyServer server = new DummyServer(); final ReplicationQueues rq = @@ -450,40 +453,103 @@ public abstract class TestReplicationSourceManager { FailInitializeDummyReplicationSource.class.getName()); final ReplicationPeers rp = manager.getReplicationPeers(); // Set up the znode and ReplicationPeer for the fake peer - rp.registerPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase")); - // Wait for the peer to get created and connected - Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { - @Override - public boolean evaluate() throws Exception { - return (rp.getConnectedPeer("FakePeer") != null); - } - }); + // Don't wait for replication source to initialize, we know it won't. + addPeerAndWait(peerId, peerConfig, false); - // Make sure that the replication source was not initialized - List<ReplicationSourceInterface> sources = manager.getSources(); - for (ReplicationSourceInterface source : sources) { - assertNotEquals("FakePeer", source.getPeerId()); - } + // Sanity check + assertNull(manager.getSource(peerId)); // Create a replication queue for the fake peer - rq.addLog("FakePeer", "FakeFile"); + rq.addLog(peerId, "FakeFile"); // Unregister peer, this should remove the peer and clear all queues associated with it // Need to wait for the ReplicationTracker to pick up the changes and notify listeners. - rp.unregisterPeer("FakePeer"); - Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { - @Override - public boolean evaluate() throws Exception { - List<String> peers = rp.getAllPeerIds(); - return (!rq.getAllQueues().contains("FakePeer")) - && (rp.getConnectedPeer("FakePeer") == null) - && (!peers.contains("FakePeer")); - } - }); + removePeerAndWait(peerId); + assertFalse(rq.getAllQueues().contains(peerId)); } finally { conf.set("replication.replicationsource.implementation", replicationSourceImplName); + removePeerAndWait(peerId); + } + } + + @Test + public void testRemovePeerMetricsCleanup() throws Exception { + final String peerId = "DummyPeer"; + final ReplicationPeerConfig peerConfig = + new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"); + try { + addPeerAndWait(peerId, peerConfig, true); + + ReplicationSourceInterface source = manager.getSource(peerId); + // Sanity check + assertNotNull(source); + // Retrieve the global replication metrics source + Field f = MetricsSource.class.getDeclaredField("globalSourceSource"); + f.setAccessible(true); + MetricsReplicationSourceSource globalSource = + (MetricsReplicationSourceSource)f.get(source.getSourceMetrics()); + int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue(); + + // Enqueue log and check if metrics updated + source.enqueueLog(new Path("abc")); + assertEquals(1, source.getSourceMetrics().getSizeOfLogQueue()); + assertEquals(1 + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); + + // Removing the peer should reset the global metrics + removePeerAndWait(peerId); + assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); + + // Adding the same peer back again should reset the single source metrics + addPeerAndWait(peerId, peerConfig, true); + source = manager.getSource(peerId); + assertNotNull(source); + assertEquals(0, source.getSourceMetrics().getSizeOfLogQueue()); + assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); + } finally { + removePeerAndWait(peerId); } } + /** + * Add a peer and wait for it to initialize + * @param peerId + * @param peerConfig + * @param waitForSource Whether to wait for replication source to initialize + * @throws Exception + */ + private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig, + final boolean waitForSource) throws Exception { + final ReplicationPeers rp = manager.getReplicationPeers(); + rp.registerPeer(peerId, peerConfig); + Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { + @Override public boolean evaluate() throws Exception { + if (waitForSource) { + return (manager.getSource(peerId) != null); + } else { + return (rp.getConnectedPeer(peerId) != null); + } + } + }); + } + + /** + * Remove a peer and wait for it to get cleaned up + * @param peerId + * @throws Exception + */ + private void removePeerAndWait(final String peerId) throws Exception { + final ReplicationPeers rp = manager.getReplicationPeers(); + if (rp.getAllPeerIds().contains(peerId)) { + rp.unregisterPeer(peerId); + } + Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { + @Override public boolean evaluate() throws Exception { + List<String> peers = rp.getAllPeerIds(); + return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null) + && (!peers.contains(peerId)); + } + }); + } + private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) { // 1. Create store files for the families Map<byte[], List<Path>> storeFiles = new HashMap<>(1);