Repository: hbase
Updated Branches:
  refs/heads/master 30817b922 -> 7b40f4f3e


HBASE-18092: Removing a peer does not properly clean up the ReplicationSourceManager state and metrics

Signed-off-by: tedyu <yuzhih...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7b40f4f3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7b40f4f3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7b40f4f3

Branch: refs/heads/master
Commit: 7b40f4f3ec1cdf278bf416db66284d62c4e078e0
Parents: 30817b9
Author: Ashu Pachauri <ashu210...@gmail.com>
Authored: Thu May 25 18:24:38 2017 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Fri Jun 9 08:23:04 2017 -0700

----------------------------------------------------------------------
 .../replication/regionserver/MetricsSource.java |   6 +-
 .../replication/regionserver/Replication.java   |   4 +-
 .../regionserver/ReplicationSource.java         |   4 -
 .../regionserver/ReplicationSourceManager.java  |  23 ++--
 .../replication/ReplicationSourceDummy.java     |   5 +-
 .../TestReplicationSourceManager.java           | 116 +++++++++++++++----
 6 files changed, 115 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7b40f4f3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 7a9ef9f..e39defd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -40,7 +40,6 @@ public class MetricsSource implements BaseSource {
 
   // tracks last shipped timestamp for each wal group
   private Map<String, Long> lastTimeStamps = new HashMap<>();
-  private int lastQueueSize = 0;
   private long lastHFileRefsQueueSize = 0;
   private String id;
 
@@ -181,11 +180,12 @@ public class MetricsSource implements BaseSource {
 
   /** Removes all metrics about this Source. */
   public void clear() {
-    singleSourceSource.clear();
+    int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
     globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
+    singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
+    singleSourceSource.clear();
     globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
     lastTimeStamps.clear();
-    lastQueueSize = 0;
     lastHFileRefsQueueSize = 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b40f4f3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 9cc9c7c..6ff666e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -368,9 +368,7 @@ public class Replication extends WALActionsListener.Base implements
     // get source
     List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
     for (ReplicationSourceInterface source : sources) {
-      if (source instanceof ReplicationSource) {
-        sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
-      }
+      sourceMetricsList.add(source.getSourceMetrics());
     }
 
     // get old source

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b40f4f3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index b86f35f..d098fd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -540,10 +540,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     return sb.toString();
   }
 
-  /**
-   * Get Replication Source Metrics
-   * @return sourceMetrics
-   */
   @Override
   public MetricsSource getSourceMetrics() {
     return this.metrics;

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b40f4f3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index cb631c1..5b5e207 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -358,6 +358,20 @@ public class ReplicationSourceManager implements ReplicationListener {
     return this.oldsources;
   }
 
+  /**
+   * Get the normal source for a given peer
+   * @param peerId
+   * @return the normal source for the given peer if it exists, otherwise null.
+   */
+  public ReplicationSourceInterface getSource(String peerId) {
+    for (ReplicationSourceInterface source: getSources()) {
+      if (source.getPeerId().equals(peerId)) {
+        return source;
+      }
+    }
+    return null;
+  }
+
   @VisibleForTesting
   List<String> getAllQueues() {
     return replicationQueues.getAllQueues();
@@ -542,9 +556,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   public void closeQueue(ReplicationSourceInterface src) {
     LOG.info("Done with the queue " + src.getPeerClusterZnode());
-    if (src instanceof ReplicationSource) {
-      ((ReplicationSource) src).getSourceMetrics().clear();
-    }
+    src.getSourceMetrics().clear();
     this.sources.remove(src);
     deleteSource(src.getPeerClusterZnode(), true);
     this.walsById.remove(src.getPeerClusterZnode());
@@ -593,10 +605,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       }
       for (ReplicationSourceInterface toRemove : srcToRemove) {
         toRemove.terminate(terminateMessage);
-        if (toRemove instanceof ReplicationSource) {
-          ((ReplicationSource) toRemove).getSourceMetrics().clear();
-        }
-        this.sources.remove(toRemove);
+        closeQueue(toRemove);
       }
       deleteSource(id, true);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b40f4f3/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 3a7f77b..e23e15b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -41,6 +41,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   ReplicationSourceManager manager;
   String peerClusterId;
   Path currentPath;
+  MetricsSource metrics;
 
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
@@ -50,11 +51,13 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
     this.manager = manager;
     this.peerClusterId = peerClusterId;
+    this.metrics = metrics;
   }
 
   @Override
   public void enqueueLog(Path log) {
     this.currentPath = log;
+    metrics.incrSizeOfLogQueue();
   }
 
   @Override
@@ -112,7 +115,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
   @Override
   public MetricsSource getSourceMetrics() {
-    return null;
+    return metrics;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b40f4f3/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 26aee6d..e1b3756 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -438,6 +438,9 @@ public abstract class TestReplicationSourceManager {
   @Test
   public void testPeerRemovalCleanup() throws Exception{
     String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
+    final String peerId = "FakePeer";
+    final ReplicationPeerConfig peerConfig =
+        new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase");
     try {
       DummyServer server = new DummyServer();
       final ReplicationQueues rq =
@@ -450,40 +453,103 @@ public abstract class TestReplicationSourceManager {
           FailInitializeDummyReplicationSource.class.getName());
       final ReplicationPeers rp = manager.getReplicationPeers();
       // Set up the znode and ReplicationPeer for the fake peer
-      rp.registerPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"));
-      // Wait for the peer to get created and connected
-      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return (rp.getConnectedPeer("FakePeer") != null);
-        }
-      });
+      // Don't wait for replication source to initialize, we know it won't.
+      addPeerAndWait(peerId, peerConfig, false);
 
-      // Make sure that the replication source was not initialized
-      List<ReplicationSourceInterface> sources = manager.getSources();
-      for (ReplicationSourceInterface source : sources) {
-        assertNotEquals("FakePeer", source.getPeerId());
-      }
+      // Sanity check
+      assertNull(manager.getSource(peerId));
 
       // Create a replication queue for the fake peer
-      rq.addLog("FakePeer", "FakeFile");
+      rq.addLog(peerId, "FakeFile");
       // Unregister peer, this should remove the peer and clear all queues associated with it
       // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
-      rp.unregisterPeer("FakePeer");
-      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          List<String> peers = rp.getAllPeerIds();
-          return (!rq.getAllQueues().contains("FakePeer"))
-              && (rp.getConnectedPeer("FakePeer") == null)
-              && (!peers.contains("FakePeer"));
-          }
-      });
+      removePeerAndWait(peerId);
+      assertFalse(rq.getAllQueues().contains(peerId));
     } finally {
       conf.set("replication.replicationsource.implementation", replicationSourceImplName);
+      removePeerAndWait(peerId);
+    }
+  }
+
+  @Test
+  public void testRemovePeerMetricsCleanup() throws Exception {
+    final String peerId = "DummyPeer";
+    final ReplicationPeerConfig peerConfig =
+        new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase");
+    try {
+      addPeerAndWait(peerId, peerConfig, true);
+
+      ReplicationSourceInterface source = manager.getSource(peerId);
+      // Sanity check
+      assertNotNull(source);
+      // Retrieve the global replication metrics source
+      Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
+      f.setAccessible(true);
+      MetricsReplicationSourceSource globalSource =
+          (MetricsReplicationSourceSource)f.get(source.getSourceMetrics());
+      int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
+
+      // Enqueue log and check if metrics updated
+      source.enqueueLog(new Path("abc"));
+      assertEquals(1, source.getSourceMetrics().getSizeOfLogQueue());
+      assertEquals(1 + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
+
+      // Removing the peer should reset the global metrics
+      removePeerAndWait(peerId);
+      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
+
+      // Adding the same peer back again should reset the single source metrics
+      addPeerAndWait(peerId, peerConfig, true);
+      source = manager.getSource(peerId);
+      assertNotNull(source);
+      assertEquals(0, source.getSourceMetrics().getSizeOfLogQueue());
+      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
+    } finally {
+      removePeerAndWait(peerId);
     }
   }
 
+  /**
+   * Add a peer and wait for it to initialize
+   * @param peerId
+   * @param peerConfig
+   * @param waitForSource Whether to wait for replication source to initialize
+   * @throws Exception
+   */
+  private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
+      final boolean waitForSource) throws Exception {
+    final ReplicationPeers rp = manager.getReplicationPeers();
+    rp.registerPeer(peerId, peerConfig);
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() throws Exception {
+        if (waitForSource) {
+          return (manager.getSource(peerId) != null);
+        } else {
+          return (rp.getConnectedPeer(peerId) != null);
+        }
+      }
+    });
+  }
+
+  /**
+   * Remove a peer and wait for it to get cleaned up
+   * @param peerId
+   * @throws Exception
+   */
+  private void removePeerAndWait(final String peerId) throws Exception {
+    final ReplicationPeers rp = manager.getReplicationPeers();
+    if (rp.getAllPeerIds().contains(peerId)) {
+      rp.unregisterPeer(peerId);
+    }
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() throws Exception {
+        List<String> peers = rp.getAllPeerIds();
+        return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null)
+            && (!peers.contains(peerId));
+      }
+    });
+  }
+
   private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
     // 1. Create store files for the families
     Map<byte[], List<Path>> storeFiles = new HashMap<>(1);

Reply via email to