Repository: hive
Updated Branches:
  refs/heads/master c37641840 -> 2663f4929
HIVE-12987: Add metrics for HS2 active users and SQL operations(Jimmy, reviewed by Szehon, Aihua)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2663f492
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2663f492
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2663f492
Branch: refs/heads/master
Commit: 2663f4929761afa54d2c83172a8b300802a0c58e
Parents: c376418
Author: Jimmy Xiang <jxi...@apache.org>
Authored: Tue Feb 9 19:57:36 2016 -0800
Committer: Jimmy Xiang <jxi...@apache.org>
Committed: Tue Feb 9 19:57:36 2016 -0800
----------------------------------------------------------------------
 .../common/metrics/common/MetricsConstant.java | 3 +
 .../hive/jdbc/miniHS2/TestHs2Metrics.java | 5 ++
 .../hive/service/cli/operation/Operation.java | 50 ++++++++-------
 .../service/cli/operation/SQLOperation.java | 66 +++++++++++++++++++-
 4 files changed, 102 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2663f492/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
index e5247c8..65b914c 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
@@ -41,6 +41,9 @@ public class MetricsConstant {
 public static final String OPERATION_PREFIX = "hs2_operation_";
 public static final String COMPLETED_OPERATION_PREFIX = "hs2_completed_operation_";
+ public static final String SQL_OPERATION_PREFIX = "hs2_sql_operation_";
+ public static final String COMPLETED_SQL_OPERATION_PREFIX =
"hs2_completed_sql_operation_"; + public static final String INIT_TOTAL_DATABASES = "init_total_count_dbs"; public static final String INIT_TOTAL_TABLES = "init_total_count_tables"; public static final String INIT_TOTAL_PARTITIONS = "init_total_count_partitions"; http://git-wip-us.apache.org/repos/asf/hive/blob/2663f492/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java index 0b88936..6a98968 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java @@ -57,6 +57,7 @@ public class TestHs2Metrics { MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, "active_calls_api_semanticAnalyze", 1); MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, "active_calls_api_compile", 1); MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, "active_calls_api_hs2_operation_RUNNING", 1); + MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, "active_calls_api_hs2_sql_operation_RUNNING", 1); } catch (Exception e) { throw new SemanticException("metrics verification failed", e); } @@ -101,12 +102,16 @@ public class TestHs2Metrics { MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER, "api_hs2_operation_PENDING", 1); MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER, "api_hs2_operation_RUNNING", 1); MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, "hs2_completed_operation_FINISHED", 1); + MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER, "api_hs2_sql_operation_PENDING", 1); + MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER, "api_hs2_sql_operation_RUNNING", 1); + 
MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, "hs2_completed_sql_operation_FINISHED", 1); MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER, "api_Driver.run", 1); //but there should be no more active calls. MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, "active_calls_api_semanticAnalyze", 0); MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, "active_calls_api_compile", 0); MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, "active_calls_api_hs2_operation_RUNNING", 0); + MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, "active_calls_api_hs2_sql_operation_RUNNING", 0); } } http://git-wip-us.apache.org/repos/asf/hive/blob/2663f492/service/src/java/org/apache/hive/service/cli/operation/Operation.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index 8340202..22f725c 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -58,8 +58,8 @@ public abstract class Operation { public static final String QUERYID_LOG_KEY = "queryId"; protected final HiveSession parentSession; - private OperationState state = OperationState.INITIALIZED; - private MetricsScope currentStateScope; + private volatile OperationState state = OperationState.INITIALIZED; + private volatile MetricsScope currentStateScope; private final OperationHandle opHandle; private HiveConf configuration; public static final FetchOrientation DEFAULT_FETCH_ORIENTATION = FetchOrientation.FETCH_NEXT; @@ -155,9 +155,10 @@ public abstract class Operation { protected final OperationState setState(OperationState newState) throws HiveSQLException { state.validateTransition(newState); + OperationState prevState = state; this.state = 
newState; setMetrics(state); - onNewState(state); + onNewState(state, prevState); this.lastAccessTime = System.currentTimeMillis(); return this.state; } @@ -394,24 +395,31 @@ public abstract class Operation { OperationState.UNKNOWN ); - protected void setMetrics(OperationState state) { - Metrics metrics = MetricsFactory.getInstance(); - if (metrics != null) { - try { - if (currentStateScope != null) { - metrics.endScope(currentStateScope); - currentStateScope = null; - } - if (scopeStates.contains(state)) { - currentStateScope = metrics.createScope(MetricsConstant.OPERATION_PREFIX + state.toString()); - } - if (terminalStates.contains(state)) { - metrics.incrementCounter(MetricsConstant.COMPLETED_OPERATION_PREFIX + state.toString()); - } - } catch (IOException e) { - LOG.warn("Error metrics", e); - } + private void setMetrics(OperationState state) { + currentStateScope = setMetrics(currentStateScope, MetricsConstant.OPERATION_PREFIX, + MetricsConstant.COMPLETED_OPERATION_PREFIX, state); + } + + protected static MetricsScope setMetrics(MetricsScope stateScope, String operationPrefix, + String completedOperationPrefix, OperationState state) { + Metrics metrics = MetricsFactory.getInstance(); + if (metrics != null) { + try { + if (stateScope != null) { + metrics.endScope(stateScope); + stateScope = null; + } + if (scopeStates.contains(state)) { + stateScope = metrics.createScope(operationPrefix + state); + } + if (terminalStates.contains(state)) { + metrics.incrementCounter(completedOperationPrefix + state); + } + } catch (IOException e) { + LOG.warn("Error metrics", e); + } } + return stateScope; } public long getBeginTime() { @@ -422,6 +430,6 @@ public abstract class Operation { return state; } - protected void onNewState(OperationState state) { + protected void onNewState(OperationState state, OperationState prevState) { } } http://git-wip-us.apache.org/repos/asf/hive/blob/2663f492/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java 
---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 3fbbb70..100dc6a 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -25,14 +25,20 @@ import java.io.UnsupportedEncodingException; import java.security.PrivilegedExceptionAction; import java.sql.SQLException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang3.CharEncoding; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; +import org.apache.hadoop.hive.common.metrics.common.MetricsScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; @@ -68,6 +74,7 @@ import org.apache.hive.service.server.ThreadWithGarbageCleanup; * SQLOperation. * */ +@SuppressWarnings("deprecation") public class SQLOperation extends ExecuteStatementOperation { private Driver driver = null; @@ -76,10 +83,16 @@ public class SQLOperation extends ExecuteStatementOperation { private Schema mResultSchema = null; private SerDe serde = null; private boolean fetchStarted = false; + private volatile MetricsScope currentSQLStateScope; //Display for WebUI. 
private SQLOperationDisplay sqlOpDisplay; + /** + * A map to track query count running by each user + */ + private static Map<String, AtomicInteger> userQueries = new HashMap<String, AtomicInteger>(); + private static final String ACTIVE_SQL_USER = MetricsConstant.SQL_OPERATION_PREFIX + "active_user"; public SQLOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay, boolean runInBackground) { @@ -494,7 +507,26 @@ public class SQLOperation extends ExecuteStatementOperation { } @Override - protected void onNewState(OperationState state) { + protected void onNewState(OperationState state, OperationState prevState) { + currentSQLStateScope = setMetrics(currentSQLStateScope, MetricsConstant.SQL_OPERATION_PREFIX, + MetricsConstant.COMPLETED_SQL_OPERATION_PREFIX, state); + + Metrics metrics = MetricsFactory.getInstance(); + if (metrics != null) { + try { + // New state is changed to running from something else (user is active) + if (state == OperationState.RUNNING && prevState != state) { + incrementUserQueries(metrics); + } + // New state is not running (user not active) any more + if (prevState == OperationState.RUNNING && prevState != state) { + decrementUserQueries(metrics); + } + } catch (IOException e) { + LOG.warn("Error metrics", e); + } + } + if (state == OperationState.CLOSED) { sqlOpDisplay.closed(); } else { @@ -502,4 +534,36 @@ public class SQLOperation extends ExecuteStatementOperation { sqlOpDisplay.updateState(state); } } + + private void incrementUserQueries(Metrics metrics) throws IOException { + String username = parentSession.getUserName(); + if (username != null) { + synchronized (userQueries) { + AtomicInteger count = userQueries.get(username); + if (count == null) { + count = new AtomicInteger(0); + AtomicInteger prev = userQueries.put(username, count); + if (prev == null) { + metrics.incrementCounter(ACTIVE_SQL_USER); + } else { + count = prev; + } + } + count.incrementAndGet(); + } + } + } + + private void 
decrementUserQueries(Metrics metrics) throws IOException { + String username = parentSession.getUserName(); + if (username != null) { + synchronized (userQueries) { + AtomicInteger count = userQueries.get(username); + if (count != null && count.decrementAndGet() <= 0) { + metrics.decrementCounter(ACTIVE_SQL_USER); + userQueries.remove(username); + } + } + } + } }