Repository: hive
Updated Branches:
  refs/heads/master c37641840 -> 2663f4929


HIVE-12987: Add metrics for HS2 active users and SQL operations (Jimmy, reviewed 
by Szehon, Aihua)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2663f492
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2663f492
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2663f492

Branch: refs/heads/master
Commit: 2663f4929761afa54d2c83172a8b300802a0c58e
Parents: c376418
Author: Jimmy Xiang <jxi...@apache.org>
Authored: Tue Feb 9 19:57:36 2016 -0800
Committer: Jimmy Xiang <jxi...@apache.org>
Committed: Tue Feb 9 19:57:36 2016 -0800

----------------------------------------------------------------------
 .../common/metrics/common/MetricsConstant.java  |  3 +
 .../hive/jdbc/miniHS2/TestHs2Metrics.java       |  5 ++
 .../hive/service/cli/operation/Operation.java   | 50 ++++++++-------
 .../service/cli/operation/SQLOperation.java     | 66 +++++++++++++++++++-
 4 files changed, 102 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2663f492/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
----------------------------------------------------------------------
diff --git 
a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
 
b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
index e5247c8..65b914c 100644
--- 
a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
+++ 
b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
@@ -41,6 +41,9 @@ public class MetricsConstant {
   public static final String OPERATION_PREFIX = "hs2_operation_";
   public static final String COMPLETED_OPERATION_PREFIX = 
"hs2_completed_operation_";
 
+  public static final String SQL_OPERATION_PREFIX = "hs2_sql_operation_";
+  public static final String COMPLETED_SQL_OPERATION_PREFIX = 
"hs2_completed_sql_operation_";
+
   public static final String INIT_TOTAL_DATABASES = "init_total_count_dbs";
   public static final String INIT_TOTAL_TABLES = "init_total_count_tables";
   public static final String INIT_TOTAL_PARTITIONS = 
"init_total_count_partitions";

http://git-wip-us.apache.org/repos/asf/hive/blob/2663f492/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java
 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java
index 0b88936..6a98968 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java
@@ -57,6 +57,7 @@ public class TestHs2Metrics {
         MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, 
"active_calls_api_semanticAnalyze", 1);
         MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, 
"active_calls_api_compile", 1);
         MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, 
"active_calls_api_hs2_operation_RUNNING", 1);
+        MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, 
"active_calls_api_hs2_sql_operation_RUNNING", 1);
       } catch (Exception e) {
         throw new SemanticException("metrics verification failed", e);
       }
@@ -101,12 +102,16 @@ public class TestHs2Metrics {
     MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER, 
"api_hs2_operation_PENDING", 1);
     MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER, 
"api_hs2_operation_RUNNING", 1);
     MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, 
"hs2_completed_operation_FINISHED", 1);
+    MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER, 
"api_hs2_sql_operation_PENDING", 1);
+    MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER, 
"api_hs2_sql_operation_RUNNING", 1);
+    MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, 
"hs2_completed_sql_operation_FINISHED", 1);
     MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER, 
"api_Driver.run", 1);
 
     //but there should be no more active calls.
     MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, 
"active_calls_api_semanticAnalyze", 0);
     MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, 
"active_calls_api_compile", 0);
     MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, 
"active_calls_api_hs2_operation_RUNNING", 0);
+    MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, 
"active_calls_api_hs2_sql_operation_RUNNING", 0);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2663f492/service/src/java/org/apache/hive/service/cli/operation/Operation.java
----------------------------------------------------------------------
diff --git 
a/service/src/java/org/apache/hive/service/cli/operation/Operation.java 
b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 8340202..22f725c 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -58,8 +58,8 @@ public abstract class Operation {
   public static final String QUERYID_LOG_KEY = "queryId";
 
   protected final HiveSession parentSession;
-  private OperationState state = OperationState.INITIALIZED;
-  private MetricsScope currentStateScope;
+  private volatile OperationState state = OperationState.INITIALIZED;
+  private volatile MetricsScope currentStateScope;
   private final OperationHandle opHandle;
   private HiveConf configuration;
   public static final FetchOrientation DEFAULT_FETCH_ORIENTATION = 
FetchOrientation.FETCH_NEXT;
@@ -155,9 +155,10 @@ public abstract class Operation {
 
   protected final OperationState setState(OperationState newState) throws 
HiveSQLException {
     state.validateTransition(newState);
+    OperationState prevState = state;
     this.state = newState;
     setMetrics(state);
-    onNewState(state);
+    onNewState(state, prevState);
     this.lastAccessTime = System.currentTimeMillis();
     return this.state;
   }
@@ -394,24 +395,31 @@ public abstract class Operation {
     OperationState.UNKNOWN
   );
 
-  protected void setMetrics(OperationState state) {
-     Metrics metrics = MetricsFactory.getInstance();
-     if (metrics != null) {
-       try {
-         if (currentStateScope != null) {
-           metrics.endScope(currentStateScope);
-           currentStateScope = null;
-         }
-         if (scopeStates.contains(state)) {
-           currentStateScope = 
metrics.createScope(MetricsConstant.OPERATION_PREFIX + state.toString());
-         }
-         if (terminalStates.contains(state)) {
-           metrics.incrementCounter(MetricsConstant.COMPLETED_OPERATION_PREFIX 
+ state.toString());
-         }
-       } catch (IOException e) {
-         LOG.warn("Error metrics", e);
-       }
+  private void setMetrics(OperationState state) {
+    currentStateScope = setMetrics(currentStateScope, 
MetricsConstant.OPERATION_PREFIX,
+      MetricsConstant.COMPLETED_OPERATION_PREFIX, state);
+  }
+
+  protected static MetricsScope setMetrics(MetricsScope stateScope, String 
operationPrefix,
+      String completedOperationPrefix, OperationState state) {
+    Metrics metrics = MetricsFactory.getInstance();
+    if (metrics != null) {
+      try {
+        if (stateScope != null) {
+          metrics.endScope(stateScope);
+          stateScope = null;
+        }
+        if (scopeStates.contains(state)) {
+          stateScope = metrics.createScope(operationPrefix + state);
+        }
+        if (terminalStates.contains(state)) {
+          metrics.incrementCounter(completedOperationPrefix + state);
+        }
+      } catch (IOException e) {
+        LOG.warn("Error metrics", e);
+      }
     }
+    return stateScope;
   }
 
   public long getBeginTime() {
@@ -422,6 +430,6 @@ public abstract class Operation {
     return state;
   }
 
-  protected void onNewState(OperationState state) {
+  protected void onNewState(OperationState state, OperationState prevState) {
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2663f492/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
----------------------------------------------------------------------
diff --git 
a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java 
b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 3fbbb70..100dc6a 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -25,14 +25,20 @@ import java.io.UnsupportedEncodingException;
 import java.security.PrivilegedExceptionAction;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang3.CharEncoding;
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.common.MetricsScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
@@ -68,6 +74,7 @@ import 
org.apache.hive.service.server.ThreadWithGarbageCleanup;
  * SQLOperation.
  *
  */
+@SuppressWarnings("deprecation")
 public class SQLOperation extends ExecuteStatementOperation {
 
   private Driver driver = null;
@@ -76,10 +83,16 @@ public class SQLOperation extends ExecuteStatementOperation 
{
   private Schema mResultSchema = null;
   private SerDe serde = null;
   private boolean fetchStarted = false;
+  private volatile MetricsScope currentSQLStateScope;
 
   //Display for WebUI.
   private SQLOperationDisplay sqlOpDisplay;
 
+  /**
+   * A map to track query count running by each user
+   */
+  private static Map<String, AtomicInteger> userQueries = new HashMap<String, 
AtomicInteger>();
+  private static final String ACTIVE_SQL_USER = 
MetricsConstant.SQL_OPERATION_PREFIX + "active_user";
 
   public SQLOperation(HiveSession parentSession, String statement, Map<String,
       String> confOverlay, boolean runInBackground) {
@@ -494,7 +507,26 @@ public class SQLOperation extends 
ExecuteStatementOperation {
   }
 
   @Override
-  protected void onNewState(OperationState state) {
+  protected void onNewState(OperationState state, OperationState prevState) {
+    currentSQLStateScope = setMetrics(currentSQLStateScope, 
MetricsConstant.SQL_OPERATION_PREFIX,
+      MetricsConstant.COMPLETED_SQL_OPERATION_PREFIX, state);
+
+    Metrics metrics = MetricsFactory.getInstance();
+    if (metrics != null) {
+      try {
+        // New state is changed to running from something else (user is active)
+        if (state == OperationState.RUNNING && prevState != state) {
+          incrementUserQueries(metrics);
+        }
+        // New state is not running (user not active) any more
+        if (prevState == OperationState.RUNNING && prevState != state) {
+          decrementUserQueries(metrics);
+        }
+      } catch (IOException e) {
+        LOG.warn("Error metrics", e);
+      }
+    }
+
     if (state == OperationState.CLOSED) {
       sqlOpDisplay.closed();
     } else {
@@ -502,4 +534,36 @@ public class SQLOperation extends 
ExecuteStatementOperation {
       sqlOpDisplay.updateState(state);
     }
   }
+
+  private void incrementUserQueries(Metrics metrics) throws IOException {
+    String username = parentSession.getUserName();
+    if (username != null) {
+      synchronized (userQueries) {
+        AtomicInteger count = userQueries.get(username);
+        if (count == null) {
+          count = new AtomicInteger(0);
+          AtomicInteger prev = userQueries.put(username, count);
+          if (prev == null) {
+            metrics.incrementCounter(ACTIVE_SQL_USER);
+          } else {
+            count = prev;
+          }
+        }
+        count.incrementAndGet();
+      }
+    }
+  }
+
+  private void decrementUserQueries(Metrics metrics) throws IOException {
+    String username = parentSession.getUserName();
+    if (username != null) {
+      synchronized (userQueries) {
+        AtomicInteger count = userQueries.get(username);
+        if (count != null && count.decrementAndGet() <= 0) {
+          metrics.decrementCounter(ACTIVE_SQL_USER);
+          userQueries.remove(username);
+        }
+      }
+    }
+  }
 }

Reply via email to