HBASE-20858 Port HBASE-20695 (Implement table level RegionServer replication metrics) to branch-1
Signed-off-by: Andrew Purtell <apurt...@apache.org> Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9e39a200 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9e39a200 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9e39a200 Branch: refs/heads/branch-1.3 Commit: 9e39a2009648ba21d55676ea90b328941bb9b7db Parents: 614b5f6 Author: Xu Cang <xc...@salesforce.com> Authored: Fri Jul 6 16:36:05 2018 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Wed Dec 12 18:08:20 2018 -0800 ---------------------------------------------------------------------- .../replication/regionserver/MetricsSource.java | 43 +++++++++++++--- .../regionserver/ReplicationSource.java | 5 ++ .../replication/TestReplicationEndpoint.java | 52 ++++++++++++++++++++ 3 files changed, 94 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/9e39a200/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 9b99f2a..56baa05 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.HashMap; import java.util.Map; -import org.apache.commons.logging.Log; -import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -35,8 +33,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) public class MetricsSource { - private static final Log LOG = LogFactory.getLog(MetricsSource.class); - // tracks last shipped timestamp for each wal group private Map<String, Long> lastTimeStamps = new HashMap<String, Long>(); private long lastHFileRefsQueueSize = 0; @@ -44,7 +40,7 @@ public class MetricsSource { private final MetricsReplicationSourceSource singleSourceSource; private final MetricsReplicationSourceSource globalSourceSource; - + private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable; /** * Constructor used to register the metrics @@ -56,7 +52,24 @@ public class MetricsSource { singleSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) .getSource(id); - globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); + globalSourceSource = CompatibilitySingletonFactory + .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); + singleSourceSourceByTable = new HashMap<>(); + } + + /** + * Constructor for injecting custom (or test) MetricsReplicationSourceSources + * @param id Name of the source this class is monitoring + * @param singleSourceSource Class to monitor id-scoped metrics + * @param globalSourceSource Class to monitor global-scoped metrics + */ + public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, + MetricsReplicationSourceSource globalSourceSource, + Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) { + this.id = id; + this.singleSourceSource = singleSourceSource; + this.globalSourceSource = 
globalSourceSource; + this.singleSourceSourceByTable = singleSourceSourceByTable; } /** @@ -72,6 +85,20 @@ public class MetricsSource { } /** + * Set the age of the last shipped edit, tracked separately for each table + * @param timestamp write time of the edit + * @param tableName name of the table the edit belongs to, used as the grouping key + */ + public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) { + long age = EnvironmentEdgeManager.currentTime() - timestamp; + if (!this.getSingleSourceSourceByTable().containsKey(tableName)) { + this.getSingleSourceSourceByTable().put(tableName, + CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) + .getSource(tableName)); + } + this.singleSourceSourceByTable.get(tableName).setLastShippedAge(age); + } + /** * Convenience method to use the last given timestamp to refresh the age of the last edit. Used * when replication fails and need to keep that metric accurate. * @param walGroupId id of the group to update @@ -262,4 +289,8 @@ public class MetricsSource { singleSourceSource.incrCompletedRecoveryQueue(); globalSourceSource.incrCompletedRecoveryQueue(); } + + public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() { + return singleSourceSourceByTable; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/9e39a200/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 78b465c..8112553 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -1066,6 +1066,11 @@ public class ReplicationSource extends 
Thread int size = entries.size(); for (int i = 0; i < size; i++) { cleanUpHFileRefs(entries.get(i).getEdit()); + + TableName tableName = entries.get(i).getKey().getTablename(); + source.getSourceMetrics().setAgeOfLastShippedOpByTable( + entries.get(i).getKey().getWriteTime(), + tableName.getNameAsString()); } //Log and clean up WAL logs manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode, http://git-wip-us.apache.org/repos/asf/hbase/blob/9e39a200/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index d570549..c3822c1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -18,9 +18,15 @@ package org.apache.hadoop.hbase.replication; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; + import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -39,10 +45,16 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl; +import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -253,6 +265,46 @@ public class TestReplicationEndpoint extends TestReplicationBase { admin.removePeer("testWALEntryFilterFromReplicationEndpoint"); } + @Test + public void testMetricsSourceBaseSourcePassthrough(){ + /* + The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl + and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. + Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which + allows for custom JMX metrics. + This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through + the two layers of wrapping to the actual BaseSource. 
+ */ + String id = "id"; + DynamicMetricsRegistry mockRegistry = new DynamicMetricsRegistry(id); + MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class); + when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry); + MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class); + when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry); + + MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id); + MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms); + MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource); + Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>(); + MetricsSource source = new MetricsSource(id, singleSourceSource, spyglobalSourceSource, + singleSourceSourceByTable); + + // check singleSourceSourceByTable metrics. + // singleSourceSourceByTable map entry will be created only + // after calling #setAgeOfLastShippedOpByTable + boolean containsRandomNewTable = source.getSingleSourceSourceByTable() + .containsKey("RandomNewTable"); + Assert.assertEquals(false, containsRandomNewTable); + source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable"); + containsRandomNewTable = source.getSingleSourceSourceByTable() + .containsKey("RandomNewTable"); + Assert.assertEquals(true, containsRandomNewTable); + MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable() + .get("RandomNewTable"); + // cannot assert a more concrete value here because the age is arbitrary. + // as long as it's greater than 0, we treat it as the correct answer. + Assert.assertTrue(msr.getLastShippedAge() > 0); + } private void doPut(byte[] row) throws IOException { try (Connection connection = ConnectionFactory.createConnection(conf1)) {