Author: xuefu Date: Mon Jan 26 19:05:57 2015 New Revision: 1654873 URL: http://svn.apache.org/r1654873 Log: HIVE-9449: Push YARN configuration to Spark while deploying Spark on YARN [Spark Branch] (Chengxiang via Xuefu)
Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1654873&r1=1654872&r2=1654873&view=diff ============================================================================== --- hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original) +++ hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Jan 26 19:05:57 2015 @@ -2258,10 +2258,33 @@ public class HiveConf extends Configurat throw new IllegalArgumentException("Cannot modify " + name + " at runtime. It is in the list" + "of parameters that can't be modified at runtime"); } - isSparkConfigUpdated = name.startsWith("spark"); + isSparkConfigUpdated = isSparkRelatedConfig(name); set(name, value); } + /** + * check whether spark related property is updated, which includes spark configurations, + * RSC configurations and yarn configuration in Spark on YARN mode. + * @param name + * @return + */ + private boolean isSparkRelatedConfig(String name) { + boolean result = false; + if (name.startsWith("spark")) { // Spark property. + result = true; + } else if (name.startsWith("yarn")) { // YARN property in Spark on YARN mode. + String sparkMaster = get("spark.master"); + if (sparkMaster != null && + (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) { + result = true; + } + } else if (name.startsWith("hive.spark")) { // Remote Spark Context property. 
+ result = true; + } + + return result; + } + public static int getIntVar(Configuration conf, ConfVars var) { assert (var.valClass == Integer.class) : var.varname; return conf.getInt(var.varname, var.defaultIntVal); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1654873&r1=1654872&r2=1654873&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Mon Jan 26 19:05:57 2015 @@ -42,6 +42,7 @@ public class HiveSparkClientFactory { private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf"; private static final String SPARK_DEFAULT_MASTER = "local"; private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark"; + private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer"; public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws IOException, SparkException { @@ -64,8 +65,7 @@ public class HiveSparkClientFactory { // set default spark configurations. sparkConf.put("spark.master", SPARK_DEFAULT_MASTER); sparkConf.put("spark.app.name", SPARK_DEFAULT_APP_NAME); - sparkConf.put("spark.serializer", - "org.apache.spark.serializer.KryoSerializer"); + sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER); // load properties from spark-defaults.conf. 
InputStream inputStream = null; @@ -81,7 +81,7 @@ public class HiveSparkClientFactory { String value = properties.getProperty(propertyName); sparkConf.put(propertyName, properties.getProperty(propertyName)); LOG.info(String.format( - "load spark configuration from %s (%s -> %s).", + "load spark property from %s (%s -> %s).", SPARK_DEFAULT_CONF_FILE, propertyName, value)); } } @@ -99,22 +99,36 @@ public class HiveSparkClientFactory { } } - // load properties from hive configurations, including both spark.* properties - // and properties for remote driver RPC. + // load properties from hive configurations, including both spark.* properties, + // properties for remote driver RPC, and yarn properties for Spark on YARN mode. + String sparkMaster = hiveConf.get("spark.master"); + if (sparkMaster == null) { + sparkMaster = sparkConf.get("spark.master"); + } for (Map.Entry<String, String> entry : hiveConf) { String propertyName = entry.getKey(); if (propertyName.startsWith("spark")) { String value = hiveConf.get(propertyName); sparkConf.put(propertyName, value); LOG.info(String.format( - "load spark configuration from hive configuration (%s -> %s).", + "load spark property from hive configuration (%s -> %s).", propertyName, value)); + } else if (propertyName.startsWith("yarn") && + (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) { + String value = hiveConf.get(propertyName); + // Add spark.hadoop prefix for yarn properties as SparkConf only accept properties + // started with spark prefix, Spark would remove spark.hadoop prefix lately and add + // it to its hadoop configuration. + sparkConf.put("spark.hadoop." 
+ propertyName, value); + LOG.info(String.format( + "load yarn property from hive configuration in %s mode (%s -> %s).", + sparkMaster, propertyName, value)); } if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) { String value = RpcConfiguration.getValue(hiveConf, propertyName); sparkConf.put(propertyName, value); LOG.info(String.format( - "load RPC configuration from hive configuration (%s -> %s).", + "load RPC property from hive configuration (%s -> %s).", propertyName, value)); } }