[ https://issues.apache.org/jira/browse/SPARK-7002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tom Hubregtsen updated SPARK-7002:
----------------------------------
Description:

The major issue is: persist on an RDD fails the second time if the action is called on a child RDD, without showing a FAILED message. This is pointed out at 2).

Next to this: toDebugString on a child RDD does not show that the parent RDD is [Disk Serialized 1x Replicated]. This is pointed out at 1).

Note: I am persisting to disk (DISK_ONLY) to validate that the RDD is or is not physically stored, as I did not want to rely solely on a missing line in .toDebugString (see comments in the trace).

scala> val rdd1 = sc.parallelize(List(1, 2, 3))
scala> val rdd2 = rdd1.map(x => (x, x + 1))
scala> val rdd3 = rdd2.reduceByKey((x, y) => x + y)
scala> import org.apache.spark.storage.StorageLevel
scala> rdd2.persist(StorageLevel.DISK_ONLY)
scala> rdd3.collect()

scala> rdd2.toDebugString
res4: String =
(100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x Replicated]
  |  CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
  |  ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk Serialized 1x Replicated]

scala> rdd3.toDebugString
res5: String =
(100) ShuffledRDD[2] at reduceByKey at <console>:25 []
 +-(100) MapPartitionsRDD[1] at map at <console>:23 []
    |  CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
    |  ParallelCollectionRDD[0] at parallelize at <console>:21 []

// 1) rdd3 does not show that the other RDDs are [Disk Serialized 1x Replicated], but the data is on disk. This is verified by
//    a) the line starting with CachedPartitions
//    b) a find in spark_local_dir: "find . -name "*" | grep rdd" returns
//       "./spark-b39bcf9b-e7d7-4284-bdd2-1be7ac3cacef/blockmgr-4f4c0b1c-b47a-4972-b364-7179ea6e0873/1f/rdd_4_*",
//       where * is the partition number

scala> rdd2.unpersist()

scala> rdd2.toDebugString
res8: String =
(100) MapPartitionsRDD[1] at map at <console>:23 []
  |  ParallelCollectionRDD[0] at parallelize at <console>:21 []

scala> rdd3.toDebugString
res9: String =
(100) ShuffledRDD[2] at reduceByKey at <console>:25 []
 +-(100) MapPartitionsRDD[1] at map at <console>:23 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:21 []

// Successfully unpersisted; also no longer visible on disk.

scala> rdd2.persist(StorageLevel.DISK_ONLY)
scala> rdd3.collect()

scala> rdd2.toDebugString
res18: String =
(100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x Replicated]
  |  ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk Serialized 1x Replicated]

scala> rdd3.toDebugString
res19: String =
(100) ShuffledRDD[2] at reduceByKey at <console>:25 []
 +-(100) MapPartitionsRDD[1] at map at <console>:23 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:21 []

// 2) The data is not visible on disk through the find command previously mentioned, and it is also
//    not mentioned in the toDebugString (no line starting with CachedPartitions, even though
//    [Disk Serialized 1x Replicated] is mentioned).
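The find-based disk check used throughout this report can be sketched as follows. This is a minimal illustration against a mocked-up directory tree: the spark-demo/blockmgr-demo names are placeholders modeled on the `spark-*/blockmgr-*/<hash>/rdd_<rddId>_<partitionId>` paths shown in the trace, and the real hashes and RDD ids differ per run.

```shell
# Recreate (as placeholders) the block-manager layout that persisted RDD
# partitions land in, then run the same check as in the report.
tmp=$(mktemp -d)
mkdir -p "$tmp/spark-demo/blockmgr-demo/1f"
# One block file per persisted partition: rdd_<rddId>_<partitionId>
touch "$tmp/spark-demo/blockmgr-demo/1f/rdd_4_0" \
      "$tmp/spark-demo/blockmgr-demo/1f/rdd_4_1"
# The check from the report: list every block file under the local dir
find "$tmp" -name "*" | grep rdd
rm -rf "$tmp"
```

If the partitions were actually written, one `rdd_*` path per partition shows up; after a successful unpersist (or when persist silently did nothing, as in observation 2), the grep prints nothing.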
It does work when you call the action on the actual RDD:

scala> rdd2.collect()

scala> rdd2.toDebugString
res21: String =
(100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x Replicated]
  |  CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
  |  ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk Serialized 1x Replicated]

scala> rdd3.toDebugString
res22: String =
(100) ShuffledRDD[2] at reduceByKey at <console>:25 []
 +-(100) MapPartitionsRDD[1] at map at <console>:23 []
    |  CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
    |  ParallelCollectionRDD[0] at parallelize at <console>:21 []

// Data appears on disk again (using the find command previously mentioned), and the line with
// CachedPartitions is back in the .toDebugString.

> Persist on RDD fails the second time if the action is called on a child RDD
> without showing a FAILED message
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-7002
>                 URL: https://issues.apache.org/jira/browse/SPARK-7002
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Spark Shell
>    Affects Versions: 1.3.0
>        Environment: Platform: Power8
>                     OS: Ubuntu 14.10
>                     Java: java-8-openjdk-ppc64el
>            Reporter: Tom Hubregtsen
>              Labels: disk, persist, unpersist

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)