HIVE-10761 : Create codahale-based metrics system for Hive (Szehon, reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a96fbdee Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a96fbdee Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a96fbdee Branch: refs/heads/spark Commit: a96fbdeef442fdeecca74d1c3f8a855b5c9d514b Parents: 82beb2b Author: Szehon Ho <sze...@cloudera.com> Authored: Wed Jun 3 23:46:28 2015 -0700 Committer: Szehon Ho <sze...@cloudera.com> Committed: Wed Jun 3 23:46:28 2015 -0700 ---------------------------------------------------------------------- common/pom.xml | 20 + .../hadoop/hive/common/JvmPauseMonitor.java | 225 ++++++++++++ .../hive/common/metrics/LegacyMetrics.java | 262 +++++++++++++ .../hadoop/hive/common/metrics/Metrics.java | 253 ------------- .../hive/common/metrics/common/Metrics.java | 68 ++++ .../common/metrics/common/MetricsFactory.java | 48 +++ .../metrics/metrics2/CodahaleMetrics.java | 366 +++++++++++++++++++ .../metrics/metrics2/MetricsReporting.java | 27 ++ .../org/apache/hadoop/hive/conf/HiveConf.java | 18 +- .../hive/common/metrics/TestLegacyMetrics.java | 295 +++++++++++++++ .../hadoop/hive/common/metrics/TestMetrics.java | 286 --------------- .../metrics/metrics2/TestCodahaleMetrics.java | 138 +++++++ .../hive/metastore/TestMetaStoreMetrics.java | 94 +++++ .../hadoop/hive/metastore/HiveMetaStore.java | 132 ++++--- pom.xml | 3 + .../apache/hive/service/server/HiveServer2.java | 25 +- .../hadoop/hive/shims/Hadoop20SShims.java | 5 - .../apache/hadoop/hive/shims/Hadoop23Shims.java | 13 - .../apache/hadoop/hive/shims/HadoopShims.java | 2 - 19 files changed, 1665 insertions(+), 615 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a96fbdee/common/pom.xml ---------------------------------------------------------------------- diff --git a/common/pom.xml b/common/pom.xml index a615c1e..8d4b1ea 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -98,6 +98,26 @@ <artifactId>json</artifactId> <version>${json.version}</version> </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${dropwizard.version}</version> + </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-jvm</artifactId> + <version>${dropwizard.version}</version> + </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-json</artifactId> + <version>${dropwizard.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.new.version}</version> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/hive/blob/a96fbdee/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java new file mode 100644 index 0000000..c3949f2 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; +import org.apache.hadoop.util.Daemon; + +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Based on the JvmPauseMonitor from Hadoop. + */ +public class JvmPauseMonitor { + private static final Log LOG = LogFactory.getLog( + JvmPauseMonitor.class); + + /** The target sleep time */ + private static final long SLEEP_INTERVAL_MS = 500; + + /** log WARN if we detect a pause longer than this threshold */ + private final long warnThresholdMs; + private static final String WARN_THRESHOLD_KEY = + "jvm.pause.warn-threshold.ms"; + private static final long WARN_THRESHOLD_DEFAULT = 10000; + + /** log INFO if we detect a pause longer than this threshold */ + private final long infoThresholdMs; + private static final String INFO_THRESHOLD_KEY = + "jvm.pause.info-threshold.ms"; + private static final long INFO_THRESHOLD_DEFAULT = 1000; + + private long numGcWarnThresholdExceeded = 0; + private long numGcInfoThresholdExceeded = 0; + private long totalGcExtraSleepTime = 0; + + private Thread monitorThread; + private volatile boolean shouldRun = true; + + public JvmPauseMonitor(Configuration conf) { + this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT); + this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT); + } + + public void start() { + Preconditions.checkState(monitorThread == null, + "JvmPauseMonitor thread is Already started"); + monitorThread = new Daemon(new Monitor()); + monitorThread.start(); + } + + public void stop() { + shouldRun = false; + if (isStarted()) { + monitorThread.interrupt(); + try { + monitorThread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + public boolean isStarted() { + return monitorThread != null; + } + + public long getNumGcWarnThreadholdExceeded() { + return numGcWarnThresholdExceeded; + } + + public long getNumGcInfoThresholdExceeded() { + return numGcInfoThresholdExceeded; + } + + public long getTotalGcExtraSleepTime() { + return totalGcExtraSleepTime; + } + + private String formatMessage(long extraSleepTime, + Map<String, GcTimes> gcTimesAfterSleep, + Map<String, GcTimes> gcTimesBeforeSleep) { + + Set<String> gcBeanNames = Sets.intersection( + gcTimesAfterSleep.keySet(), + gcTimesBeforeSleep.keySet()); + List<String> gcDiffs = Lists.newArrayList(); + for (String name : gcBeanNames) { + GcTimes diff = gcTimesAfterSleep.get(name).subtract( + gcTimesBeforeSleep.get(name)); + if (diff.gcCount != 0) { + gcDiffs.add("GC pool '" + name + "' had collection(s): " + + diff.toString()); + } + } + + String ret = "Detected pause in JVM or host machine (eg GC): " + + "pause of approximately " + extraSleepTime + "ms\n"; + if (gcDiffs.isEmpty()) { + ret += "No GCs detected"; + } else { + ret += Joiner.on("\n").join(gcDiffs); + } + return ret; + } + + private Map<String, GcTimes> getGcTimes() { + Map<String, GcTimes> map = Maps.newHashMap(); + List<GarbageCollectorMXBean> gcBeans = + ManagementFactory.getGarbageCollectorMXBeans(); + for (GarbageCollectorMXBean gcBean : gcBeans) { + map.put(gcBean.getName(), new GcTimes(gcBean)); + } + return map; + } + + private static class GcTimes { + private GcTimes(GarbageCollectorMXBean gcBean) { + gcCount = gcBean.getCollectionCount(); + gcTimeMillis = gcBean.getCollectionTime(); + } + + private GcTimes(long count, long time) { + this.gcCount = count; + this.gcTimeMillis = time; + } + + private GcTimes subtract(GcTimes other) { + return new GcTimes(this.gcCount - other.gcCount, + this.gcTimeMillis - other.gcTimeMillis); + } + + @Override + public String toString() { + return "count=" + gcCount + " time=" + gcTimeMillis + "ms"; + } + + private long gcCount; + private long gcTimeMillis; + } + + private class Monitor implements Runnable { + @Override + public void run() { + Stopwatch sw = new Stopwatch(); + Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes(); + while (shouldRun) { + sw.reset().start(); + try { + Thread.sleep(SLEEP_INTERVAL_MS); + } catch (InterruptedException ie) { + return; + } + long extraSleepTime = sw.elapsedMillis() - SLEEP_INTERVAL_MS; + Map<String, GcTimes> gcTimesAfterSleep = getGcTimes(); + + if (extraSleepTime > warnThresholdMs) { + ++numGcWarnThresholdExceeded; + LOG.warn(formatMessage( + extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); + incrementMetricsCounter("jvm.pause.warn-threshold", 1); + } else if (extraSleepTime > infoThresholdMs) { + ++numGcInfoThresholdExceeded; + LOG.info(formatMessage( + extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); + incrementMetricsCounter("jvm.pause.info-threshold", 1); + } + incrementMetricsCounter("jvm.pause.extraSleepTime", extraSleepTime); + totalGcExtraSleepTime += extraSleepTime; + gcTimesBeforeSleep = gcTimesAfterSleep; + } + } + + private void incrementMetricsCounter(String name, long count) { + try { + MetricsFactory.getMetricsInstance().incrementCounter(name, count); + } catch (Exception e) { + LOG.warn("Error Reporting JvmPauseMonitor to Metrics system", e); + } + } + } + + /** + * Simple 'main' to facilitate manual testing of the pause monitor. + * + * This main function just leaks memory into a list. Running this class + * with a 1GB heap will very quickly go into "GC hell" and result in + * log messages about the GC pauses. + */ + public static void main(String []args) throws Exception { + new JvmPauseMonitor(new Configuration()).start(); + List<String> list = Lists.newArrayList(); + int i = 0; + while (true) { + list.add(String.valueOf(i++)); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/a96fbdee/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java new file mode 100644 index 0000000..14f7afb --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common.metrics; + +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.conf.HiveConf; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.HashMap; + +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +/** + * This class may eventually get superseded by org.apache.hadoop.hive.common.metrics2.Metrics. + * + * Metrics Subsystem - allows exposure of a number of named parameters/counters + * via jmx, intended to be used as a static subsystem + * + * Has a couple of primary ways it can be used: + * (i) Using the set and get methods to set and get named parameters + * (ii) Using the incrementCounter method to increment and set named + * parameters in one go, rather than having to make a get and then a set. + * (iii) Using the startScope and endScope methods to start and end + * named "scopes" that record the number of times they've been + * instantiated and amount of time(in milliseconds) spent inside + * the scopes. + */ +public class LegacyMetrics implements Metrics { + + private LegacyMetrics() { + // block + } + + /** + * MetricsScope : A class that encapsulates an idea of a metered scope. + * Instantiating a named scope and then closing it exposes two counters: + * (i) a "number of calls" counter ( <name>.n ), and + * (ii) a "number of msecs spent between scope open and close" counter. ( <name>.t) + */ + public static class MetricsScope { + + final LegacyMetrics metrics; + + final String name; + final String numCounter; + final String timeCounter; + final String avgTimeCounter; + + private boolean isOpen = false; + private Long startTime = null; + + /** + * Instantiates a named scope - intended to only be called by Metrics, so locally scoped. + * @param name - name of the variable + * @throws IOException + */ + private MetricsScope(String name, LegacyMetrics metrics) throws IOException { + this.metrics = metrics; + this.name = name; + this.numCounter = name + ".n"; + this.timeCounter = name + ".t"; + this.avgTimeCounter = name + ".avg_t"; + open(); + } + + public Long getNumCounter() throws IOException { + return (Long) metrics.get(numCounter); + } + + public Long getTimeCounter() throws IOException { + return (Long) metrics.get(timeCounter); + } + + /** + * Opens scope, and makes note of the time started, increments run counter + * @throws IOException + * + */ + public void open() throws IOException { + if (!isOpen) { + isOpen = true; + startTime = System.currentTimeMillis(); + } else { + throw new IOException("Scope named " + name + " is not closed, cannot be opened."); + } + } + + /** + * Closes scope, and records the time taken + * @throws IOException + */ + public void close() throws IOException { + if (isOpen) { + Long endTime = System.currentTimeMillis(); + synchronized(metrics) { + Long num = metrics.incrementCounter(numCounter); + Long time = metrics.incrementCounter(timeCounter, endTime - startTime); + if (num != null && time != null) { + metrics.set(avgTimeCounter, Double.valueOf(time.doubleValue() / num.doubleValue())); + } + } + } else { + throw new IOException("Scope named " + name + " is not open, cannot be closed."); + } + isOpen = false; + } + + + /** + * Closes scope if open, and reopens it + * @throws IOException + */ + public void reopen() throws IOException { + if(isOpen) { + close(); + } + open(); + } + + } + + private static final MetricsMBean metrics = new MetricsMBeanImpl(); + + private static final ObjectName oname; + static { + try { + oname = new ObjectName( + "org.apache.hadoop.hive.common.metrics:type=MetricsMBean"); + } catch (MalformedObjectNameException mone) { + throw new RuntimeException(mone); + } + } + + + private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes + = new ThreadLocal<HashMap<String,MetricsScope>>() { + @Override + protected HashMap<String,MetricsScope> initialValue() { + return new HashMap<String,MetricsScope>(); + } + }; + + private boolean initialized = false; + + public void init(HiveConf conf) throws Exception { + if (!initialized) { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + mbs.registerMBean(metrics, oname); + initialized = true; + } + } + + public boolean isInitialized() { + return initialized; + } + + public Long incrementCounter(String name) throws IOException{ + if (!initialized) { + return null; + } + return incrementCounter(name,Long.valueOf(1)); + } + + public Long incrementCounter(String name, long increment) throws IOException{ + if (!initialized) { + return null; + } + Long value; + synchronized(metrics) { + if (!metrics.hasKey(name)) { + value = Long.valueOf(increment); + set(name, value); + } else { + value = ((Long)get(name)) + increment; + set(name, value); + } + } + return value; + } + + public void set(String name, Object value) throws IOException{ + if (!initialized) { + return; + } + metrics.put(name,value); + } + + public Object get(String name) throws IOException{ + if (!initialized) { + return null; + } + return metrics.get(name); + } + + public void startScope(String name) throws IOException{ + if (!initialized) { + return; + } + if (threadLocalScopes.get().containsKey(name)) { + threadLocalScopes.get().get(name).open(); + } else { + threadLocalScopes.get().put(name, new MetricsScope(name, this)); + } + } + + public MetricsScope getScope(String name) throws IOException { + if (!initialized) { + return null; + } + if (threadLocalScopes.get().containsKey(name)) { + return threadLocalScopes.get().get(name); + } else { + throw new IOException("No metrics scope named " + name); + } + } + + public void endScope(String name) throws IOException{ + if (!initialized) { + return; + } + if (threadLocalScopes.get().containsKey(name)) { + threadLocalScopes.get().get(name).close(); + } + } + + /** + * Resets the static context state to initial. + * Used primarily for testing purposes. + * + * Note that threadLocalScopes ThreadLocal is *not* cleared in this call. + */ + public void deInit() throws Exception { + synchronized (metrics) { + if (initialized) { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + if (mbs.isRegistered(oname)) { + mbs.unregisterMBean(oname); + } + metrics.clear(); + initialized = false; + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/a96fbdee/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java deleted file mode 100644 index 01c9d1d..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java +++ /dev/null @@ -1,253 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.common.metrics; - -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.util.HashMap; - -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -/** - * Metrics Subsystem - allows exposure of a number of named parameters/counters - * via jmx, intended to be used as a static subsystem - * - * Has a couple of primary ways it can be used: - * (i) Using the set and get methods to set and get named parameters - * (ii) Using the incrementCounter method to increment and set named - * parameters in one go, rather than having to make a get and then a set. - * (iii) Using the startScope and endScope methods to start and end - * named "scopes" that record the number of times they've been - * instantiated and amount of time(in milliseconds) spent inside - * the scopes. - */ -public class Metrics { - - private Metrics() { - // block - } - - /** - * MetricsScope : A class that encapsulates an idea of a metered scope. - * Instantiating a named scope and then closing it exposes two counters: - * (i) a "number of calls" counter ( <name>.n ), and - * (ii) a "number of msecs spent between scope open and close" counter. ( <name>.t) - */ - public static class MetricsScope { - - final String name; - final String numCounter; - final String timeCounter; - final String avgTimeCounter; - - private boolean isOpen = false; - private Long startTime = null; - - /** - * Instantiates a named scope - intended to only be called by Metrics, so locally scoped. - * @param name - name of the variable - * @throws IOException - */ - private MetricsScope(String name) throws IOException { - this.name = name; - this.numCounter = name + ".n"; - this.timeCounter = name + ".t"; - this.avgTimeCounter = name + ".avg_t"; - open(); - } - - public Long getNumCounter() throws IOException { - return (Long)Metrics.get(numCounter); - } - - public Long getTimeCounter() throws IOException { - return (Long)Metrics.get(timeCounter); - } - - /** - * Opens scope, and makes note of the time started, increments run counter - * @throws IOException - * - */ - public void open() throws IOException { - if (!isOpen) { - isOpen = true; - startTime = System.currentTimeMillis(); - } else { - throw new IOException("Scope named " + name + " is not closed, cannot be opened."); - } - } - - /** - * Closes scope, and records the time taken - * @throws IOException - */ - public void close() throws IOException { - if (isOpen) { - Long endTime = System.currentTimeMillis(); - synchronized(metrics) { - Long num = Metrics.incrementCounter(numCounter); - Long time = Metrics.incrementCounter(timeCounter, endTime - startTime); - if (num != null && time != null) { - Metrics.set(avgTimeCounter, Double.valueOf(time.doubleValue() / num.doubleValue())); - } - } - } else { - throw new IOException("Scope named " + name + " is not open, cannot be closed."); - } - isOpen = false; - } - - - /** - * Closes scope if open, and reopens it - * @throws IOException - */ - public void reopen() throws IOException { - if(isOpen) { - close(); - } - open(); - } - - } - - private static final MetricsMBean metrics = new MetricsMBeanImpl(); - - private static final ObjectName oname; - static { - try { - oname = new ObjectName( - "org.apache.hadoop.hive.common.metrics:type=MetricsMBean"); - } catch (MalformedObjectNameException mone) { - throw new RuntimeException(mone); - } - } - - - private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes - = new ThreadLocal<HashMap<String,MetricsScope>>() { - @Override - protected HashMap<String,MetricsScope> initialValue() { - return new HashMap<String,MetricsScope>(); - } - }; - - private static boolean initialized = false; - - public static void init() throws Exception { - synchronized (metrics) { - if (!initialized) { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - mbs.registerMBean(metrics, oname); - initialized = true; - } - } - } - - public static Long incrementCounter(String name) throws IOException{ - if (!initialized) { - return null; - } - return incrementCounter(name,Long.valueOf(1)); - } - - public static Long incrementCounter(String name, long increment) throws IOException{ - if (!initialized) { - return null; - } - Long value; - synchronized(metrics) { - if (!metrics.hasKey(name)) { - value = Long.valueOf(increment); - set(name, value); - } else { - value = ((Long)get(name)) + increment; - set(name, value); - } - } - return value; - } - - public static void set(String name, Object value) throws IOException{ - if (!initialized) { - return; - } - metrics.put(name,value); - } - - public static Object get(String name) throws IOException{ - if (!initialized) { - return null; - } - return metrics.get(name); - } - - public static MetricsScope startScope(String name) throws IOException{ - if (!initialized) { - return null; - } - if (threadLocalScopes.get().containsKey(name)) { - threadLocalScopes.get().get(name).open(); - } else { - threadLocalScopes.get().put(name, new MetricsScope(name)); - } - return threadLocalScopes.get().get(name); - } - - public static MetricsScope getScope(String name) throws IOException { - if (!initialized) { - return null; - } - if (threadLocalScopes.get().containsKey(name)) { - return threadLocalScopes.get().get(name); - } else { - throw new IOException("No metrics scope named " + name); - } - } - - public static void endScope(String name) throws IOException{ - if (!initialized) { - return; - } - if (threadLocalScopes.get().containsKey(name)) { - threadLocalScopes.get().get(name).close(); - } - } - - /** - * Resets the static context state to initial. - * Used primarily for testing purposes. - * - * Note that threadLocalScopes ThreadLocal is *not* cleared in this call. - */ - static void uninit() throws Exception { - synchronized (metrics) { - if (initialized) { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - if (mbs.isRegistered(oname)) { - mbs.unregisterMBean(oname); - } - metrics.clear(); - initialized = false; - } - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/a96fbdee/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java new file mode 100644 index 0000000..13a5336 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common.metrics.common; + +import java.io.IOException; + +import org.apache.hadoop.hive.conf.HiveConf; + +import java.io.IOException; + +/** + * Generic Metics interface. + */ +public interface Metrics { + + /** + * Initialize Metrics system with given Hive configuration. + * @param conf + */ + public void init(HiveConf conf) throws Exception; + + /** + * Deinitializes the Metrics system. + */ + public void deInit() throws Exception; + + /** + * @param name + * @throws IOException + */ + public void startScope(String name) throws IOException; + + public void endScope(String name) throws IOException; + + //Counter-related methods + + /** + * Increments a counter of the given name by 1. + * @param name + * @return + * @throws IOException + */ + public Long incrementCounter(String name) throws IOException; + + /** + * Increments a counter of the given name by "increment" + * @param name + * @param increment + * @return + * @throws IOException + */ + public Long incrementCounter(String name, long increment) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hive/blob/a96fbdee/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java new file mode 100644 index 0000000..12a309d --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common.metrics.common; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * Class that manages a static Metric instance for this process. + */ +public class MetricsFactory { + + private static Metrics metrics; + private static Object initLock = new Object(); + + public synchronized static void init(HiveConf conf) throws Exception { + if (metrics == null) { + metrics = (Metrics) ReflectionUtils.newInstance(conf.getClassByName( + conf.getVar(HiveConf.ConfVars.HIVE_METRICS_CLASS)), conf); + } + metrics.init(conf); + } + + public synchronized static Metrics getMetricsInstance() { + return metrics; + } + + public synchronized static void deInit() throws Exception { + if (metrics != null) { + metrics.deInit(); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/a96fbdee/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java new file mode 100644 index 0000000..e59da99 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java @@ -0,0 +1,366 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.metrics.metrics2; + +import com.codahale.metrics.ConsoleReporter; +import com.codahale.metrics.Counter; +import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.MetricSet; +import com.codahale.metrics.Timer; +import com.codahale.metrics.json.MetricsModule; +import com.codahale.metrics.jvm.BufferPoolMetricSet; +import com.codahale.metrics.jvm.ClassLoadingGaugeSet; +import com.codahale.metrics.jvm.GarbageCollectorMetricSet; +import com.codahale.metrics.jvm.MemoryUsageGaugeSet; +import com.codahale.metrics.jvm.ThreadStatesGaugeSet; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Splitter; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.conf.HiveConf; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.lang.management.ManagementFactory; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TimerTask; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Codahale-backed Metrics implementation. + */ +public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.common.Metrics { + public static final String API_PREFIX = "api_"; + public static final Log LOGGER = LogFactory.getLog(CodahaleMetrics.class); + + public final MetricRegistry metricRegistry = new MetricRegistry(); + private final Lock timersLock = new ReentrantLock(); + private final Lock countersLock = new ReentrantLock(); + + private LoadingCache<String, Timer> timers; + private LoadingCache<String, Counter> counters; + + private boolean initialized = false; + private HiveConf conf; + private final Set<Closeable> reporters = new HashSet<Closeable>(); + + private final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes + = new ThreadLocal<HashMap<String,MetricsScope>>() { + @Override + protected HashMap<String,MetricsScope> initialValue() { + return new HashMap<String,MetricsScope>(); + } + }; + + public static class MetricsScope { + + final String name; + final Timer timer; + Timer.Context timerContext; + CodahaleMetrics metrics; + + private boolean isOpen = false; + + /** + * Instantiates a named scope - intended to only be called by Metrics, so locally scoped. + * @param name - name of the variable + * @throws IOException + */ + private MetricsScope(String name, CodahaleMetrics metrics) throws IOException { + this.name = name; + this.metrics = metrics; + this.timer = metrics.getTimer(name); + open(); + } + + /** + * Opens scope, and makes note of the time started, increments run counter + * @throws IOException + * + */ + public void open() throws IOException { + if (!isOpen) { + isOpen = true; + this.timerContext = timer.time(); + } else { + throw new IOException("Scope named " + name + " is not closed, cannot be opened."); + } + } + + /** + * Closes scope, and records the time taken + * @throws IOException + */ + public void close() throws IOException { + if (isOpen) { + timerContext.close(); + + } else { + throw new IOException("Scope named " + name + " is not open, cannot be closed."); + } + isOpen = false; + } + } + + public synchronized void init(HiveConf conf) throws Exception { + if (initialized) { + return; + } + + this.conf = conf; + //Codahale artifacts are lazily-created. + timers = CacheBuilder.newBuilder().build( + new CacheLoader<String, com.codahale.metrics.Timer>() { + @Override + public com.codahale.metrics.Timer load(String key) throws Exception { + Timer timer = new Timer(new ExponentiallyDecayingReservoir()); + metricRegistry.register(key, timer); + return timer; + } + } + ); + counters = CacheBuilder.newBuilder().build( + new CacheLoader<String, Counter>() { + @Override + public Counter load(String key) throws Exception { + Counter counter = new Counter(); + metricRegistry.register(key, counter); + return counter; + } + } + ); + + //register JVM metrics + registerAll("gc", new GarbageCollectorMetricSet()); + registerAll("buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer())); + registerAll("memory", new MemoryUsageGaugeSet()); + registerAll("threads", new ThreadStatesGaugeSet()); + registerAll("classLoading", new ClassLoadingGaugeSet()); + + //Metrics reporter + Set<MetricsReporting> finalReporterList = new HashSet<MetricsReporting>(); + List<String> metricsReporterNames = Lists.newArrayList( + Splitter.on(",").trimResults().omitEmptyStrings().split(conf.getVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER))); + + if(metricsReporterNames != null) { + for (String metricsReportingName : metricsReporterNames) { + try { + MetricsReporting reporter = MetricsReporting.valueOf(metricsReportingName.trim().toUpperCase()); + finalReporterList.add(reporter); + } catch (IllegalArgumentException e) { + LOGGER.warn("Metrics reporter skipped due to invalid configured reporter: " + metricsReportingName); + } + } + } + initReporting(finalReporterList); + initialized = true; + } + + + public synchronized void deInit() throws Exception { + if (initialized) { + if (reporters != null) { + for (Closeable reporter : reporters) { + reporter.close(); + } + } + for (Map.Entry<String, Metric> metric : metricRegistry.getMetrics().entrySet()) { + metricRegistry.remove(metric.getKey()); + } + timers.invalidateAll(); + counters.invalidateAll(); + initialized = false; + } + } + + public void startScope(String name) throws IOException { + synchronized (this) { + if (!initialized) { + return; + } + } + name = API_PREFIX + name; + if (threadLocalScopes.get().containsKey(name)) { + threadLocalScopes.get().get(name).open(); + } else { + threadLocalScopes.get().put(name, new MetricsScope(name, this)); + } + } + + public void endScope(String name) throws IOException{ + synchronized (this) { + if (!initialized) { + return; + } + } + name = API_PREFIX + name; + if (threadLocalScopes.get().containsKey(name)) { + threadLocalScopes.get().get(name).close(); + } + } + + public Long incrementCounter(String name) throws IOException { + return incrementCounter(name, 1); + } + + public Long incrementCounter(String name, long increment) throws IOException { + String key = name; + try { + countersLock.lock(); + counters.get(key).inc(increment); + return counters.get(key).getCount(); + } catch(ExecutionException ee) { + throw new RuntimeException(ee); + } finally { + countersLock.unlock(); + } + } + + // This method is necessary to synchronize lazy-creation to the timers. + private Timer getTimer(String name) throws IOException { + String key = name; + try { + timersLock.lock(); + Timer timer = timers.get(key); + return timer; + } catch (ExecutionException e) { + throw new IOException(e); + } finally { + timersLock.unlock(); + } + } + + private void registerAll(String prefix, MetricSet metricSet) { + for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) { + if (entry.getValue() instanceof MetricSet) { + registerAll(prefix + "." + entry.getKey(), (MetricSet) entry.getValue()); + } else { + metricRegistry.register(prefix + "." + entry.getKey(), entry.getValue()); + } + } + } + + @VisibleForTesting + public MetricRegistry getMetricRegistry() { + return metricRegistry; + } + + /** + * Should be only called once to initialize the reporters + */ + private void initReporting(Set<MetricsReporting> reportingSet) throws Exception { + for (MetricsReporting reporting : reportingSet) { + switch(reporting) { + case CONSOLE: + final ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(); + consoleReporter.start(1, TimeUnit.SECONDS); + reporters.add(consoleReporter); + break; + case JMX: + final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(); + jmxReporter.start(); + reporters.add(jmxReporter); + break; + case JSON_FILE: + final JsonFileReporter jsonFileReporter = new JsonFileReporter(); + jsonFileReporter.start(); + reporters.add(jsonFileReporter); + break; + } + } + } + + class JsonFileReporter implements Closeable { + private ObjectMapper jsonMapper = null; + private java.util.Timer timer = null; + + public void start() { + this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS, false)); + this.timer = new java.util.Timer(true); + + long time = conf.getTimeVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, TimeUnit.MILLISECONDS); + final String pathString = conf.getVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION); + + timer.schedule(new TimerTask() { + @Override + public void run() { + BufferedWriter bw = null; + try { + String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry); + Path tmpPath = new Path(pathString + ".tmp"); + FileSystem fs = FileSystem.get(conf); + fs.delete(tmpPath, true); + bw = new BufferedWriter(new OutputStreamWriter(fs.create(tmpPath, true))); + bw.write(json); + bw.close(); + + Path path = new Path(pathString); + fs.rename(tmpPath, path); + fs.setPermission(path, FsPermission.createImmutable((short) 0644)); + } catch (Exception e) { + LOGGER.warn("Error writing JSON Metrics to file", e); + } finally { + try { + if (bw != null) { + bw.close(); + } + } catch (IOException e) { + //Ignore. + } + } + + + } + }, 0, time); + } + + public void close() { + if (timer != null) { + this.timer.cancel(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/a96fbdee/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java new file mode 100644 index 0000000..643246f --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common.metrics.metrics2; + +/** + * Reporting options for org.apache.hadoop.hive.common.metrics.metrics2.Metrics. + */ +public enum MetricsReporting { + JMX, + CONSOLE, + JSON_FILE +} http://git-wip-us.apache.org/repos/asf/hive/blob/a96fbdee/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index d733d71..a724fd1 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -645,6 +645,7 @@ public class HiveConf extends Configuration { "Maximum cache full % after which the cache cleaner thread kicks in."), METASTORE_AGGREGATE_STATS_CACHE_CLEAN_UNTIL("hive.metastore.aggregate.stats.cache.clean.until", (float) 0.8, "The cleaner thread cleans until cache reaches this % full size."), + METASTORE_METRICS("hive.metastore.metrics.enabled", false, "Enable metrics on the metastore."), // Parameters for exporting metadata on table drop (requires the use of the) // org.apache.hadoop.hive.ql.parse.MetaDataExportListener preevent listener @@ -1688,6 +1689,7 @@ public class HiveConf extends Configuration { " EXECUTION: Log completion of tasks\n" + " PERFORMANCE: Execution + Performance logs \n" + " VERBOSE: All logs" ), + HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."), // logging configuration HIVE_LOG4J_FILE("hive.log4j.file", "", "Hive log4j configuration file.\n" + @@ -1715,7 +1717,21 @@ public class HiveConf extends Configuration { HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME( "hive.autogen.columnalias.prefix.includefuncname", false, "Whether to include function name in the column alias auto generated by Hive."), - + HIVE_METRICS_CLASS("hive.service.metrics.class", + "org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics", + new StringSet( + "org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics", + "org.apache.hadoop.hive.common.metrics.LegacyMetrics"), + "Hive metrics subsystem implementation class."), + HIVE_METRICS_REPORTER("hive.service.metrics.reporter", "JSON_FILE, JMX", + "Reporter type for metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics, comma separated list of JMX, CONSOLE, JSON_FILE"), + HIVE_METRICS_JSON_FILE_LOCATION("hive.service.metrics.file.location", "file:///tmp/my-logging.properties", + "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, the location of JSON metrics file. " + + "This file will get overwritten at every interval."), + HIVE_METRICS_JSON_FILE_INTERVAL("hive.service.metrics.file.frequency", "5s", + new TimeValidator(TimeUnit.MILLISECONDS), + "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, " + + "the frequency of updating JSON metrics file."), HIVE_PERF_LOGGER("hive.exec.perf.logger", "org.apache.hadoop.hive.ql.log.PerfLogger", "The class responsible for logging client side performance metrics. \n" + "Must be a subclass of org.apache.hadoop.hive.ql.log.PerfLogger"), http://git-wip-us.apache.org/repos/asf/hive/blob/a96fbdee/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java ---------------------------------------------------------------------- diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java new file mode 100644 index 0000000..c14c7ee --- /dev/null +++ b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common.metrics; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.management.Attribute; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanOperationInfo; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; +import org.apache.hadoop.hive.common.metrics.LegacyMetrics.MetricsScope; +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.*; + +public class TestLegacyMetrics { + + private static final String scopeName = "foo"; + private static final long periodMs = 50L; + private static LegacyMetrics metrics; + + @Before + public void before() throws Exception { + MetricsFactory.deInit(); + HiveConf conf = new HiveConf(); + conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS, LegacyMetrics.class.getCanonicalName()); + MetricsFactory.init(conf); + metrics = (LegacyMetrics) MetricsFactory.getMetricsInstance(); + } + + @After + public void after() throws Exception { + MetricsFactory.deInit(); + } + + @Test + public void testMetricsMBean() throws Exception { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + final ObjectName oname = new ObjectName( + "org.apache.hadoop.hive.common.metrics:type=MetricsMBean"); + MBeanInfo mBeanInfo = mbs.getMBeanInfo(oname); + // check implementation class: + assertEquals(MetricsMBeanImpl.class.getName(), mBeanInfo.getClassName()); + + // check reset operation: + MBeanOperationInfo[] oops = mBeanInfo.getOperations(); + boolean resetFound = false; + for (MBeanOperationInfo op : oops) { + if ("reset".equals(op.getName())) { + resetFound = true; + break; + } + } + assertTrue(resetFound); + + // add metric with a non-null value: + Attribute attr = new Attribute("fooMetric", Long.valueOf(-77)); + mbs.setAttribute(oname, attr); + + mBeanInfo = mbs.getMBeanInfo(oname); + MBeanAttributeInfo[] attrinuteInfos = mBeanInfo.getAttributes(); + assertEquals(1, attrinuteInfos.length); + boolean attrFound = false; + for (MBeanAttributeInfo info : attrinuteInfos) { + if ("fooMetric".equals(info.getName())) { + assertEquals("java.lang.Long", info.getType()); + assertTrue(info.isReadable()); + assertTrue(info.isWritable()); + assertFalse(info.isIs()); + + attrFound = true; + break; + } + } + assertTrue(attrFound); + + // check metric value: + Object v = mbs.getAttribute(oname, "fooMetric"); + assertEquals(Long.valueOf(-77), v); + + // reset the bean: + Object result = mbs.invoke(oname, "reset", new Object[0], new String[0]); + assertNull(result); + + // the metric value must be zeroed: + v = mbs.getAttribute(oname, "fooMetric"); + assertEquals(Long.valueOf(0), v); + } + + private <T> void expectIOE(Callable<T> c) throws Exception { + try { + T t = c.call(); + fail("IOE expected but ["+t+"] was returned."); + } catch (IOException ioe) { + // ok, expected + } + } + + @Test + public void testScopeSingleThread() throws Exception { + metrics.startScope(scopeName); + final MetricsScope fooScope = metrics.getScope(scopeName); + // the time and number counters become available only after the 1st + // scope close: + expectIOE(new Callable<Long>() { + @Override + public Long call() throws Exception { + Long num = fooScope.getNumCounter(); + return num; + } + }); + expectIOE(new Callable<Long>() { + @Override + public Long call() throws Exception { + Long time = fooScope.getTimeCounter(); + return time; + } + }); + // cannot open scope that is already open: + expectIOE(new Callable<Void>() { + @Override + public Void call() throws Exception { + fooScope.open(); + return null; + } + }); + + assertSame(fooScope, metrics.getScope(scopeName)); + Thread.sleep(periodMs+ 1); + // 1st close: + // closing of open scope should be ok: + metrics.endScope(scopeName); + expectIOE(new Callable<Void>() { + @Override + public Void call() throws Exception { + metrics.endScope(scopeName); // closing of closed scope not allowed + return null; + } + }); + + assertEquals(Long.valueOf(1), fooScope.getNumCounter()); + final long t1 = fooScope.getTimeCounter().longValue(); + assertTrue(t1 > periodMs); + + assertSame(fooScope, metrics.getScope(scopeName)); + + // opening allowed after closing: + metrics.startScope(scopeName); + // opening of already open scope not allowed: + expectIOE(new Callable<Void>() { + @Override + public Void call() throws Exception { + metrics.startScope(scopeName); + return null; + } + }); + + assertEquals(Long.valueOf(1), fooScope.getNumCounter()); + assertEquals(t1, fooScope.getTimeCounter().longValue()); + + assertSame(fooScope, metrics.getScope(scopeName)); + Thread.sleep(periodMs + 1); + // Reopening (close + open) allowed in opened state: + fooScope.reopen(); + + assertEquals(Long.valueOf(2), fooScope.getNumCounter()); + assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs); + + Thread.sleep(periodMs + 1); + // 3rd close: + fooScope.close(); + + assertEquals(Long.valueOf(3), fooScope.getNumCounter()); + assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs); + Double avgT = (Double) metrics.get("foo.avg_t"); + assertTrue(avgT.doubleValue() > periodMs); + } + + @Test + public void testScopeConcurrency() throws Exception { + metrics.startScope(scopeName); + MetricsScope fooScope = metrics.getScope(scopeName); + final int threads = 10; + ExecutorService executorService = Executors.newFixedThreadPool(threads); + for (int i=0; i<threads; i++) { + final int n = i; + executorService.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + testScopeImpl(n); + return null; + } + }); + } + executorService.shutdown(); + assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS)); + + fooScope = metrics.getScope(scopeName); + assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter()); + assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads); + Double avgT = (Double) metrics.get("foo.avg_t"); + assertTrue(avgT.doubleValue() > periodMs); + metrics.endScope(scopeName); + } + + void testScopeImpl(int n) throws Exception { + metrics.startScope(scopeName); + final MetricsScope fooScope = metrics.getScope(scopeName); + // cannot open scope that is already open: + expectIOE(new Callable<Void>() { + @Override + public Void call() throws Exception { + fooScope.open(); + return null; + } + }); + + assertSame(fooScope, metrics.getScope(scopeName)); + Thread.sleep(periodMs+ 1); + // 1st close: + metrics.endScope(scopeName); // closing of open scope should be ok. + + assertTrue(fooScope.getNumCounter().longValue() >= 1); + final long t1 = fooScope.getTimeCounter().longValue(); + assertTrue(t1 > periodMs); + + expectIOE(new Callable<Void>() { + @Override + public Void call() throws Exception { + metrics.endScope(scopeName); // closing of closed scope not allowed + return null; + } + }); + + assertSame(fooScope, metrics.getScope(scopeName)); + + // opening allowed after closing: + metrics.startScope(scopeName); + + assertTrue(fooScope.getNumCounter().longValue() >= 1); + assertTrue(fooScope.getTimeCounter().longValue() >= t1); + + // opening of already open scope not allowed: + expectIOE(new Callable<Void>() { + @Override + public Void call() throws Exception { + metrics.startScope(scopeName); + return null; + } + }); + + assertSame(fooScope, metrics.getScope(scopeName)); + Thread.sleep(periodMs + 1); + // Reopening (close + open) allowed in opened state: + fooScope.reopen(); + + assertTrue(fooScope.getNumCounter().longValue() >= 2); + assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs); + + Thread.sleep(periodMs + 1); + // 3rd close: + fooScope.close(); + + assertTrue(fooScope.getNumCounter().longValue() >= 3); + assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs); + Double avgT = (Double) metrics.get("foo.avg_t"); + assertTrue(avgT.doubleValue() > periodMs); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/a96fbdee/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java ---------------------------------------------------------------------- diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java deleted file mode 100644 index e85d3f8..0000000 --- a/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.common.metrics; - -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import javax.management.Attribute; -import javax.management.MBeanAttributeInfo; -import javax.management.MBeanInfo; -import javax.management.MBeanOperationInfo; -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import org.apache.hadoop.hive.common.metrics.Metrics.MetricsScope; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import static org.junit.Assert.*; - -public class TestMetrics { - - private static final String scopeName = "foo"; - private static final long periodMs = 50L; - - @Before - public void before() throws Exception { - Metrics.uninit(); - Metrics.init(); - } - - @After - public void after() throws Exception { - Metrics.uninit(); - } - - @Test - public void testMetricsMBean() throws Exception { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - final ObjectName oname = new ObjectName( - "org.apache.hadoop.hive.common.metrics:type=MetricsMBean"); - MBeanInfo mBeanInfo = mbs.getMBeanInfo(oname); - // check implementation class: - assertEquals(MetricsMBeanImpl.class.getName(), mBeanInfo.getClassName()); - - // check reset operation: - MBeanOperationInfo[] oops = mBeanInfo.getOperations(); - boolean resetFound = false; - for (MBeanOperationInfo op : oops) { - if ("reset".equals(op.getName())) { - resetFound = true; - break; - } - } - assertTrue(resetFound); - - // add metric with a non-null value: - Attribute attr = new Attribute("fooMetric", Long.valueOf(-77)); - mbs.setAttribute(oname, attr); - - mBeanInfo = mbs.getMBeanInfo(oname); - MBeanAttributeInfo[] attrinuteInfos = mBeanInfo.getAttributes(); - assertEquals(1, attrinuteInfos.length); - boolean attrFound = false; - for (MBeanAttributeInfo info : attrinuteInfos) { - if ("fooMetric".equals(info.getName())) { - assertEquals("java.lang.Long", info.getType()); - assertTrue(info.isReadable()); - assertTrue(info.isWritable()); - assertFalse(info.isIs()); - - attrFound = true; - break; - } - } - assertTrue(attrFound); - - // check metric value: - Object v = mbs.getAttribute(oname, "fooMetric"); - assertEquals(Long.valueOf(-77), v); - - // reset the bean: - Object result = mbs.invoke(oname, "reset", new Object[0], new String[0]); - assertNull(result); - - // the metric value must be zeroed: - v = mbs.getAttribute(oname, "fooMetric"); - assertEquals(Long.valueOf(0), v); - } - - private <T> void expectIOE(Callable<T> c) throws Exception { - try { - T t = c.call(); - fail("IOE expected but ["+t+"] was returned."); - } catch (IOException ioe) { - // ok, expected - } - } - - @Test - public void testScopeSingleThread() throws Exception { - final MetricsScope fooScope = Metrics.startScope(scopeName); - // the time and number counters become available only after the 1st - // scope close: - expectIOE(new Callable<Long>() { - @Override - public Long call() throws Exception { - Long num = fooScope.getNumCounter(); - return num; - } - }); - expectIOE(new Callable<Long>() { - @Override - public Long call() throws Exception { - Long time = fooScope.getTimeCounter(); - return time; - } - }); - // cannot open scope that is already open: - expectIOE(new Callable<Void>() { - @Override - public Void call() throws Exception { - fooScope.open(); - return null; - } - }); - - assertSame(fooScope, Metrics.getScope(scopeName)); - Thread.sleep(periodMs+1); - // 1st close: - // closing of open scope should be ok: - Metrics.endScope(scopeName); - expectIOE(new Callable<Void>() { - @Override - public Void call() throws Exception { - Metrics.endScope(scopeName); // closing of closed scope not allowed - return null; - } - }); - - assertEquals(Long.valueOf(1), fooScope.getNumCounter()); - final long t1 = fooScope.getTimeCounter().longValue(); - assertTrue(t1 > periodMs); - - assertSame(fooScope, Metrics.getScope(scopeName)); - - // opening allowed after closing: - Metrics.startScope(scopeName); - // opening of already open scope not allowed: - expectIOE(new Callable<Void>() { - @Override - public Void call() throws Exception { - Metrics.startScope(scopeName); - return null; - } - }); - - assertEquals(Long.valueOf(1), fooScope.getNumCounter()); - assertEquals(t1, fooScope.getTimeCounter().longValue()); - - assertSame(fooScope, Metrics.getScope(scopeName)); - Thread.sleep(periodMs + 1); - // Reopening (close + open) allowed in opened state: - fooScope.reopen(); - - assertEquals(Long.valueOf(2), fooScope.getNumCounter()); - assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs); - - Thread.sleep(periodMs + 1); - // 3rd close: - fooScope.close(); - - assertEquals(Long.valueOf(3), fooScope.getNumCounter()); - assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs); - Double avgT = (Double)Metrics.get("foo.avg_t"); - assertTrue(avgT.doubleValue() > periodMs); - } - - @Test - public void testScopeConcurrency() throws Exception { - MetricsScope fooScope = Metrics.startScope(scopeName); - final int threads = 10; - ExecutorService executorService = Executors.newFixedThreadPool(threads); - for (int i=0; i<threads; i++) { - final int n = i; - executorService.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - testScopeImpl(n); - return null; - } - }); - } - executorService.shutdown(); - assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS)); - - fooScope = Metrics.getScope(scopeName); - assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter()); - assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads); - Double avgT = (Double)Metrics.get("foo.avg_t"); - assertTrue(avgT.doubleValue() > periodMs); - Metrics.endScope(scopeName); - } - - void testScopeImpl(int n) throws Exception { - final MetricsScope fooScope = Metrics.startScope(scopeName); - // cannot open scope that is already open: - expectIOE(new Callable<Void>() { - @Override - public Void call() throws Exception { - fooScope.open(); - return null; - } - }); - - assertSame(fooScope, Metrics.getScope(scopeName)); - Thread.sleep(periodMs+1); - // 1st close: - Metrics.endScope(scopeName); // closing of open scope should be ok. - - assertTrue(fooScope.getNumCounter().longValue() >= 1); - final long t1 = fooScope.getTimeCounter().longValue(); - assertTrue(t1 > periodMs); - - expectIOE(new Callable<Void>() { - @Override - public Void call() throws Exception { - Metrics.endScope(scopeName); // closing of closed scope not allowed - return null; - } - }); - - assertSame(fooScope, Metrics.getScope(scopeName)); - - // opening allowed after closing: - Metrics.startScope(scopeName); - - assertTrue(fooScope.getNumCounter().longValue() >= 1); - assertTrue(fooScope.getTimeCounter().longValue() >= t1); - - // opening of already open scope not allowed: - expectIOE(new Callable<Void>() { - @Override - public Void call() throws Exception { - Metrics.startScope(scopeName); - return null; - } - }); - - assertSame(fooScope, Metrics.getScope(scopeName)); - Thread.sleep(periodMs + 1); - // Reopening (close + open) allowed in opened state: - fooScope.reopen(); - - assertTrue(fooScope.getNumCounter().longValue() >= 2); - assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs); - - Thread.sleep(periodMs + 1); - // 3rd close: - fooScope.close(); - - assertTrue(fooScope.getNumCounter().longValue() >= 3); - assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs); - Double avgT = (Double)Metrics.get("foo.avg_t"); - assertTrue(avgT.doubleValue() > periodMs); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/a96fbdee/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java ---------------------------------------------------------------------- diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java new file mode 100644 index 0000000..8749349 --- /dev/null +++ b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common.metrics.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +/** + * Unit test for new Metrics subsystem. + */ +public class TestCodahaleMetrics { + + private static File workDir = new File(System.getProperty("test.tmp.dir")); + private static File jsonReportFile; + public static MetricRegistry metricRegistry; + + @Before + public void before() throws Exception { + HiveConf conf = new HiveConf(); + + jsonReportFile = new File(workDir, "json_reporting"); + jsonReportFile.delete(); + String defaultFsName = ShimLoader.getHadoopShims().getHadoopConfNames().get("HADOOPFS"); + conf.set(defaultFsName, "local"); + conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS, CodahaleMetrics.class.getCanonicalName()); + conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name()); + conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString()); + conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms"); + + MetricsFactory.init(conf); + metricRegistry = ((CodahaleMetrics) MetricsFactory.getMetricsInstance()).getMetricRegistry(); + } + + @After + public void after() throws Exception { + MetricsFactory.deInit(); + } + + @Test + public void testScope() throws Exception { + int runs = 5; + for (int i = 0; i < runs; i++) { + MetricsFactory.getMetricsInstance().startScope("method1"); + MetricsFactory.getMetricsInstance().endScope("method1"); + } + + Timer timer = metricRegistry.getTimers().get("api_method1"); + Assert.assertEquals(5, timer.getCount()); + Assert.assertTrue(timer.getMeanRate() > 0); + } + + + @Test + public void testCount() throws Exception { + int runs = 5; + for (int i = 0; i < runs; i++) { + MetricsFactory.getMetricsInstance().incrementCounter("count1"); + } + Counter counter = metricRegistry.getCounters().get("count1"); + Assert.assertEquals(5L, counter.getCount()); + } + + @Test + public void testConcurrency() throws Exception { + int threads = 4; + ExecutorService executorService = Executors.newFixedThreadPool(threads); + for (int i=0; i< threads; i++) { + final int n = i; + executorService.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + MetricsFactory.getMetricsInstance().startScope("method2"); + MetricsFactory.getMetricsInstance().endScope("method2"); + return null; + } + }); + } + executorService.shutdown(); + assertTrue(executorService.awaitTermination(10000, TimeUnit.MILLISECONDS)); + Timer timer = metricRegistry.getTimers().get("api_method2"); + Assert.assertEquals(4, timer.getCount()); + Assert.assertTrue(timer.getMeanRate() > 0); + } + + @Test + public void testFileReporting() throws Exception { + int runs = 5; + for (int i = 0; i < runs; i++) { + MetricsFactory.getMetricsInstance().incrementCounter("count2"); + Thread.sleep(100); + } + + Thread.sleep(2000); + byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath())); + ObjectMapper objectMapper = new ObjectMapper(); + + JsonNode rootNode = objectMapper.readTree(jsonData); + JsonNode countersNode = rootNode.path("counters"); + JsonNode methodCounterNode = countersNode.path("count2"); + JsonNode countNode = methodCounterNode.path("count"); + Assert.assertEquals(countNode.asInt(), 5); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/a96fbdee/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java new file mode 100644 index 0000000..25f34d1 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import junit.framework.TestCase; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * Tests Hive Metastore Metrics. + */ +public class TestMetaStoreMetrics { + + private static File workDir = new File(System.getProperty("test.tmp.dir")); + private static File jsonReportFile; + + private static HiveConf hiveConf; + private static Driver driver; + + + @Before + public void before() throws Exception { + + int port = MetaStoreUtils.findFreePort(); + + jsonReportFile = new File(workDir, "json_reporting"); + jsonReportFile.delete(); + + hiveConf = new HiveConf(TestMetaStoreMetrics.class); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port); + hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); + hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_METRICS, true); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name()); + hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString()); + hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms"); + + MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge(), hiveConf); + + SessionState.start(new CliSessionState(hiveConf)); + driver = new Driver(hiveConf); + } + + @Test + public void testMetricsFile() throws Exception { + driver.run("show databases"); + + //give timer thread a chance to print the metrics + Thread.sleep(2000); + + //As the file is being written, try a few times. + //This can be replaced by CodahaleMetrics's JsonServlet reporter once it is exposed. + byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath())); + ObjectMapper objectMapper = new ObjectMapper(); + + JsonNode rootNode = objectMapper.readTree(jsonData); + JsonNode countersNode = rootNode.path("timers"); + JsonNode methodCounterNode = countersNode.path("api_get_all_databases"); + JsonNode countNode = methodCounterNode.path("count"); + Assert.assertTrue(countNode.asInt() > 0); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/a96fbdee/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index d81c856..1688920 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -18,39 +18,14 @@ package org.apache.hadoop.hive.metastore; -import static org.apache.commons.lang.StringUtils.join; -import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_COMMENT; -import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME; -import static org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName; - -import java.io.IOException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Formatter; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.Timer; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.regex.Pattern; - -import javax.jdo.JDOException; - +import com.facebook.fb303.FacebookBase; +import com.facebook.fb303.fb_status; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimaps; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -58,12 +33,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.JvmPauseMonitor; import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.common.cli.CommonCliOptions; -import org.apache.hadoop.hive.common.metrics.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; @@ -221,14 +197,35 @@ import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportFactory; -import com.facebook.fb303.FacebookBase; -import com.facebook.fb303.fb_status; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableListMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimaps; +import javax.jdo.JDOException; +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Formatter; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.Timer; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Pattern; + +import static org.apache.commons.lang.StringUtils.join; +import static org.apache.hadoop.hive.metastore.MetaStoreUtils.*; /** * TODO:pc remove application logic to a separate interface. @@ -464,9 +461,10 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } - if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) { + //Start Metrics for Embedded mode + if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) { try { - Metrics.init(); + MetricsFactory.init(hiveConf); } catch (Exception e) { // log exception, but ignore inability to start LOG.error("error in Metrics init: " + e.getClass().getName() + " " @@ -750,11 +748,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { incrementCounter(function); logInfo((getIpAddress() == null ? "" : "source:" + getIpAddress() + " ") + function + extraLogInfo); - try { - Metrics.startScope(function); - } catch (IOException e) { - LOG.debug("Exception when starting metrics scope" + if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) { + try { + MetricsFactory.getMetricsInstance().startScope(function); + } catch (IOException e) { + LOG.debug("Exception when starting metrics scope" + e.getClass().getName() + " " + e.getMessage(), e); + } } return function; } @@ -792,10 +792,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { } private void endFunction(String function, MetaStoreEndFunctionContext context) { - try { - Metrics.endScope(function); - } catch (IOException e) { - LOG.debug("Exception when closing metrics scope" + e); + if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) { + try { + MetricsFactory.getMetricsInstance().endScope(function); + } catch (IOException e) { + LOG.debug("Exception when closing metrics scope" + e); + } } for (MetaStoreEndFunctionListener listener : endFunctionListeners) { @@ -819,6 +821,14 @@ public class HiveMetaStore extends ThriftHiveMetastore { threadLocalMS.remove(); } } + if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) { + try { + MetricsFactory.deInit(); + } catch (Exception e) { + LOG.error("error in Metrics deinit: " + e.getClass().getName() + " " + + e.getMessage(), e); + } + } logInfo("Metastore shutdown complete."); } @@ -5901,6 +5911,16 @@ public class HiveMetaStore extends ThriftHiveMetastore { } }); + //Start Metrics for Standalone (Remote) Mode + if (conf.getBoolVar(ConfVars.METASTORE_METRICS)) { + try { + MetricsFactory.init(conf); + } catch (Exception e) { + // log exception, but ignore inability to start + LOG.error("error in Metrics init: " + e.getClass().getName() + " " + + e.getMessage(), e); + } + } Lock startLock = new ReentrantLock(); Condition startCondition = startLock.newCondition(); @@ -6091,7 +6111,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { // Wrap the start of the threads in a catch Throwable loop so that any failures // don't doom the rest of the metastore. startLock.lock(); - ShimLoader.getHadoopShims().startPauseMonitor(conf); + try { + JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf); + pauseMonitor.start(); + } catch (Throwable t) { + LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs and Pauses may not be " + + "warned upon.", t); + } try { // Per the javadocs on Condition, do not depend on the condition alone as a start gate