Repository: spark
Updated Branches:
  refs/heads/master 22036aeb1 -> acc01ab32


SPARK-2038: rename "conf" parameters in the saveAsHadoop functions while preserving source compatibility

https://issues.apache.org/jira/browse/SPARK-2038

This renames the parameters internally to differentiate them from the SparkConf 
object while keeping source-level compatibility.
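
A minimal sketch of the pattern this commit applies (the object and method 
below are hypothetical, not the real PairRDDFunctions): the public parameter 
keeps its old name "conf" so existing callers, including named-argument call 
sites, still compile, while the body rebinds it to hadoopConf to make clear 
it is a Hadoop Configuration rather than a SparkConf.

    import org.apache.hadoop.conf.Configuration

    object RenameSketch {
      // The public parameter must stay named "conf": a caller may write
      // save(conf = myConf), so renaming the parameter itself would break
      // source compatibility for named-argument call sites.
      def save(conf: Configuration = new Configuration()): Unit = {
        // Rebind internally (the same move the commit makes) so the body
        // clearly manipulates a Hadoop Configuration, not a SparkConf.
        val hadoopConf = conf
        hadoopConf.set("mapred.output.compress", "true")
      }

      def main(args: Array[String]): Unit = {
        save(conf = new Configuration()) // named-argument call keeps compiling
      }
    }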

Author: CodingCat <zhunans...@gmail.com>

Closes #1137 from CodingCat/SPARK-2038 and squashes the following commits:

11abeba [CodingCat] revise the comments
7ee5712 [CodingCat] to keep the source-compatibility
763975f [CodingCat] style fix
d91288d [CodingCat] rename "conf" parameters in the saveAsHadoop functions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acc01ab3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acc01ab3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acc01ab3

Branch: refs/heads/master
Commit: acc01ab3265c317f36a4fca28d3b9d72b0096c12
Parents: 22036ae
Author: CodingCat <zhunans...@gmail.com>
Authored: Wed Jun 25 00:23:32 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Wed Jun 25 00:23:32 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 49 ++++++++++++--------
 1 file changed, 29 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/acc01ab3/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 2b2b9ae..fc9beb1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -762,7 +762,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
       conf: Configuration = self.context.hadoopConfiguration)
   {
-    val job = new NewAPIHadoopJob(conf)
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    val job = new NewAPIHadoopJob(hadoopConf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
     job.setOutputFormatClass(outputFormatClass)
@@ -795,22 +797,25 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
       conf: JobConf = new JobConf(self.context.hadoopConfiguration),
       codec: Option[Class[_ <: CompressionCodec]] = None) {
-    conf.setOutputKeyClass(keyClass)
-    conf.setOutputValueClass(valueClass)
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    hadoopConf.setOutputKeyClass(keyClass)
+    hadoopConf.setOutputValueClass(valueClass)
     // Doesn't work in Scala 2.9 due to what may be a generics bug
     // TODO: Should we uncomment this for Scala 2.10?
     // conf.setOutputFormat(outputFormatClass)
-    conf.set("mapred.output.format.class", outputFormatClass.getName)
+    hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
     for (c <- codec) {
-      conf.setCompressMapOutput(true)
-      conf.set("mapred.output.compress", "true")
-      conf.setMapOutputCompressorClass(c)
-      conf.set("mapred.output.compression.codec", c.getCanonicalName)
-      conf.set("mapred.output.compression.type", 
CompressionType.BLOCK.toString)
+      hadoopConf.setCompressMapOutput(true)
+      hadoopConf.set("mapred.output.compress", "true")
+      hadoopConf.setMapOutputCompressorClass(c)
+      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
+      hadoopConf.set("mapred.output.compression.type", 
CompressionType.BLOCK.toString)
     }
-    conf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
-    saveAsHadoopDataset(conf)
+    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(hadoopConf,
+      SparkHadoopWriter.createPathFromString(path, hadoopConf))
+    saveAsHadoopDataset(hadoopConf)
   }
 
   /**
@@ -820,7 +825,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * configured for a Hadoop MapReduce job.
    */
   def saveAsNewAPIHadoopDataset(conf: Configuration) {
-    val job = new NewAPIHadoopJob(conf)
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    val job = new NewAPIHadoopJob(hadoopConf)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
@@ -877,9 +884,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * MapReduce job.
    */
   def saveAsHadoopDataset(conf: JobConf) {
-    val outputFormatInstance = conf.getOutputFormat
-    val keyClass = conf.getOutputKeyClass
-    val valueClass = conf.getOutputValueClass
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    val outputFormatInstance = hadoopConf.getOutputFormat
+    val keyClass = hadoopConf.getOutputKeyClass
+    val valueClass = hadoopConf.getOutputValueClass
     if (outputFormatInstance == null) {
       throw new SparkException("Output format class not set")
     }
@@ -889,18 +898,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     if (valueClass == null) {
       throw new SparkException("Output value class not set")
     }
-    SparkHadoopUtil.get.addCredentials(conf)
+    SparkHadoopUtil.get.addCredentials(hadoopConf)
 
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " 
+
       valueClass.getSimpleName + ")")
 
     if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
-      val ignoredFs = FileSystem.get(conf)
-      conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
+      val ignoredFs = FileSystem.get(hadoopConf)
+      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
     }
 
-    val writer = new SparkHadoopWriter(conf)
+    val writer = new SparkHadoopWriter(hadoopConf)
     writer.preSetup()
 
     def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {

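For illustration, a hedged caller-side example (it assumes an existing 
SparkContext named sc plus the implicit conversions from SparkContext._, and 
the output path is made up): because only the local binding inside each method 
was renamed, call sites that pass the configuration by name are unaffected.

    import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}
    import org.apache.spark.SparkContext._

    // Hypothetical call site; assumes an existing SparkContext named sc.
    val pairs = sc.parallelize(Seq(("k1", "v1"), ("k2", "v2")))
    val jobConf = new JobConf(sc.hadoopConfiguration)
    // Passing "conf" by name still compiles after this commit, because only
    // the binding inside the method body was renamed, not the parameter.
    pairs.saveAsHadoopFile(
      "hdfs:///tmp/spark-2038-demo",
      classOf[String],
      classOf[String],
      classOf[TextOutputFormat[String, String]],
      conf = jobConf)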