HIVE-10927 : Add number of HMS/HS2 connection metrics (Szehon, reviewed by Jimmy Xiang)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f6ea8cb6 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f6ea8cb6 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f6ea8cb6 Branch: refs/heads/beeline-cli Commit: f6ea8cb6fd74c8ccef2b712c0e6da76c30266f53 Parents: 7df153d Author: Szehon Ho <sze...@cloudera.com> Authored: Wed Jul 8 11:38:41 2015 -0700 Committer: Szehon Ho <sze...@cloudera.com> Committed: Wed Jul 8 11:38:41 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hive/common/JvmPauseMonitor.java | 7 ++- .../hive/common/metrics/LegacyMetrics.java | 30 ++++++++- .../hive/common/metrics/common/Metrics.java | 27 ++++++++ .../common/metrics/common/MetricsConstant.java | 35 +++++++++++ .../common/metrics/common/MetricsVariable.java | 26 ++++++++ .../metrics/metrics2/CodahaleMetrics.java | 58 ++++++++++++++++- .../org/apache/hadoop/hive/conf/HiveConf.java | 4 +- .../metrics/metrics2/TestCodahaleMetrics.java | 42 +++++++++++++ .../hive/metastore/TestMetaStoreMetrics.java | 66 +++++++++++++++++--- .../hadoop/hive/metastore/HiveMetaStore.java | 59 ++++++++++++++--- .../hadoop/hive/metastore/ObjectStore.java | 30 ++++++++- .../service/cli/thrift/ThriftCLIService.java | 21 ++++++- 12 files changed, 378 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f6ea8cb6/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java index ec5ac4a..6ffaf94 100644 --- a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java +++ b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java @@ -27,6 +27,7 @@ import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.util.Daemon; @@ -186,14 +187,14 @@ public class JvmPauseMonitor { ++numGcWarnThresholdExceeded; LOG.warn(formatMessage( extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); - incrementMetricsCounter("jvm.pause.warn-threshold", 1); + incrementMetricsCounter(MetricsConstant.JVM_PAUSE_WARN, 1); } else if (extraSleepTime > infoThresholdMs) { ++numGcInfoThresholdExceeded; LOG.info(formatMessage( extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); - incrementMetricsCounter("jvm.pause.info-threshold", 1); + incrementMetricsCounter(MetricsConstant.JVM_PAUSE_INFO, 1); } - incrementMetricsCounter("jvm.pause.extraSleepTime", extraSleepTime); + incrementMetricsCounter(MetricsConstant.JVM_EXTRA_SLEEP, extraSleepTime); totalGcExtraSleepTime += extraSleepTime; gcTimesBeforeSleep = gcTimesAfterSleep; } http://git-wip-us.apache.org/repos/asf/hive/blob/f6ea8cb6/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java index e811339..52d99e4 100644 --- a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.common.metrics; import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; import org.apache.hadoop.hive.conf.HiveConf; import java.io.IOException; @@ -162,11 +163,11 @@ 
public class LegacyMetrics implements Metrics { mbs.registerMBean(metrics, oname); } - public Long incrementCounter(String name) throws IOException{ + public Long incrementCounter(String name) throws IOException { return incrementCounter(name,Long.valueOf(1)); } - public Long incrementCounter(String name, long increment) throws IOException{ + public Long incrementCounter(String name, long increment) throws IOException { Long value; synchronized(metrics) { if (!metrics.hasKey(name)) { @@ -180,6 +181,29 @@ public class LegacyMetrics implements Metrics { return value; } + public Long decrementCounter(String name) throws IOException{ + return decrementCounter(name, Long.valueOf(1)); + } + + public Long decrementCounter(String name, long decrement) throws IOException { + Long value; + synchronized(metrics) { + if (!metrics.hasKey(name)) { + value = Long.valueOf(decrement); + set(name, -value); + } else { + value = ((Long)get(name)) - decrement; + set(name, value); + } + } + return value; + } + + @Override + public void addGauge(String name, MetricsVariable variable) { + //Not implemented. + } + public void set(String name, Object value) throws IOException{ metrics.put(name,value); } @@ -210,6 +234,8 @@ public class LegacyMetrics implements Metrics { } } + + /** * Resets the static context state to initial. * Used primarily for testing purposes. 
http://git-wip-us.apache.org/repos/asf/hive/blob/f6ea8cb6/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java index 27b69cc..49b2b32 100644 --- a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java @@ -61,4 +61,31 @@ public interface Metrics { * @throws IOException */ public Long incrementCounter(String name, long increment) throws IOException; + + + /** + * Decrements a counter of the given name by 1. + * @param name + * @return + * @throws IOException + */ + public Long decrementCounter(String name) throws IOException; + + /** + * Decrements a counter of the given name by "decrement" + * @param name + * @param decrement + * @return + * @throws IOException + */ + public Long decrementCounter(String name, long decrement) throws IOException; + + + /** + * Adds a metrics-gauge to track variable. For example, number of open database connections. + * @param name name of gauge + * @param variable variable to track. 
+ * @throws IOException + */ + public void addGauge(String name, final MetricsVariable variable); } http://git-wip-us.apache.org/repos/asf/hive/blob/f6ea8cb6/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java new file mode 100644 index 0000000..d1ebe12 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common.metrics.common; + +/** + * This class defines some metrics generated by Hive processes. 
+ */ +public class MetricsConstant { + + public static String JVM_PAUSE_INFO = "jvm.pause.info-threshold"; + public static String JVM_PAUSE_WARN = "jvm.pause.warn-threshold"; + public static String JVM_EXTRA_SLEEP = "jvm.pause.extraSleepTime"; + + public static String OPEN_CONNECTIONS = "open_connections"; + + public static String JDO_ACTIVE_TRANSACTIONS = "active_jdo_transactions"; + public static String JDO_ROLLBACK_TRANSACTIONS = "rollbacked_jdo_transactions"; + public static String JDO_COMMIT_TRANSACTIONS = "committed_jdo_transactions"; + public static String JDO_OPEN_TRANSACTIONS = "opened_jdo_transactions"; +} http://git-wip-us.apache.org/repos/asf/hive/blob/f6ea8cb6/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsVariable.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsVariable.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsVariable.java new file mode 100644 index 0000000..8cf6608 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsVariable.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.common.metrics.common; + +/** + * Interface for metrics variables. <p/> For example, the database service could expose the number of + * currently active connections. + */ +public interface MetricsVariable<T> { + public T getValue(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/f6ea8cb6/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java index ae353d0..7756f43 100644 --- a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.common.metrics.metrics2; import com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.Counter; import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.codahale.metrics.Gauge; import com.codahale.metrics.JmxReporter; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; @@ -44,6 +45,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; import org.apache.hadoop.hive.conf.HiveConf; import java.io.BufferedReader; @@ -52,12 +54,14 @@ import java.io.Closeable; import java.io.IOException; import java.io.OutputStreamWriter; import java.lang.management.ManagementFactory; +import java.net.URI; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -73,9 +77,11 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co public final MetricRegistry metricRegistry = new MetricRegistry(); private final Lock timersLock = new ReentrantLock(); private final Lock countersLock = new ReentrantLock(); + private final Lock gaugesLock = new ReentrantLock(); private LoadingCache<String, Timer> timers; private LoadingCache<String, Counter> counters; + private ConcurrentHashMap<String, Gauge> gauges; private HiveConf conf; private final Set<Closeable> reporters = new HashSet<Closeable>(); @@ -161,6 +167,7 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co } } ); + gauges = new ConcurrentHashMap<String, Gauge>(); //register JVM metrics registerAll("gc", new GarbageCollectorMetricSet()); @@ -218,7 +225,7 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co } public Long incrementCounter(String name) throws IOException { - return incrementCounter(name, 1); + return incrementCounter(name, 1L); } public Long incrementCounter(String name, long increment) throws IOException { @@ -234,6 +241,45 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co } } + public Long decrementCounter(String name) throws IOException { + return decrementCounter(name, 1L); + } + + public Long decrementCounter(String name, long decrement) throws IOException { + String key = name; + try { + countersLock.lock(); + counters.get(key).dec(decrement); + return counters.get(key).getCount(); + } catch(ExecutionException ee) { + throw new RuntimeException(ee); + } finally { + countersLock.unlock(); + } + } + + public void addGauge(String name, final MetricsVariable variable) { + Gauge gauge = new Gauge() { + @Override + public Object getValue() { + return variable.getValue(); + } + }; + try { + gaugesLock.lock(); + 
gauges.put(name, gauge); + // Metrics throws an Exception if we don't do this when the key already exists + if (metricRegistry.getGauges().containsKey(name)) { + LOGGER.warn("A Gauge with name [" + name + "] already exists. " + + " The old gauge will be overwritten, but this is not recommended"); + metricRegistry.remove(name); + } + metricRegistry.register(name, gauge); + } finally { + gaugesLock.unlock(); + } + } + // This method is necessary to synchronize lazy-creation to the timers. private Timer getTimer(String name) throws IOException { String key = name; @@ -312,11 +358,19 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co try { String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry); Path tmpPath = new Path(pathString + ".tmp"); - FileSystem fs = FileSystem.get(conf); + URI tmpPathURI = tmpPath.toUri(); + FileSystem fs = null; + if (tmpPathURI.getScheme() == null && tmpPathURI.getAuthority() == null) { + //default local + fs = FileSystem.getLocal(conf); + } else { + fs = FileSystem.get(tmpPathURI, conf); + } fs.delete(tmpPath, true); bw = new BufferedWriter(new OutputStreamWriter(fs.create(tmpPath, true))); bw.write(json); bw.close(); + fs.setPermission(tmpPath, FsPermission.createImmutable((short) 0644)); Path path = new Path(pathString); fs.rename(tmpPath, path); http://git-wip-us.apache.org/repos/asf/hive/blob/f6ea8cb6/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 6d0cf15..4549105 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1725,8 +1725,8 @@ public class HiveConf extends Configuration { "Hive metrics subsystem implementation class."), 
HIVE_METRICS_REPORTER("hive.service.metrics.reporter", "JSON_FILE, JMX", "Reporter type for metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics, comma separated list of JMX, CONSOLE, JSON_FILE"), - HIVE_METRICS_JSON_FILE_LOCATION("hive.service.metrics.file.location", "file:///tmp/report.json", - "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, the location of JSON metrics file. " + + HIVE_METRICS_JSON_FILE_LOCATION("hive.service.metrics.file.location", "/tmp/report.json", + "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, the location of local JSON metrics file. " + "This file will get overwritten at every interval."), HIVE_METRICS_JSON_FILE_INTERVAL("hive.service.metrics.file.frequency", "5s", new TimeValidator(TimeUnit.MILLISECONDS), http://git-wip-us.apache.org/repos/asf/hive/blob/f6ea8cb6/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java ---------------------------------------------------------------------- diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java index 954b388..a3aa549 100644 --- a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java +++ b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java @@ -22,7 +22,9 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; +import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.shims.ShimLoader; 
import org.junit.After; @@ -135,4 +137,44 @@ public class TestCodahaleMetrics { JsonNode countNode = methodCounterNode.path("count"); Assert.assertEquals(countNode.asInt(), 5); } + + class TestMetricsVariable implements MetricsVariable { + private int gaugeVal; + + @Override + public Object getValue() { + return gaugeVal; + } + public void setValue(int gaugeVal) { + this.gaugeVal = gaugeVal; + } + }; + + @Test + public void testGauge() throws Exception { + TestMetricsVariable testVar = new TestMetricsVariable(); + testVar.setValue(20); + + MetricsFactory.getInstance().addGauge("gauge1", testVar); + Thread.sleep(2000); + byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath())); + ObjectMapper objectMapper = new ObjectMapper(); + + JsonNode rootNode = objectMapper.readTree(jsonData); + JsonNode gaugesNode = rootNode.path("gauges"); + JsonNode methodGaugeNode = gaugesNode.path("gauge1"); + JsonNode countNode = methodGaugeNode.path("value"); + Assert.assertEquals(countNode.asInt(), testVar.getValue()); + + testVar.setValue(40); + Thread.sleep(2000); + + jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath())); + + rootNode = objectMapper.readTree(jsonData); + gaugesNode = rootNode.path("gauges"); + methodGaugeNode = gaugesNode.path("gauge1"); + countNode = methodGaugeNode.path("value"); + Assert.assertEquals(countNode.asInt(), testVar.getValue()); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/f6ea8cb6/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java index 25f34d1..c9da95a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java +++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java @@ -24,8 +24,10 @@ import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hive.service.server.HiveServer2; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -37,9 +39,11 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.Map; /** * Tests Hive Metastore Metrics. + * */ public class TestMetaStoreMetrics { @@ -49,9 +53,8 @@ public class TestMetaStoreMetrics { private static HiveConf hiveConf; private static Driver driver; - - @Before - public void before() throws Exception { + @BeforeClass + public static void before() throws Exception { int port = MetaStoreUtils.findFreePort(); @@ -86,9 +89,58 @@ public class TestMetaStoreMetrics { ObjectMapper objectMapper = new ObjectMapper(); JsonNode rootNode = objectMapper.readTree(jsonData); - JsonNode countersNode = rootNode.path("timers"); - JsonNode methodCounterNode = countersNode.path("api_get_all_databases"); - JsonNode countNode = methodCounterNode.path("count"); - Assert.assertTrue(countNode.asInt() > 0); + JsonNode timersNode = rootNode.path("timers"); + JsonNode methodCounterNode = timersNode.path("api_get_all_databases"); + JsonNode methodCountNode = methodCounterNode.path("count"); + Assert.assertTrue(methodCountNode.asInt() > 0); + + JsonNode countersNode = rootNode.path("counters"); + JsonNode committedJdoTxNode = countersNode.path("committed_jdo_transactions"); + JsonNode committedCountNode = committedJdoTxNode.path("count"); + Assert.assertTrue(committedCountNode.asInt() > 0); + } + + + @Test + 
public void testConnections() throws Exception { + byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath())); + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode rootNode = objectMapper.readTree(jsonData); + JsonNode countersNode = rootNode.path("counters"); + JsonNode openCnxNode = countersNode.path("open_connections"); + JsonNode openCnxCountNode = openCnxNode.path("count"); + Assert.assertTrue(openCnxCountNode.asInt() == 1); + + //create two additional connections + HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf); + HiveMetaStoreClient msc2 = new HiveMetaStoreClient(hiveConf); + Thread.sleep(2000); + + jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath())); + rootNode = objectMapper.readTree(jsonData); + countersNode = rootNode.path("counters"); + openCnxNode = countersNode.path("open_connections"); + openCnxCountNode = openCnxNode.path("count"); + Assert.assertTrue(openCnxCountNode.asInt() == 3); + + msc.close(); + Thread.sleep(2000); + + jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath())); + rootNode = objectMapper.readTree(jsonData); + countersNode = rootNode.path("counters"); + openCnxNode = countersNode.path("open_connections"); + openCnxCountNode = openCnxNode.path("count"); + Assert.assertTrue(openCnxCountNode.asInt() == 2); + + msc2.close(); + Thread.sleep(2000); + + jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath())); + rootNode = objectMapper.readTree(jsonData); + countersNode = rootNode.path("counters"); + openCnxNode = countersNode.path("open_connections"); + openCnxCountNode = openCnxNode.path("count"); + Assert.assertTrue(openCnxCountNode.asInt() == 1); } } http://git-wip-us.apache.org/repos/asf/hive/blob/f6ea8cb6/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 0bcd053..4c9cd79 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -39,6 +39,8 @@ import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.common.cli.CommonCliOptions; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -188,8 +190,11 @@ import org.apache.thrift.TException; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.server.ServerContext; import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TServerEventHandler; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TServerSocket; @@ -821,14 +826,6 @@ public class HiveMetaStore extends ThriftHiveMetastore { threadLocalMS.remove(); } } - if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) { - try { - MetricsFactory.close(); - } catch (Exception e) { - LOG.error("error in Metrics deinit: " + e.getClass().getName() + " " - + e.getMessage(), e); - } - } logInfo("Metastore shutdown complete."); } @@ -5878,7 +5875,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { */ public static void main(String[] args) throws Throwable { HiveConf.setLoadMetastoreConfig(true); - 
HiveConf conf = new HiveConf(HMSHandler.class); + final HiveConf conf = new HiveConf(HMSHandler.class); HiveMetastoreCli cli = new HiveMetastoreCli(conf); cli.parse(args); @@ -5921,6 +5918,14 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (isCliVerbose) { System.err.println(shutdownMsg); } + if (conf.getBoolVar(ConfVars.METASTORE_METRICS)) { + try { + MetricsFactory.close(); + } catch (Exception e) { + LOG.error("error in Metrics deinit: " + e.getClass().getName() + " " + + e.getMessage(), e); + } + } } }); @@ -6057,6 +6062,42 @@ public class HiveMetaStore extends ThriftHiveMetastore { .maxWorkerThreads(maxWorkerThreads); TServer tServer = new TThreadPoolServer(args); + TServerEventHandler tServerEventHandler = new TServerEventHandler() { + @Override + public void preServe() { + } + + @Override + public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) { + try { + Metrics metrics = MetricsFactory.getInstance(); + if (metrics != null) { + metrics.incrementCounter(MetricsConstant.OPEN_CONNECTIONS); + } + } catch (Exception e) { + LOG.warn("Error Reporting Metastore open connection to Metrics system", e); + } + return null; + } + + @Override + public void deleteContext(ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) { + try { + Metrics metrics = MetricsFactory.getInstance(); + if (metrics != null) { + metrics.decrementCounter(MetricsConstant.OPEN_CONNECTIONS); + } + } catch (Exception e) { + LOG.warn("Error Reporting Metastore close connection to Metrics system", e); + } + } + + @Override + public void processContext(ServerContext serverContext, TTransport tTransport, TTransport tTransport1) { + } + }; + + tServer.setServerEventHandler(tServerEventHandler); HMSHandler.LOG.info("Started the new metaserver on port [" + port + "]..."); HMSHandler.LOG.info("Options.minWorkerThreads = " http://git-wip-us.apache.org/repos/asf/hive/blob/f6ea8cb6/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java 
---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 4273c0b..8f52f83 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -62,6 +62,10 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; +import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.AggrStats; @@ -207,7 +211,7 @@ public class ObjectStore implements RawStore, Configurable { private MetaStoreDirectSql directSql = null; private PartitionExpressionProxy expressionProxy = null; private Configuration hiveConf; - int openTrasactionCalls = 0; + private volatile int openTrasactionCalls = 0; private Transaction currentTransaction = null; private TXN_STATUS transactionStatus = TXN_STATUS.NO_STATE; @@ -257,6 +261,17 @@ public class ObjectStore implements RawStore, Configurable { initialize(propsFromConf); + //Add metric for number of active JDO transactions. 
+ Metrics metrics = MetricsFactory.getInstance(); + if (metrics != null) { + metrics.addGauge(MetricsConstant.JDO_ACTIVE_TRANSACTIONS, new MetricsVariable() { + @Override + public Object getValue() { + return openTrasactionCalls; + } + }); + } + String partitionValidationRegex = hiveConf.get(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.name()); if (partitionValidationRegex != null && partitionValidationRegex.equals("")) { @@ -430,6 +445,7 @@ public class ObjectStore implements RawStore, Configurable { boolean result = currentTransaction.isActive(); debugLog("Open transaction: count = " + openTrasactionCalls + ", isActive = " + result); + incrementMetricsCount(MetricsConstant.JDO_OPEN_TRANSACTIONS); return result; } @@ -468,6 +484,7 @@ public class ObjectStore implements RawStore, Configurable { currentTransaction.commit(); } + incrementMetricsCount(MetricsConstant.JDO_COMMIT_TRANSACTIONS); return true; } @@ -505,6 +522,7 @@ public class ObjectStore implements RawStore, Configurable { // from reattaching in future transactions pm.evictAll(); } + incrementMetricsCount(MetricsConstant.JDO_ROLLBACK_TRANSACTIONS); } @Override @@ -6807,6 +6825,16 @@ public class ObjectStore implements RawStore, Configurable { } } + private void incrementMetricsCount(String name) { + try { + Metrics metrics = MetricsFactory.getInstance(); + if (metrics != null) { + metrics.incrementCounter(name); + } + } catch (Exception e) { + LOG.warn("Error Reporting JDO operation to Metrics system", e); + } + } private void debugLog(String message) { if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hive/blob/f6ea8cb6/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index dfb7faa..67bc778 100644 --- 
a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -30,6 +30,9 @@ import javax.security.auth.login.LoginException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hive.service.AbstractService; @@ -108,13 +111,29 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe @Override public ServerContext createContext( TProtocol input, TProtocol output) { + Metrics metrics = MetricsFactory.getInstance(); + if (metrics != null) { + try { + metrics.incrementCounter(MetricsConstant.OPEN_CONNECTIONS); + } catch (Exception e) { + LOG.warn("Error Reporting JDO operation to Metrics system", e); + } + } return new ThriftCLIServerContext(); } @Override public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) { - ThriftCLIServerContext context = (ThriftCLIServerContext)serverContext; + Metrics metrics = MetricsFactory.getInstance(); + if (metrics != null) { + try { + metrics.decrementCounter(MetricsConstant.OPEN_CONNECTIONS); + } catch (Exception e) { + LOG.warn("Error Reporting JDO operation to Metrics system", e); + } + } + ThriftCLIServerContext context = (ThriftCLIServerContext) serverContext; SessionHandle sessionHandle = context.getSessionHandle(); if (sessionHandle != null) { LOG.info("Session disconnected without closing properly, close it now");