Author: xuefu
Date: Mon Jan 26 19:05:57 2015
New Revision: 1654873

URL: http://svn.apache.org/r1654873
Log:
HIVE-9449: Push YARN configuration to Spark while deploying Spark on YARN [Spark Branch] (Chengxiang via Xuefu)

Modified:
    hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java

Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1654873&r1=1654872&r2=1654873&view=diff
==============================================================================
--- hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Jan 26 19:05:57 2015
@@ -2258,10 +2258,33 @@ public class HiveConf extends Configurat
      throw new IllegalArgumentException("Cannot modify " + name + " at runtime. It is in the list"
           + "of parameters that can't be modified at runtime");
     }
-    isSparkConfigUpdated = name.startsWith("spark");
+    isSparkConfigUpdated = isSparkRelatedConfig(name);
     set(name, value);
   }
 
+  /**
+   * Checks whether a given property is Spark-related: Spark configurations,
+   * RSC configurations, and YARN configurations in Spark on YARN mode.
+   * @param name the property name
+   * @return true if the property is Spark-related
+   */
+  private boolean isSparkRelatedConfig(String name) {
+    boolean result = false;
+    if (name.startsWith("spark")) { // Spark property.
+      result = true;
+    } else if (name.startsWith("yarn")) { // YARN property in Spark on YARN 
mode.
+      String sparkMaster = get("spark.master");
+      if (sparkMaster != null &&
+        (sparkMaster.equals("yarn-client") || 
sparkMaster.equals("yarn-cluster"))) {
+        result = true;
+      }
+    } else if (name.startsWith("hive.spark")) { // Remote Spark Context 
property.
+      result = true;
+    }
+
+    return result;
+  }
+
   public static int getIntVar(Configuration conf, ConfVars var) {
     assert (var.valClass == Integer.class) : var.varname;
     return conf.getInt(var.varname, var.defaultIntVal);
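
For reference, here is a minimal sketch of how the new check behaves once the patch
is applied. It assumes the setter modified in the hunk above is
HiveConf#verifyAndSet(String, String); the property names and values below are
illustrative only and are not taken from the patch.

import org.apache.hadoop.hive.conf.HiveConf;

public class SparkConfigUpdateSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Spark property: matches the "spark" prefix, so the Spark config is marked updated.
    conf.verifyAndSet("spark.master", "yarn-cluster");
    // YARN property: marked only because spark.master is yarn-client or yarn-cluster.
    conf.verifyAndSet("yarn.scheduler.maximum-allocation-mb", "8192");
    // Remote Spark Context property: matches the "hive.spark" prefix.
    conf.verifyAndSet("hive.spark.client.connect.timeout", "30000");
    // Unrelated Hive property: isSparkConfigUpdated is left false for this call.
    conf.verifyAndSet("hive.exec.parallel", "true");
  }
}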

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1654873&r1=1654872&r2=1654873&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Mon Jan 26 19:05:57 2015
@@ -42,6 +42,7 @@ public class HiveSparkClientFactory {
   private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
   private static final String SPARK_DEFAULT_MASTER = "local";
   private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
+  private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";
 
   public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf)
     throws IOException, SparkException {
@@ -64,8 +65,7 @@ public class HiveSparkClientFactory {
     // set default spark configurations.
     sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
     sparkConf.put("spark.app.name", SPARK_DEFAULT_APP_NAME);
-    sparkConf.put("spark.serializer",
-      "org.apache.spark.serializer.KryoSerializer");
+    sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);
 
     // load properties from spark-defaults.conf.
     InputStream inputStream = null;
@@ -81,7 +81,7 @@ public class HiveSparkClientFactory {
             String value = properties.getProperty(propertyName);
             sparkConf.put(propertyName, properties.getProperty(propertyName));
             LOG.info(String.format(
-              "load spark configuration from %s (%s -> %s).",
+              "load spark property from %s (%s -> %s).",
               SPARK_DEFAULT_CONF_FILE, propertyName, value));
           }
         }
@@ -99,22 +99,36 @@ public class HiveSparkClientFactory {
       }
     }
 
-    // load properties from hive configurations, including both spark.* properties
-    // and properties for remote driver RPC.
+    // load properties from hive configurations, including both spark.* properties,
+    // properties for remote driver RPC, and yarn properties for Spark on YARN mode.
+    String sparkMaster = hiveConf.get("spark.master");
+    if (sparkMaster == null) {
+      sparkMaster = sparkConf.get("spark.master");
+    }
     for (Map.Entry<String, String> entry : hiveConf) {
       String propertyName = entry.getKey();
       if (propertyName.startsWith("spark")) {
         String value = hiveConf.get(propertyName);
         sparkConf.put(propertyName, value);
         LOG.info(String.format(
-          "load spark configuration from hive configuration (%s -> %s).",
+          "load spark property from hive configuration (%s -> %s).",
           propertyName, value));
+      } else if (propertyName.startsWith("yarn") &&
+        (sparkMaster.equals("yarn-client") || 
sparkMaster.equals("yarn-cluster"))) {
+        String value = hiveConf.get(propertyName);
+        // Add the spark.hadoop prefix to YARN properties, as SparkConf only accepts
+        // properties that start with the spark prefix; Spark later strips the
+        // spark.hadoop prefix and adds the property to its Hadoop configuration.
+        sparkConf.put("spark.hadoop." + propertyName, value);
+        LOG.info(String.format(
+          "load yarn property from hive configuration in %s mode (%s -> %s).",
+          sparkMaster, propertyName, value));
       }
       if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
         String value = RpcConfiguration.getValue(hiveConf, propertyName);
         sparkConf.put(propertyName, value);
         LOG.info(String.format(
-          "load RPC configuration from hive configuration (%s -> %s).",
+          "load RPC property from hive configuration (%s -> %s).",
           propertyName, value));
       }
     }
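
As a usage note, the YARN handling above is just a key rename: a yarn.* property from
the Hive configuration is republished under a spark.hadoop.* key so that SparkConf will
accept it, and Spark strips that prefix again when building its own Hadoop configuration.
A minimal sketch of the transformation (the property name and value are hypothetical):

import java.util.HashMap;
import java.util.Map;

public class YarnPropertyPrefixSketch {
  public static void main(String[] args) {
    Map<String, String> sparkConf = new HashMap<String, String>();
    String propertyName = "yarn.resourcemanager.address"; // hypothetical YARN key
    String value = "rm-host:8032";                        // hypothetical value
    // Same transformation as in HiveSparkClientFactory: namespace the YARN key
    // under spark.hadoop.* so SparkConf will carry it through to the cluster.
    sparkConf.put("spark.hadoop." + propertyName, value);
    System.out.println(sparkConf); // {spark.hadoop.yarn.resourcemanager.address=rm-host:8032}
  }
}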

