HBASE-20858 Port HBASE-20695 (Implement table level RegionServer replication 
metrics) to branch-1

Signed-off-by: Andrew Purtell <apurt...@apache.org>

Conflicts:
        
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
        
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9e39a200
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9e39a200
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9e39a200

Branch: refs/heads/branch-1.3
Commit: 9e39a2009648ba21d55676ea90b328941bb9b7db
Parents: 614b5f6
Author: Xu Cang <xc...@salesforce.com>
Authored: Fri Jul 6 16:36:05 2018 -0700
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Wed Dec 12 18:08:20 2018 -0800

----------------------------------------------------------------------
 .../replication/regionserver/MetricsSource.java | 43 +++++++++++++---
 .../regionserver/ReplicationSource.java         |  5 ++
 .../replication/TestReplicationEndpoint.java    | 52 ++++++++++++++++++++
 3 files changed, 94 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9e39a200/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 9b99f2a..56baa05 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -35,8 +33,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 public class MetricsSource {
 
-  private static final Log LOG = LogFactory.getLog(MetricsSource.class);
-
   // tracks last shipped timestamp for each wal group
   private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
   private long lastHFileRefsQueueSize = 0;
@@ -44,7 +40,7 @@ public class MetricsSource {
 
   private final MetricsReplicationSourceSource singleSourceSource;
   private final MetricsReplicationSourceSource globalSourceSource;
-
+  private Map<String, MetricsReplicationSourceSource> 
singleSourceSourceByTable;
 
   /**
    * Constructor used to register the metrics
@@ -56,7 +52,24 @@ public class MetricsSource {
     singleSourceSource =
         
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
             .getSource(id);
-    globalSourceSource = 
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
+    globalSourceSource = CompatibilitySingletonFactory
+        .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
+    singleSourceSourceByTable = new HashMap<>();
+  }
+
+  /**
+   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
+   * @param id Name of the source this class is monitoring
+   * @param singleSourceSource Class to monitor id-scoped metrics
+   * @param globalSourceSource Class to monitor global-scoped metrics
+   */
+  public MetricsSource(String id, MetricsReplicationSourceSource 
singleSourceSource,
+                       MetricsReplicationSourceSource globalSourceSource,
+                       Map<String, MetricsReplicationSourceSource> 
singleSourceSourceByTable) {
+    this.id = id;
+    this.singleSourceSource = singleSourceSource;
+    this.globalSourceSource = globalSourceSource;
+    this.singleSourceSourceByTable = singleSourceSourceByTable;
   }
 
   /**
@@ -72,6 +85,20 @@ public class MetricsSource {
   }
 
   /**
+   * Set the age of the last edit that was shipped group by table
+   * @param timestamp write time of the edit
+   * @param tableName String as group and tableName
+   */
+  public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
+    long age = EnvironmentEdgeManager.currentTime() - timestamp;
+    if (!this.getSingleSourceSourceByTable().containsKey(tableName)) {
+      this.getSingleSourceSourceByTable().put(tableName,
+          
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+          .getSource(tableName));
+    }
+    this.singleSourceSourceByTable.get(tableName).setLastShippedAge(age);
+  }
+  /**
    * Convenience method to use the last given timestamp to refresh the age of 
the last edit. Used
    * when replication fails and need to keep that metric accurate.
    * @param walGroupId id of the group to update
@@ -262,4 +289,8 @@ public class MetricsSource {
     singleSourceSource.incrCompletedRecoveryQueue();
     globalSourceSource.incrCompletedRecoveryQueue();
   }
+
+  public Map<String, MetricsReplicationSourceSource> 
getSingleSourceSourceByTable() {
+    return singleSourceSourceByTable;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e39a200/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 78b465c..8112553 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -1066,6 +1066,11 @@ public class ReplicationSource extends Thread
             int size = entries.size();
             for (int i = 0; i < size; i++) {
               cleanUpHFileRefs(entries.get(i).getEdit());
+
+              TableName tableName = entries.get(i).getKey().getTablename();
+              source.getSourceMetrics().setAgeOfLastShippedOpByTable(
+                entries.get(i).getKey().getWriteTime(),
+                tableName.getNameAsString());
             }
             //Log and clean up WAL logs
             manager.logPositionAndCleanOldLogs(this.currentPath, 
peerClusterZnode,

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e39a200/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index d570549..c3822c1 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -18,9 +18,15 @@
 
 package org.apache.hadoop.hbase.replication;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
+
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -39,10 +45,16 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import 
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
+import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -253,6 +265,46 @@ public class TestReplicationEndpoint extends 
TestReplicationBase {
     admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
   }
 
+  @Test
+  public void testMetricsSourceBaseSourcePassthrough(){
+    /*
+    The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
+    and a MetricsReplicationGlobalSourceSource, so that metrics get written to 
both namespaces.
+    Both of those classes wrap a MetricsReplicationSourceImpl that implements 
BaseSource, which
+    allows for custom JMX metrics.
+    This test checks to make sure the BaseSource decorator logic on 
MetricsSource actually calls down through
+    the two layers of wrapping to the actual BaseSource.
+    */
+    String id = "id";
+    DynamicMetricsRegistry mockRegistry = new DynamicMetricsRegistry(id);
+    MetricsReplicationSourceImpl singleRms = 
mock(MetricsReplicationSourceImpl.class);
+    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
+    MetricsReplicationSourceImpl globalRms = 
mock(MetricsReplicationSourceImpl.class);
+    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
+
+    MetricsReplicationSourceSource singleSourceSource = new 
MetricsReplicationSourceSourceImpl(singleRms, id);
+    MetricsReplicationSourceSource globalSourceSource = new 
MetricsReplicationGlobalSourceSource(globalRms);
+    MetricsReplicationSourceSource spyglobalSourceSource = 
spy(globalSourceSource);
+    Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = 
new HashMap<>();
+    MetricsSource source = new MetricsSource(id, singleSourceSource, 
spyglobalSourceSource,
+        singleSourceSourceByTable);
+
+    // check singleSourceSourceByTable metrics.
+    // singleSourceSourceByTable map entry will be created only
+    // after calling #setAgeOfLastShippedOpByTable
+    boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
+        .containsKey("RandomNewTable");
+    Assert.assertEquals(false, containsRandomNewTable);
+    source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
+    containsRandomNewTable = source.getSingleSourceSourceByTable()
+      .containsKey("RandomNewTable");
+    Assert.assertEquals(true, containsRandomNewTable);
+    MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
+        .get("RandomNewTable");
+    // cannot put more concreate value here to verify because the age is 
arbitrary.
+    // as long as it's greater than 0, we see it as correct answer.
+    Assert.assertTrue(msr.getLastShippedAge() > 0);
+  }
 
   private void doPut(byte[] row) throws IOException {
     try (Connection connection = ConnectionFactory.createConnection(conf1)) {

Reply via email to