Repository: hive
Updated Branches:
  refs/heads/master 152ee221e -> a284df1f8


HIVE-16601: Display Session Id and Query Name / Id in Spark UI (Sahil Takiar, reviewed by Barna Zsombor Klara, Peter Vary, Xuefu Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a284df1f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a284df1f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a284df1f

Branch: refs/heads/master
Commit: a284df1f87eccee8bdad04afea2150e6c07337a0
Parents: 152ee22
Author: Sahil Takiar <takiar.sa...@gmail.com>
Authored: Tue Oct 24 08:27:24 2017 -0700
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Tue Oct 24 08:27:24 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  3 +-
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  6 ++-
 .../apache/hadoop/hive/ql/exec/DagUtils.java    | 39 ++++++++++++++++++++
 .../ql/exec/spark/HiveSparkClientFactory.java   | 16 ++++++--
 .../ql/exec/spark/RemoteHiveSparkClient.java    |  3 ++
 .../ql/exec/spark/session/SparkSessionImpl.java |  2 +-
 .../spark/session/SparkSessionManagerImpl.java  |  2 +-
 .../apache/hadoop/hive/ql/plan/SparkWork.java   | 15 +++++---
 ql/src/test/queries/clientpositive/parallel.q   |  2 +-
 9 files changed, 73 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a6ecb37..62dcbd5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1510,7 +1510,8 @@ public class HiveConf extends Configuration {
         "Whether to grant access to the hs2/hive user for queries"),
     HIVEQUERYNAME ("hive.query.name", null,
         "This named is used by Tez to set the dag name. This name in turn will 
appear on \n" +
-        "the Tez UI representing the work that was done."),
+        "the Tez UI representing the work that was done. Used by Spark to set 
the query name, will show up in the\n" +
+        "Spark UI."),
 
     HIVEOPTIMIZEBUCKETINGSORTING("hive.optimize.bucketingsorting", true,
         "Don't create a reducer for enforcing \n" +

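As a usage note, here is a minimal Java sketch of setting the query name programmatically; the class name and the value "nightly_etl" are invented for illustration:

import org.apache.hadoop.hive.conf.HiveConf;

public class QueryNameSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // hive.query.name now feeds the Spark UI job description as well as the Tez DAG name.
    conf.setVar(HiveConf.ConfVars.HIVEQUERYNAME, "nightly_etl"); // example value
    System.out.println(conf.getVar(HiveConf.ConfVars.HIVEQUERYNAME)); // prints: nightly_etl
  }
}

The same effect is available from a Hive session via "set hive.query.name=...;", as the parallel.q change at the end of this diff shows.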
http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index b7f76ab..6c6ad92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -132,6 +132,8 @@ import com.google.common.collect.Sets;
 
 public class Driver implements CommandProcessor {
 
+  public static final String MAPREDUCE_WORKFLOW_NODE_NAME = "mapreduce.workflow.node.name";
+
   static final private String CLASS_NAME = Driver.class.getName();
   private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   static final private LogHelper console = new LogHelper(LOG);
@@ -2248,9 +2250,9 @@ public class Driver implements CommandProcessor {
     }
     if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
       if (noName) {
-        conf.set(MRJobConfig.JOB_NAME, jobname + "(" + tsk.getId() + ")");
+        conf.set(MRJobConfig.JOB_NAME, jobname + " (" + tsk.getId() + ")");
       }
-      conf.set("mapreduce.workflow.node.name", tsk.getId());
+      conf.set(MAPREDUCE_WORKFLOW_NODE_NAME, tsk.getId());
       Utilities.setWorkflowAdjacencies(conf, plan);
       cxt.incCurJobNo(1);
      console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);

http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java
new file mode 100644
index 0000000..aed1b2c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+
+
+public class DagUtils {
+
+  public static String getQueryName(Configuration conf) {
+    String name = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYNAME);
+    if (Strings.isNullOrEmpty(name)) {
+      return conf.get(MRJobConfig.JOB_NAME);
+    } else {
+      return name + " (" + conf.get(Driver.MAPREDUCE_WORKFLOW_NODE_NAME) + ")";
+    }
+  }
+}

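A short sketch of the new helper's fallback behavior; the job name, node name, and query name values below are invented for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.exec.DagUtils;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class DagUtilsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(MRJobConfig.JOB_NAME, "INSERT OVERWRITE ... (Stage-1)"); // hypothetical job name
    conf.set(Driver.MAPREDUCE_WORKFLOW_NODE_NAME, "Stage-1");
    // With hive.query.name unset, getQueryName falls back to the MapReduce job name.
    System.out.println(DagUtils.getQueryName(conf)); // INSERT OVERWRITE ... (Stage-1)
    // With hive.query.name set, it returns the user-chosen name plus the workflow node name.
    conf.set("hive.query.name", "nightly_etl");
    System.out.println(DagUtils.getQueryName(conf)); // nightly_etl (Stage-1)
  }
}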
http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 597fcab..4fcb9bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -63,8 +63,9 @@ public class HiveSparkClientFactory {
   @VisibleForTesting
  public static final String SPARK_CLONE_CONFIGURATION = "spark.hadoop.cloneConf";
 
-  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
-    Map<String, String> sparkConf = initiateSparkConf(hiveconf);
+  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String sessionId) throws Exception {
+    Map<String, String> sparkConf = initiateSparkConf(hiveconf, sessionId);
+
    // Submit spark job through local spark context while spark master is local mode, otherwise submit
     // spark job through remote spark context.
     String master = sparkConf.get("spark.master");
@@ -76,7 +77,7 @@ public class HiveSparkClientFactory {
     }
   }
 
-  public static Map<String, String> initiateSparkConf(HiveConf hiveConf) {
+  public static Map<String, String> initiateSparkConf(HiveConf hiveConf, String sessionId) {
     Map<String, String> sparkConf = new HashMap<String, String>();
     HBaseConfiguration.addHbaseResources(hiveConf);
 
@@ -84,8 +85,15 @@ public class HiveSparkClientFactory {
     sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
     final String appNameKey = "spark.app.name";
     String appName = hiveConf.get(appNameKey);
+    final String sessionIdString = " (sessionId = " + sessionId + ")";
     if (appName == null) {
-      appName = SPARK_DEFAULT_APP_NAME;
+      if (sessionId == null) {
+        appName = SPARK_DEFAULT_APP_NAME;
+      } else {
+        appName = SPARK_DEFAULT_APP_NAME + sessionIdString;
+      }
+    } else {
+      appName = appName + sessionIdString;
     }
     sparkConf.put(appNameKey, appName);
     sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);

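A sketch of the resulting application name; "demo-session" is a made-up session id, and the exact default app name comes from the factory's SPARK_DEFAULT_APP_NAME constant:

import java.util.Map;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;

public class AppNameSketch {
  public static void main(String[] args) {
    HiveConf hiveConf = new HiveConf();
    // Passing a session id appends " (sessionId = <id>)" to spark.app.name.
    Map<String, String> sparkConf =
        HiveSparkClientFactory.initiateSparkConf(hiveConf, "demo-session");
    // With no user-set spark.app.name this prints the default app name plus the suffix.
    System.out.println(sparkConf.get("spark.app.name"));
  }
}

Note that SparkSessionManagerImpl (further below) passes null for the session id when it initializes the shared configuration.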
http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 102e41b..9b38c1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.DagUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef;
@@ -350,6 +351,8 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
        new SparkPlanGenerator(jc.sc(), null, localJobConf, localScratchDir, sparkReporter);
       SparkPlan plan = gen.generate(localSparkWork);
 
+      jc.sc().setJobGroup("queryId = " + localSparkWork.getQueryId(), DagUtils.getQueryName(localJobConf));
+
       // Execute generated plan.
       JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
      // We use Spark RDD async action to submit job as it's the only way to get jobId now.

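For context, JavaSparkContext.setJobGroup(groupId, description) tags every job submitted afterwards, and the Spark UI lists those jobs under the group with the description shown; a standalone sketch with made-up query and session ids:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class JobGroupSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setMaster("local[1]")
        .setAppName("Hive on Spark (sessionId = demo-session)"); // invented app name
    JavaSparkContext jsc = new JavaSparkContext(conf);
    // Group id carries the query id; the description carries the query name.
    jsc.setJobGroup("queryId = demo-query-id", "nightly_etl (Stage-1)");
    // This job now appears in the Spark UI under the group set above.
    System.out.println(jsc.parallelize(Arrays.asList(1, 2, 3)).count());
    jsc.stop();
  }
}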
http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 8d79dd9..ba61868 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -61,7 +61,7 @@ public class SparkSessionImpl implements SparkSession {
     this.conf = conf;
     isOpen = true;
     try {
-      hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf);
+      hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId);
     } catch (Throwable e) {
       // It's possible that user session is closed while creating Spark client.
      String msg = isOpen ? "Failed to create Spark client for Spark session " + sessionId :

http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index 3c2f0e6..7722a0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -79,7 +79,7 @@ public class SparkSessionManagerImpl implements SparkSessionManager {
       synchronized (this) {
         if (!inited) {
           LOG.info("Setting up the session manager.");
-          Map<String, String> conf = HiveSparkClientFactory.initiateSparkConf(hiveConf);
+          Map<String, String> conf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null);
           try {
             SparkClientFactory.initialize(conf);
             inited = true;

http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
index fda7080..8332272 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
@@ -50,7 +50,8 @@ import com.google.common.base.Preconditions;
 public class SparkWork extends AbstractOperatorDesc {
 
   private static final AtomicInteger counter = new AtomicInteger(1);
-  private final String name;
+  private final String dagName;
+  private final String queryId;
 
   private final Set<BaseWork> roots = new LinkedHashSet<BaseWork>();
   private final Set<BaseWork> leaves = new LinkedHashSet<>();
@@ -66,15 +67,19 @@ public class SparkWork extends AbstractOperatorDesc {
 
   private Map<BaseWork, BaseWork> cloneToWork;
 
-  public SparkWork(String name) {
-    this.name = name + ":" + counter.getAndIncrement();
+  public SparkWork(String queryId) {
+    this.queryId = queryId;
+    this.dagName = queryId + ":" + counter.getAndIncrement();
     cloneToWork = new HashMap<BaseWork, BaseWork>();
   }
 
-
   @Explain(displayName = "DagName")
   public String getName() {
-    return name;
+    return this.dagName;
+  }
+
+  public String getQueryId() {
+    return this.queryId;
   }
 
   /**

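A quick sketch of the renamed fields; the query id below is invented, and the ":1" suffix comes from the class's static counter, so it is only deterministic for the first SparkWork created in a JVM:

import org.apache.hadoop.hive.ql.plan.SparkWork;

public class SparkWorkSketch {
  public static void main(String[] args) {
    SparkWork work = new SparkWork("hive_20171024_demo"); // made-up query id
    System.out.println(work.getName());    // hive_20171024_demo:1 (counter-suffixed DAG name)
    System.out.println(work.getQueryId()); // hive_20171024_demo
  }
}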
http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/ql/src/test/queries/clientpositive/parallel.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/parallel.q b/ql/src/test/queries/clientpositive/parallel.q
index b8c0445..f2f0d35 100644
--- a/ql/src/test/queries/clientpositive/parallel.q
+++ b/ql/src/test/queries/clientpositive/parallel.q
@@ -1,5 +1,5 @@
 set hive.explain.user=false;
-set mapred.job.name='test_parallel';
+set hive.query.name='test_parallel';
 set hive.exec.parallel=true;
 set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
 
