Repository: hive
Updated Branches:
  refs/heads/master 152ee221e -> a284df1f8
HIVE-16601: Display Session Id and Query Name / Id in Spark UI (Sahil Takiar, reviewed by Barna Zsombor Klara, Peter Vary, Xuefu Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a284df1f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a284df1f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a284df1f

Branch: refs/heads/master
Commit: a284df1f87eccee8bdad04afea2150e6c07337a0
Parents: 152ee22
Author: Sahil Takiar <takiar.sa...@gmail.com>
Authored: Tue Oct 24 08:27:24 2017 -0700
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Tue Oct 24 08:27:24 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  3 +-
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  6 ++-
 .../apache/hadoop/hive/ql/exec/DagUtils.java    | 39 ++++++++++++++++++++
 .../ql/exec/spark/HiveSparkClientFactory.java   | 16 ++++++--
 .../ql/exec/spark/RemoteHiveSparkClient.java    |  3 ++
 .../ql/exec/spark/session/SparkSessionImpl.java |  2 +-
 .../spark/session/SparkSessionManagerImpl.java  |  2 +-
 .../apache/hadoop/hive/ql/plan/SparkWork.java   | 15 +++++---
 ql/src/test/queries/clientpositive/parallel.q   |  2 +-
 9 files changed, 73 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a6ecb37..62dcbd5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1510,7 +1510,8 @@ public class HiveConf extends Configuration {
         "Whether to grant access to the hs2/hive user for queries"),
     HIVEQUERYNAME ("hive.query.name", null,
         "This name is used by Tez to set the dag name. This name in turn will appear on \n" +
-        "the Tez UI representing the work that was done."),
+        "the Tez UI representing the work that was done. Used by Spark to set the query name, will show up in the\n" +
+        "Spark UI."),
     HIVEOPTIMIZEBUCKETINGSORTING("hive.optimize.bucketingsorting", true,
         "Don't create a reducer for enforcing \n" +
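Setting hive.query.name before a query runs is what feeds both the Tez DAG name and, with this change, the query name shown in the Spark UI. A minimal sketch of setting it programmatically (the property value is invented for illustration):

import org.apache.hadoop.hive.conf.HiveConf;

public class QueryNameExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // The value of hive.query.name becomes the Tez DAG name; with this patch
    // it is also used for the query name displayed in the Spark UI.
    conf.setVar(HiveConf.ConfVars.HIVEQUERYNAME, "nightly-aggregation");
    System.out.println(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYNAME));
  }
}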
http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index b7f76ab..6c6ad92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -132,6 +132,8 @@ import com.google.common.collect.Sets;
 
 public class Driver implements CommandProcessor {
 
+  public static final String MAPREDUCE_WORKFLOW_NODE_NAME = "mapreduce.workflow.node.name";
+
   static final private String CLASS_NAME = Driver.class.getName();
   private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   static final private LogHelper console = new LogHelper(LOG);
@@ -2248,9 +2250,9 @@ public class Driver implements CommandProcessor {
         }
         if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
           if (noName) {
-            conf.set(MRJobConfig.JOB_NAME, jobname + "(" + tsk.getId() + ")");
+            conf.set(MRJobConfig.JOB_NAME, jobname + " (" + tsk.getId() + ")");
           }
-          conf.set("mapreduce.workflow.node.name", tsk.getId());
+          conf.set(MAPREDUCE_WORKFLOW_NODE_NAME, tsk.getId());
           Utilities.setWorkflowAdjacencies(conf, plan);
           cxt.incCurJobNo(1);
           console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);

http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java
new file mode 100644
index 0000000..aed1b2c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+
+
+public class DagUtils {
+
+  public static String getQueryName(Configuration conf) {
+    String name = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYNAME);
+    if (Strings.isNullOrEmpty(name)) {
+      return conf.get(MRJobConfig.JOB_NAME);
+    } else {
+      return name + " (" + conf.get(Driver.MAPREDUCE_WORKFLOW_NODE_NAME) + ")";
+    }
+  }
+}
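To illustrate how the new helper resolves a display name, a hypothetical caller (all configuration values below are invented for this sketch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.exec.DagUtils;

public class GetQueryNameDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // With hive.query.name unset, getQueryName falls back to mapreduce.job.name.
    conf.set("mapreduce.job.name", "example job");
    System.out.println(DagUtils.getQueryName(conf)); // example job

    // With hive.query.name set, the workflow node name (the task id) is appended.
    conf.set(HiveConf.ConfVars.HIVEQUERYNAME.varname, "my query");
    conf.set(Driver.MAPREDUCE_WORKFLOW_NODE_NAME, "Stage-1");
    System.out.println(DagUtils.getQueryName(conf)); // my query (Stage-1)
  }
}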
http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 597fcab..4fcb9bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -63,8 +63,9 @@ public class HiveSparkClientFactory {
   @VisibleForTesting
   public static final String SPARK_CLONE_CONFIGURATION = "spark.hadoop.cloneConf";
 
-  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
-    Map<String, String> sparkConf = initiateSparkConf(hiveconf);
+  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String sessionId) throws Exception {
+    Map<String, String> sparkConf = initiateSparkConf(hiveconf, sessionId);
+
     // Submit spark job through local spark context while spark master is local mode, otherwise submit
     // spark job through remote spark context.
     String master = sparkConf.get("spark.master");
@@ -76,7 +77,7 @@ public class HiveSparkClientFactory {
     }
   }
 
-  public static Map<String, String> initiateSparkConf(HiveConf hiveConf) {
+  public static Map<String, String> initiateSparkConf(HiveConf hiveConf, String sessionId) {
     Map<String, String> sparkConf = new HashMap<String, String>();
     HBaseConfiguration.addHbaseResources(hiveConf);
 
@@ -84,8 +85,15 @@ public class HiveSparkClientFactory {
     sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
     final String appNameKey = "spark.app.name";
     String appName = hiveConf.get(appNameKey);
+    final String sessionIdString = " (sessionId = " + sessionId + ")";
     if (appName == null) {
-      appName = SPARK_DEFAULT_APP_NAME;
+      if (sessionId == null) {
+        appName = SPARK_DEFAULT_APP_NAME;
+      } else {
+        appName = SPARK_DEFAULT_APP_NAME + sessionIdString;
+      }
+    } else {
+      appName = appName + sessionIdString;
     }
     sparkConf.put(appNameKey, appName);
     sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);
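The app-name branching above appends the session id whenever it is known. A standalone sketch of the same logic (the default-name constant is assumed to match SPARK_DEFAULT_APP_NAME, and the session id is invented):

public class AppNameSketch {
  // Assumed to match HiveSparkClientFactory.SPARK_DEFAULT_APP_NAME.
  private static final String DEFAULT_APP_NAME = "Hive on Spark";

  // Mirrors initiateSparkConf: append the session id whenever it is known.
  static String appName(String configured, String sessionId) {
    String sessionIdString = " (sessionId = " + sessionId + ")";
    if (configured == null) {
      return sessionId == null ? DEFAULT_APP_NAME : DEFAULT_APP_NAME + sessionIdString;
    }
    return configured + sessionIdString;
  }

  public static void main(String[] args) {
    System.out.println(appName(null, "af3d-example"));     // Hive on Spark (sessionId = af3d-example)
    System.out.println(appName("my-app", "af3d-example")); // my-app (sessionId = af3d-example)
    System.out.println(appName(null, null));               // Hive on Spark
  }
}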
http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 102e41b..9b38c1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.DagUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef;
@@ -350,6 +351,8 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
           new SparkPlanGenerator(jc.sc(), null, localJobConf, localScratchDir, sparkReporter);
       SparkPlan plan = gen.generate(localSparkWork);
 
+      jc.sc().setJobGroup("queryId = " + localSparkWork.getQueryId(), DagUtils.getQueryName(localJobConf));
+
       // Execute generated plan.
       JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
       // We use Spark RDD async action to submit job as it's the only way to get jobId now.
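The setJobGroup call above is what ties each submitted Spark job to its Hive query in the Spark UI: Spark tags every job launched afterwards on the same thread with the given group id and description. A minimal standalone sketch (the group id and description strings are invented):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class JobGroupSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setMaster("local")
        .setAppName("Hive on Spark (sessionId = demo)");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      // Jobs submitted after this call show up in the Spark UI under this
      // group id and description, analogous to queryId and query name above.
      sc.setJobGroup("queryId = demo-query-id", "demo query (Stage-1)");
      sc.parallelize(Arrays.asList(1, 2, 3)).count();
    }
  }
}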
"Failed to create Spark client for Spark session " + sessionId : http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java index 3c2f0e6..7722a0a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java @@ -79,7 +79,7 @@ public class SparkSessionManagerImpl implements SparkSessionManager { synchronized (this) { if (!inited) { LOG.info("Setting up the session manager."); - Map<String, String> conf = HiveSparkClientFactory.initiateSparkConf(hiveConf); + Map<String, String> conf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null); try { SparkClientFactory.initialize(conf); inited = true; http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java index fda7080..8332272 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java @@ -50,7 +50,8 @@ import com.google.common.base.Preconditions; public class SparkWork extends AbstractOperatorDesc { private static final AtomicInteger counter = new AtomicInteger(1); - private final String name; + private final String dagName; + private final String queryId; private final Set<BaseWork> roots = new LinkedHashSet<BaseWork>(); private final Set<BaseWork> leaves = new LinkedHashSet<>(); @@ -66,15 +67,19 @@ public class SparkWork extends AbstractOperatorDesc { private Map<BaseWork, BaseWork> cloneToWork; - public SparkWork(String name) { - this.name = name + ":" + counter.getAndIncrement(); + public SparkWork(String queryId) { + this.queryId = queryId; + this.dagName = queryId + ":" + counter.getAndIncrement(); cloneToWork = new HashMap<BaseWork, BaseWork>(); } - @Explain(displayName = "DagName") public String getName() { - return name; + return this.dagName; + } + + public String getQueryId() { + return this.queryId; } /** http://git-wip-us.apache.org/repos/asf/hive/blob/a284df1f/ql/src/test/queries/clientpositive/parallel.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/parallel.q b/ql/src/test/queries/clientpositive/parallel.q index b8c0445..f2f0d35 100644 --- a/ql/src/test/queries/clientpositive/parallel.q +++ b/ql/src/test/queries/clientpositive/parallel.q @@ -1,5 +1,5 @@ set hive.explain.user=false; -set mapred.job.name='test_parallel'; +set hive.query.name='test_parallel'; set hive.exec.parallel=true; set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;