Repository: spark Updated Branches: refs/heads/master 842d00032 -> 830934976
SPARK-5500. Document that feeding hadoopFile into a shuffle operation wi... ...ll cause problems Author: Sandy Ryza <sa...@cloudera.com> Closes #4293 from sryza/sandy-spark-5500 and squashes the following commits: e9ce742 [Sandy Ryza] Change to warning cc46e52 [Sandy Ryza] Add instructions and extend to NewHadoopRDD 6e1932a [Sandy Ryza] Throw exception on cache 0f6c4eb [Sandy Ryza] SPARK-5500. Document that feeding hadoopFile into a shuffle operation will cause problems Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83093497 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83093497 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83093497 Branch: refs/heads/master Commit: 830934976e8cf9e894bd3e5758fb941cad5d2f0b Parents: 842d000 Author: Sandy Ryza <sa...@cloudera.com> Authored: Mon Feb 2 14:52:46 2015 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Mon Feb 2 14:52:46 2015 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 69 +++++++++++--------- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 12 +++- .../org/apache/spark/rdd/NewHadoopRDD.scala | 17 +++-- 3 files changed, 62 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/83093497/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3c61c10..228076f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -687,9 +687,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @param minPartitions Minimum number of Hadoop Splits to generate. * * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD will create many references to the same object. - * If you plan to directly cache Hadoop writable objects, you should first copy them using - * a `map` function. + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. */ def hadoopRDD[K, V]( conf: JobConf, @@ -705,12 +706,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** Get an RDD for a Hadoop file with an arbitrary InputFormat - * - * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD will create many references to the same object. - * If you plan to directly cache Hadoop writable objects, you should first copy them using - * a `map` function. - * */ + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. + */ def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], @@ -741,9 +743,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * }}} * * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD will create many references to the same object. - * If you plan to directly cache Hadoop writable objects, you should first copy them using - * a `map` function. + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. */ def hadoopFile[K, V, F <: InputFormat[K, V]] (path: String, minPartitions: Int) @@ -764,9 +767,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * }}} * * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD will create many references to the same object. - * If you plan to directly cache Hadoop writable objects, you should first copy them using - * a `map` function. + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. */ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = @@ -788,9 +792,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * and extra configuration options to pass to the input format. * * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD will create many references to the same object. - * If you plan to directly cache Hadoop writable objects, you should first copy them using - * a `map` function. + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. */ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]( path: String, @@ -810,9 +815,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * and extra configuration options to pass to the input format. * * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD will create many references to the same object. - * If you plan to directly cache Hadoop writable objects, you should first copy them using - * a `map` function. + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. */ def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]]( conf: Configuration = hadoopConfiguration, @@ -826,9 +832,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** Get an RDD for a Hadoop SequenceFile with given key and value types. * * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD will create many references to the same object. - * If you plan to directly cache Hadoop writable objects, you should first copy them using - * a `map` function. + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. */ def sequenceFile[K, V](path: String, keyClass: Class[K], @@ -843,9 +850,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** Get an RDD for a Hadoop SequenceFile with given key and value types. * * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD will create many references to the same object. - * If you plan to directly cache Hadoop writable objects, you should first copy them using - * a `map` function. + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. * */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = { assertNotStopped() @@ -869,9 +877,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * allow it to figure out the Writable class to use in the subclass case. * * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD will create many references to the same object. - * If you plan to directly cache Hadoop writable objects, you should first copy them using - * a `map` function. + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. */ def sequenceFile[K, V] (path: String, minPartitions: Int = defaultMinPartitions) http://git-wip-us.apache.org/repos/asf/spark/blob/83093497/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index c3e3931..89adddc 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -42,10 +42,11 @@ import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.executor.{DataReadMethod, InputMetrics} +import org.apache.spark.executor.DataReadMethod import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD import org.apache.spark.util.{NextIterator, Utils} import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation} +import org.apache.spark.storage.StorageLevel /** * A Spark split class that wraps around a Hadoop InputSplit. @@ -308,6 +309,15 @@ class HadoopRDD[K, V]( // Do nothing. Hadoop RDD should not be checkpointed. } + override def persist(storageLevel: StorageLevel): this.type = { + if (storageLevel.deserialized) { + logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" + + " behavior because Hadoop's RecordReader reuses the same Writable object for all records." + + " Use a map transformation to make copies of the records.") + } + super.persist(storageLevel) + } + def getConf: Configuration = getJobConf() } http://git-wip-us.apache.org/repos/asf/spark/blob/83093497/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index d86f95a..44b9ffd 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -29,16 +29,13 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.input.WholeTextFileInputFormat -import org.apache.spark.InterruptibleIterator -import org.apache.spark.Logging -import org.apache.spark.Partition -import org.apache.spark.SerializableWritable -import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark._ import org.apache.spark.executor.DataReadMethod import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD import org.apache.spark.util.Utils import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.storage.StorageLevel private[spark] class NewHadoopPartition( rddId: Int, @@ -211,6 +208,16 @@ class NewHadoopRDD[K, V]( locs.getOrElse(split.getLocations.filter(_ != "localhost")) } + override def persist(storageLevel: StorageLevel): this.type = { + if (storageLevel.deserialized) { + logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" + + " behavior because Hadoop's RecordReader reuses the same Writable object for all records." + + " Use a map transformation to make copies of the records.") + } + super.persist(storageLevel) + } + + def getConf: Configuration = confBroadcast.value.value } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org