HIVE-14358: Add metrics for number of queries executed for each execution engine (Barna Zsombor Klara, reviewed by Gabor Szadovszky, Yongzhi Chen)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0c55d46f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0c55d46f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0c55d46f Branch: refs/heads/repl2 Commit: 0c55d46f2afdc7c282304839a10ac39221520316 Parents: 737fd09 Author: Yongzhi Chen <ych...@apache.org> Authored: Mon Sep 26 13:55:28 2016 -0400 Committer: Yongzhi Chen <ych...@apache.org> Committed: Tue Sep 27 09:23:16 2016 -0400 ---------------------------------------------------------------------- .../common/metrics/common/MetricsConstant.java | 7 +++ .../java/org/apache/hadoop/hive/ql/Driver.java | 5 +++ .../org/apache/hadoop/hive/ql/exec/Task.java | 9 ++++ .../hadoop/hive/ql/exec/mr/MapRedTask.java | 11 +++++ .../hadoop/hive/ql/exec/mr/MapredLocalTask.java | 11 +++++ .../hadoop/hive/ql/exec/spark/SparkTask.java | 11 +++++ .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 11 +++++ .../hadoop/hive/ql/exec/mr/TestMapRedTask.java | 47 ++++++++++++++++++++ .../hive/ql/exec/mr/TestMapredLocalTask.java | 46 +++++++++++++++++++ .../hive/ql/exec/spark/TestSparkTask.java | 46 +++++++++++++++++++ .../hadoop/hive/ql/exec/tez/TestTezTask.java | 17 +++++++ 11 files changed, 221 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0c55d46f/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java index 9dc96f9..c9d4087 100644 --- a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java @@ -61,4 +61,11 @@ public class MetricsConstant { // The number of Hive operations that are waiting to enter the compile block public static final String WAITING_COMPILE_OPS = "waiting_compile_ops"; + // The number of map reduce tasks executed by the HiveServer2 since the last restart + public static final String HIVE_MR_TASKS = "hive_mapred_tasks"; + // The number of spark tasks executed by the HiveServer2 since the last restart + public static final String HIVE_SPARK_TASKS = "hive_spark_tasks"; + // The number of tez tasks executed by the HiveServer2 since the last restart + public static final String HIVE_TEZ_TASKS = "hive_tez_tasks"; + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/0c55d46f/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 42d398d..03c56e1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1671,6 +1671,11 @@ public class Driver implements CommandProcessor { // incorrect results. assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty(); driverCxt.addToRunnable(tsk); + + Metrics metrics = MetricsFactory.getInstance(); + if (metrics != null) { + tsk.updateTaskMetrics(metrics); + } } perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS); http://git-wip-us.apache.org/repos/asf/hive/blob/0c55d46f/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index eeaa543..e1bd291 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -27,6 +27,8 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; @@ -534,6 +536,13 @@ public abstract class Task<T extends Serializable> implements Serializable, Node } } + /** + * Provide metrics on the type and number of tasks executed by the HiveServer + * @param metrics + */ + public void updateTaskMetrics(Metrics metrics) { + // no metrics gathered by default + } public int getTaskTag() { return taskTag; http://git-wip-us.apache.org/repos/asf/hive/blob/0c55d46f/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java index ce1106d9..f48d511 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java @@ -33,6 +33,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.io.CachingPrintStream; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; @@ -371,6 +373,15 @@ public class MapRedTask extends ExecDriver implements Serializable { return runningViaChild ? done() : b; } + @Override + public void updateTaskMetrics(Metrics metrics) { + try { + metrics.incrementCounter(MetricsConstant.HIVE_MR_TASKS); + } catch (IOException ex) { + LOG.warn("Could not increment metrics for " + this, ex); + } + } + /** * Set the number of reducers for the mapred work. */ http://git-wip-us.apache.org/repos/asf/hive/blob/0c55d46f/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 48d2540..f81fc71 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -43,6 +43,8 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.io.CachingPrintStream; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.CompilationOpContext; @@ -123,6 +125,15 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab } @Override + public void updateTaskMetrics(Metrics metrics) { + try { + metrics.incrementCounter(MetricsConstant.HIVE_MR_TASKS); + } catch (IOException ex) { + LOG.warn("Could not increment metrics for " + this, ex); + } + } + + @Override public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext, CompilationOpContext opContext) { super.initialize(queryState, queryPlan, driverContext, opContext); http://git-wip-us.apache.org/repos/asf/hive/blob/0c55d46f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 0b494aa..72c8bf7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -26,6 +26,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -180,6 +182,15 @@ public class SparkTask extends Task<SparkWork> { } @Override + public void updateTaskMetrics(Metrics metrics) { + try { + metrics.incrementCounter(MetricsConstant.HIVE_SPARK_TASKS); + } catch (IOException ex) { + LOG.warn("Could not increment metrics for " + this, ex); + } + } + + @Override public boolean isMapRedTask() { return true; } http://git-wip-us.apache.org/repos/asf/hive/blob/0c55d46f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 25c4514..c51c92f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -35,6 +35,8 @@ import javax.annotation.Nullable; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; @@ -529,6 +531,15 @@ public class TezTask extends Task<TezWork> { } @Override + public void updateTaskMetrics(Metrics metrics) { + try { + metrics.incrementCounter(MetricsConstant.HIVE_TEZ_TASKS); + } catch (IOException ex) { + LOG.warn("Could not increment metrics for " + this, ex); + } + } + + @Override public boolean isMapRedTask() { return true; } http://git-wip-us.apache.org/repos/asf/hive/blob/0c55d46f/ql/src/test/org/apache/hadoop/hive/ql/exec/mr/TestMapRedTask.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/mr/TestMapRedTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/mr/TestMapRedTask.java new file mode 100644 index 0000000..5ec7c0d --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/mr/TestMapRedTask.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.mr; + +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; + +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.apache.hadoop.hive.ql.exec.spark.SparkTask; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestMapRedTask { + + @Test + public void mrTask_updates_Metrics() throws IOException { + + Metrics mockMetrics = Mockito.mock(Metrics.class); + + MapRedTask mapRedTask = new MapRedTask(); + mapRedTask.updateTaskMetrics(mockMetrics); + + verify(mockMetrics, times(1)).incrementCounter(MetricsConstant.HIVE_MR_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_TEZ_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_SPARK_TASKS); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/0c55d46f/ql/src/test/org/apache/hadoop/hive/ql/exec/mr/TestMapredLocalTask.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/mr/TestMapredLocalTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/mr/TestMapredLocalTask.java new file mode 100644 index 0000000..4a0fafe --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/mr/TestMapredLocalTask.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.mr; + +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; + +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestMapredLocalTask { + + @Test + public void localMRTask_updates_Metrics() throws IOException { + + Metrics mockMetrics = Mockito.mock(Metrics.class); + + MapredLocalTask localMrTask = new MapredLocalTask(); + localMrTask.updateTaskMetrics(mockMetrics); + + verify(mockMetrics, times(1)).incrementCounter(MetricsConstant.HIVE_MR_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_TEZ_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_SPARK_TASKS); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/0c55d46f/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java new file mode 100644 index 0000000..4c7ec76 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark; + +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; + +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestSparkTask { + + @Test + public void sparkTask_updates_Metrics() throws IOException { + + Metrics mockMetrics = Mockito.mock(Metrics.class); + + SparkTask sparkTask = new SparkTask(); + sparkTask.updateTaskMetrics(mockMetrics); + + verify(mockMetrics, times(1)).incrementCounter(MetricsConstant.HIVE_SPARK_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_TEZ_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_MR_TASKS); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/0c55d46f/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 53672a9..5c012f3 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.never; import java.io.IOException; import java.util.ArrayList; @@ -39,6 +40,8 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.Operator; @@ -67,6 +70,7 @@ import org.apache.tez.dag.api.client.DAGClient; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -326,4 +330,17 @@ public class TestTezTask { heapSize = DagUtils.parseRightmostXmx(javaOpts); assertEquals("Unexpected maximum heap size", -1, heapSize); } + + @Test + public void tezTask_updates_Metrics() throws IOException { + + Metrics mockMetrics = Mockito.mock(Metrics.class); + + TezTask tezTask = new TezTask(); + tezTask.updateTaskMetrics(mockMetrics); + + verify(mockMetrics, times(1)).incrementCounter(MetricsConstant.HIVE_TEZ_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_SPARK_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_MR_TASKS); + } }