HBASE-16855 Avoid NPE in MetricsConnectionâs construction (ChiaPing Tsai)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6df7554d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6df7554d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6df7554d Branch: refs/heads/branch-1 Commit: 6df7554d299ce72e6d8dca35c5de119a47f5594d Parents: 08498c6 Author: tedyu <yuzhih...@gmail.com> Authored: Mon Oct 17 09:35:02 2016 -0700 Committer: tedyu <yuzhih...@gmail.com> Committed: Mon Oct 17 09:35:02 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/client/MetricsConnection.java | 38 +++++++++++++++++--- .../hbase/client/TestMetricsConnection.java | 36 ++++++++++++++++--- 2 files changed, 65 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6df7554d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index b3c8180..9dd803a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -22,6 +22,7 @@ import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.core.MetricName; import com.yammer.metrics.core.MetricsRegistry; import com.yammer.metrics.core.Timer; import com.yammer.metrics.reporting.JmxReporter; @@ -294,24 +295,38 @@ public class MetricsConnection implements StatisticTrackable { public MetricsConnection(final ConnectionManager.HConnectionImplementation conn) { this.scope = conn.toString(); this.registry = new MetricsRegistry(); - final ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool(); - final ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool(); - this.registry.newGauge(this.getClass(), "executorPoolActiveThreads", scope, + this.registry.newGauge(getExecutorPoolName(), new RatioGauge() { @Override protected double getNumerator() { + ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool(); + if (batchPool == null) { + return 0; + } return batchPool.getActiveCount(); } @Override protected double getDenominator() { + ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool(); + if (batchPool == null) { + return 0; + } return batchPool.getMaximumPoolSize(); } }); - this.registry.newGauge(this.getClass(), "metaPoolActiveThreads", scope, + this.registry.newGauge(getMetaPoolName(), new RatioGauge() { @Override protected double getNumerator() { + ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool(); + if (metaPool == null) { + return 0; + } return metaPool.getActiveCount(); } @Override protected double getDenominator() { + ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool(); + if (metaPool == null) { + return 0; + } return metaPool.getMaximumPoolSize(); } }); @@ -334,6 +349,21 @@ public class MetricsConnection implements StatisticTrackable { this.reporter.start(); } + @VisibleForTesting + final MetricName getExecutorPoolName() { + return new MetricName(getClass(), "executorPoolActiveThreads", scope); + } + + @VisibleForTesting + final MetricName getMetaPoolName() { + return new MetricName(getClass(), "metaPoolActiveThreads", scope); + } + + @VisibleForTesting + MetricsRegistry getMetricsRegistry() { + return registry; + } + public void shutdown() { this.reporter.shutdown(); this.registry.shutdown(); http://git-wip-us.apache.org/repos/asf/hbase/blob/6df7554d/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java index 10a913e..b9a9a73 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import com.google.protobuf.ByteString; +import com.yammer.metrics.util.RatioGauge; import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; @@ -37,24 +38,43 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; - import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import static org.junit.Assert.assertEquals; @Category({MetricsTests.class, SmallTests.class}) public class TestMetricsConnection { - + private static final ExecutorService BATCH_POOL = Executors.newFixedThreadPool(2); private static MetricsConnection METRICS; - + private static final AtomicBoolean closed = new AtomicBoolean(false); + private static final Runnable RUNNER = new Runnable() { + @Override + public void run() { + try { + while (!closed.get() && !Thread.interrupted()) { + TimeUnit.MILLISECONDS.sleep(10); + } + } catch (InterruptedException e) { + } + } + }; @BeforeClass public static void beforeClass() { HConnectionImplementation mocked = Mockito.mock(HConnectionImplementation.class); Mockito.when(mocked.toString()).thenReturn("mocked-connection"); - METRICS = new MetricsConnection(Mockito.mock(HConnectionImplementation.class)); + Mockito.when(mocked.getCurrentBatchPool()).thenReturn(BATCH_POOL); + BATCH_POOL.submit(RUNNER); + METRICS = new MetricsConnection(mocked); } @AfterClass - public static void afterClass() { + public static void afterClass() throws InterruptedException { METRICS.shutdown(); + BATCH_POOL.shutdownNow(); + BATCH_POOL.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); } @Test @@ -116,5 +136,11 @@ public class TestMetricsConnection { Assert.assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.count()); Assert.assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.count()); } + RatioGauge executorMetrics = (RatioGauge) METRICS.getMetricsRegistry() + .allMetrics().get(METRICS.getExecutorPoolName()); + RatioGauge metaMetrics = (RatioGauge) METRICS.getMetricsRegistry() + .allMetrics().get(METRICS.getMetaPoolName()); + assertEquals((double) 0.5, executorMetrics.value(), 0); + assertEquals(Double.NaN, metaMetrics.value(), 0); } }