Repository: spark
Updated Branches:
  refs/heads/master 842d00032 -> 830934976


SPARK-5500. Document that feeding hadoopFile into a shuffle operation will cause problems

Author: Sandy Ryza <sa...@cloudera.com>

Closes #4293 from sryza/sandy-spark-5500 and squashes the following commits:

e9ce742 [Sandy Ryza] Change to warning
cc46e52 [Sandy Ryza] Add instructions and extend to NewHadoopRDD
6e1932a [Sandy Ryza] Throw exception on cache
0f6c4eb [Sandy Ryza] SPARK-5500. Document that feeding hadoopFile into a shuffle operation will cause problems


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83093497
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83093497
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83093497

Branch: refs/heads/master
Commit: 830934976e8cf9e894bd3e5758fb941cad5d2f0b
Parents: 842d000
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Mon Feb 2 14:52:46 2015 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Feb 2 14:52:46 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 69 +++++++++++---------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 12 +++-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 17 +++--
 3 files changed, 62 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
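
As a sketch of the pattern the updated scaladoc recommends (not part of this commit), the snippet below copies the reused Writable instances with a `map` before caching or sorting; the app name, master, and input path are placeholder assumptions.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object WritableCopyExample {
  def main(args: Array[String]): Unit = {
    // Placeholder local master and input path, for illustration only.
    val sc = new SparkContext(new SparkConf().setAppName("writable-copy").setMaster("local[2]"))

    // hadoopFile hands back the same LongWritable/Text instances for every record,
    // so caching or shuffling `raw` directly would leave many references to one object.
    val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("/tmp/input.txt")

    // Documented workaround: copy the values out of the reused Writables first.
    val lines = raw.map { case (offset, text) => (offset.get(), text.toString) }
    lines.cache()
    println(lines.sortBy(_._1).count())

    sc.stop()
  }
}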


http://git-wip-us.apache.org/repos/asf/spark/blob/83093497/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3c61c10..228076f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -687,9 +687,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @param minPartitions Minimum number of Hadoop Splits to generate.
    *
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
-   * record, directly caching the returned RDD will create many references to the same object.
-   * If you plan to directly cache Hadoop writable objects, you should first copy them using
-   * a `map` function.
+   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+   * operation will create many references to the same object.
+   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+   * copy them using a `map` function.
    */
   def hadoopRDD[K, V](
       conf: JobConf,
@@ -705,12 +706,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat
-    *
-    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
-    * record, directly caching the returned RDD will create many references to the same object.
-    * If you plan to directly cache Hadoop writable objects, you should first copy them using
-    * a `map` function.
-    * */
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+   * operation will create many references to the same object.
+   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+   * copy them using a `map` function.
+   */
   def hadoopFile[K, V](
       path: String,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -741,9 +743,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * }}}
    *
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
-   * record, directly caching the returned RDD will create many references to the same object.
-   * If you plan to directly cache Hadoop writable objects, you should first copy them using
-   * a `map` function.
+   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+   * operation will create many references to the same object.
+   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+   * copy them using a `map` function.
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]]
       (path: String, minPartitions: Int)
@@ -764,9 +767,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * }}}
    *
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
-   * record, directly caching the returned RDD will create many references to the same object.
-   * If you plan to directly cache Hadoop writable objects, you should first copy them using
-   * a `map` function.
+   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+   * operation will create many references to the same object.
+   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+   * copy them using a `map` function.
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
@@ -788,9 +792,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * and extra configuration options to pass to the input format.
    *
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
-   * record, directly caching the returned RDD will create many references to the same object.
-   * If you plan to directly cache Hadoop writable objects, you should first copy them using
-   * a `map` function.
+   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+   * operation will create many references to the same object.
+   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+   * copy them using a `map` function.
    */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
       path: String,
@@ -810,9 +815,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * and extra configuration options to pass to the input format.
    *
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
-   * record, directly caching the returned RDD will create many references to the same object.
-   * If you plan to directly cache Hadoop writable objects, you should first copy them using
-   * a `map` function.
+   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+   * operation will create many references to the same object.
+   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+   * copy them using a `map` function.
    */
   def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
       conf: Configuration = hadoopConfiguration,
@@ -826,9 +832,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
     *
    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
-    * record, directly caching the returned RDD will create many references to the same object.
-    * If you plan to directly cache Hadoop writable objects, you should first copy them using
-    * a `map` function.
+    * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+    * operation will create many references to the same object.
+    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+    * copy them using a `map` function.
     */
   def sequenceFile[K, V](path: String,
       keyClass: Class[K],
@@ -843,9 +850,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
     *
    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
-    * record, directly caching the returned RDD will create many references to the same object.
-    * If you plan to directly cache Hadoop writable objects, you should first copy them using
-    * a `map` function.
+    * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+    * operation will create many references to the same object.
+    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+    * copy them using a `map` function.
     * */
   def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = {
     assertNotStopped()
@@ -869,9 +877,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * allow it to figure out the Writable class to use in the subclass case.
    *
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
-   * record, directly caching the returned RDD will create many references to the same object.
-   * If you plan to directly cache Hadoop writable objects, you should first copy them using
-   * a `map` function.
+   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+   * operation will create many references to the same object.
+   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+   * copy them using a `map` function.
    */
    def sequenceFile[K, V]
        (path: String, minPartitions: Int = defaultMinPartitions)
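
For the sequenceFile variants documented above, a minimal sketch of the same copy-before-cache pattern might look like the following; the IntWritable/Text key and value classes, the helper name, and the path are assumptions, not part of this change.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object SequenceFileCopy {
  // Copies the reused Writables into plain Scala values so the result is safe
  // to cache, sort, or aggregate.
  def copied(sc: SparkContext, path: String): RDD[(Int, String)] =
    sc.sequenceFile(path, classOf[IntWritable], classOf[Text])
      .map { case (k, v) => (k.get(), v.toString) }
}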

http://git-wip-us.apache.org/repos/asf/spark/blob/83093497/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index c3e3931..89adddc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -42,10 +42,11 @@ import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.util.{NextIterator, Utils}
 import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
+import org.apache.spark.storage.StorageLevel
 
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
@@ -308,6 +309,15 @@ class HadoopRDD[K, V](
     // Do nothing. Hadoop RDD should not be checkpointed.
   }
 
+  override def persist(storageLevel: StorageLevel): this.type = {
+    if (storageLevel.deserialized) {
+      logWarning("Caching NewHadoopRDDs as deserialized objects usually leads 
to undesired" +
+        " behavior because Hadoop's RecordReader reuses the same Writable 
object for all records." +
+        " Use a map transformation to make copies of the records.")
+    }
+    super.persist(storageLevel)
+  }
+
   def getConf: Configuration = getJobConf()
 }
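
To illustrate the override added above, here is a rough spark-shell style sketch; it assumes a live SparkContext `sc` and a placeholder HDFS path.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.storage.StorageLevel

val hadoopRdd = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/events")

// MEMORY_ONLY is a deserialized storage level, so this call now logs the
// warning before delegating to the normal RDD.persist behaviour.
hadoopRdd.persist(StorageLevel.MEMORY_ONLY)

// Copying the records first yields an ordinary MapPartitionsRDD, which is not
// affected by the override and is safe to cache.
val copied = hadoopRdd.map { case (k, v) => (k.get(), v.toString) }
copied.persist(StorageLevel.MEMORY_ONLY)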
 

http://git-wip-us.apache.org/repos/asf/spark/blob/83093497/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index d86f95a..44b9ffd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -29,16 +29,13 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.input.WholeTextFileInputFormat
-import org.apache.spark.InterruptibleIterator
-import org.apache.spark.Logging
-import org.apache.spark.Partition
-import org.apache.spark.SerializableWritable
-import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.storage.StorageLevel
 
 private[spark] class NewHadoopPartition(
     rddId: Int,
@@ -211,6 +208,16 @@ class NewHadoopRDD[K, V](
     locs.getOrElse(split.getLocations.filter(_ != "localhost"))
   }
 
+  override def persist(storageLevel: StorageLevel): this.type = {
+    if (storageLevel.deserialized) {
+      logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" +
+        " behavior because Hadoop's RecordReader reuses the same Writable object for all records." +
+        " Use a map transformation to make copies of the records.")
+    }
+    super.persist(storageLevel)
+  }
+
+
   def getConf: Configuration = confBroadcast.value.value
 }
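
The new-API path gets the identical check; a hedged sketch using newAPIHadoopFile, again assuming a live SparkContext `sc` and a placeholder path:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
import org.apache.spark.storage.StorageLevel

val newApiRdd = sc.newAPIHadoopFile[LongWritable, Text, NewTextInputFormat]("hdfs:///data/logs")

// A deserialized level triggers the warning added to NewHadoopRDD.persist ...
newApiRdd.persist(StorageLevel.MEMORY_ONLY)

// ... while copied records go through an ordinary RDD and cache cleanly.
newApiRdd.map { case (k, v) => (k.get(), v.toString) }.cache()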
 

