Repository: hive Updated Branches: refs/heads/branch-1 e934eba13 -> 7605ba9aa
HIVE-11434: Followup for HIVE-10166: reuse existing configurations for prewarming Spark executors (reviewed by Chao) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7605ba9a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7605ba9a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7605ba9a Branch: refs/heads/branch-1 Commit: 7605ba9aaafbb8ebda832636b577336083012687 Parents: e934eba Author: Xuefu Zhang <xzh...@cloudera.com> Authored: Mon Aug 3 19:14:20 2015 -0700 Committer: Xuefu Zhang <xzh...@cloudera.com> Committed: Mon Aug 3 19:14:48 2015 -0700 ---------------------------------------------------------------------- .../src/java/org/apache/hadoop/hive/conf/HiveConf.java | 9 ++------- .../hive/ql/exec/spark/RemoteHiveSparkClient.java | 13 ++++++------- .../org/apache/hive/spark/client/SparkClientImpl.java | 1 - 3 files changed, 8 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/7605ba9a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3544142..6f66772 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2107,8 +2107,8 @@ public class HiveConf extends Configuration { HIVE_AM_SPLIT_GENERATION("hive.compute.splits.in.am", true, "Whether to generate the splits locally or in the AM (tez only)"), - HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false, "Enables container prewarm for Tez (Hadoop 2 only)"), - HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10, "Controls the number of containers to prewarm for Tez (Hadoop 2 only)"), + HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false, "Enables container prewarm for Tez/Spark (Hadoop 2 only)"), + HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10, "Controls the number of containers to prewarm for Tez/Spark (Hadoop 2 only)"), HIVESTAGEIDREARRANGE("hive.stageid.rearrange", "none", new StringSet("none", "idonly", "traverse", "execution"), ""), HIVEEXPLAINDEPENDENCYAPPENDTASKTYPES("hive.explain.dependency.append.tasktype", false, ""), @@ -2218,11 +2218,6 @@ public class HiveConf extends Configuration { SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE( "hive.spark.dynamic.partition.pruning.max.data.size", 100*1024*1024L, "Maximum total data size in dynamic pruning."), - SPARK_PREWARM_CONTAINERS("hive.spark.prewarm.containers", false, "Whether to prewarn containers for Spark." + - "If enabled, Hive will spend no more than 60 seconds to wait for the containers to come up " + - "before any query can be executed."), - SPARK_PREWARM_NUM_CONTAINERS("hive.spark.prewarm.num.containers", 10, "The minimum number of containers to be prewarmed for Spark." + - "Applicable only if hive.spark.prewarm.containers is set to true."), NWAYJOINREORDER("hive.reorder.nway.joins", true, "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"), HIVE_LOG_N_RECORDS("hive.log.every.n.records", 0L, new RangeValidator(0L, null), http://git-wip-us.apache.org/repos/asf/hive/blob/7605ba9a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index 92167e4..7d43160 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -71,9 +71,8 @@ public class RemoteHiveSparkClient implements HiveSparkClient { private static final long serialVersionUID = 1L; private static final String MR_JAR_PROPERTY = "tmpjars"; - protected static final transient Log LOG = LogFactory - .getLog(RemoteHiveSparkClient.class); - + private static final transient Log LOG = LogFactory.getLog(RemoteHiveSparkClient.class); + private static final long MAX_PREWARM_TIME = 30000; // 30s private static final transient Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings(); private transient SparkClient remoteClient; @@ -92,7 +91,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient { sparkConf = HiveSparkClientFactory.generateSparkConf(conf); remoteClient = SparkClientFactory.createClient(conf, hiveConf); - if (HiveConf.getBoolVar(hiveConf, ConfVars.SPARK_PREWARM_CONTAINERS) && + if (HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_PREWARM_ENABLED) && hiveConf.get("spark.master").startsWith("yarn-")) { int minExecutors = getExecutorsToWarm(); if (minExecutors <= 0) { @@ -101,7 +100,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient { LOG.info("Prewarm Spark executors. The minimum number of executors to warm is " + minExecutors); - // Spend at most 60s to wait for executors to come up. + // Spend at most MAX_PREWARM_TIME to wait for executors to come up. int curExecutors = 0; long ts = System.currentTimeMillis(); do { @@ -111,7 +110,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient { return; } Thread.sleep(1000); // sleep 1 second - } while (System.currentTimeMillis() - ts < 60000); + } while (System.currentTimeMillis() - ts < MAX_PREWARM_TIME); LOG.info("Timeout (60s) occurred while prewarming executors. The current number of executors is " + curExecutors); } @@ -124,7 +123,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient { */ private int getExecutorsToWarm() { int minExecutors = - HiveConf.getIntVar(hiveConf, HiveConf.ConfVars.SPARK_PREWARM_NUM_CONTAINERS); + HiveConf.getIntVar(hiveConf, HiveConf.ConfVars.HIVE_PREWARM_NUM_CONTAINERS); boolean dynamicAllocation = hiveConf.getBoolean("spark.dynamicAllocation.enabled", false); if (dynamicAllocation) { int min = sparkConf.getInt("spark.dynamicAllocation.minExecutors", 0); http://git-wip-us.apache.org/repos/asf/hive/blob/7605ba9a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 60baa31..e1e64a7 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.spark.client.rpc.Rpc; import org.apache.hive.spark.client.rpc.RpcConfiguration; import org.apache.hive.spark.client.rpc.RpcServer;