[ 
https://issues.apache.org/jira/browse/SPARK-7002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Hubregtsen updated SPARK-7002:
----------------------------------
    Description: 
The main issue: persisting an RDD silently fails the second time (after an 
unpersist) if the action is called on a child RDD instead of on the persisted 
RDD itself. No FAILED message is shown. This is pointed out at 2).
A secondary issue: toDebugString on a child RDD does not show that the parent 
RDD is [Disk Serialized 1x Replicated]. This is pointed out at 1).

Note: I am persisting to disk (DISK_ONLY) to validate that the RDD is or is not 
physically stored, as I did not want to solely rely on a missing line in 
.toDebugString (see comments in trace)

scala> val rdd1 = sc.parallelize(List(1,2,3))
scala> val rdd2 = rdd1.map(x => (x,x+1))
scala> val rdd3 = rdd2.reduceByKey( (x,y) => x+y)
scala> import org.apache.spark.storage.StorageLevel
scala> rdd2.persist(StorageLevel.DISK_ONLY)
scala> rdd3.collect()
scala> rdd2.toDebugString
res4: String = 
(100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x Replicated]
  |        CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; 
DiskSize: 802.0 B
  |   ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk Serialized 
1x Replicated]
scala> rdd3.toDebugString
res5: String = 
(100) ShuffledRDD[2] at reduceByKey at <console>:25 []
  +-(100) MapPartitionsRDD[1] at map at <console>:23 []
      |       CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; 
DiskSize: 802.0 B
      |   ParallelCollectionRDD[0] at parallelize at <console>:21 []
// 1) rdd3 does not show that the other RDDs are [Disk Serialized 1x 
Replicated], but the data is on disk. This is verified by:
// a) the line starting with CachedPartitions
// b) a find in spark_local_dir: "find . -name "*"  | grep rdd" returns 
"./spark-b39bcf9b-e7d7-4284-bdd2-1be7ac3cacef/blockmgr-4f4c0b1c-b47a-4972-b364-7179ea6e0873/1f/rdd_4_*",
 where * is the partition number
scala> rdd2.unpersist()
scala> rdd2.toDebugString
res8: String = 
(100) MapPartitionsRDD[1] at map at <console>:23 []
  |   ParallelCollectionRDD[0] at parallelize at <console>:21 []
scala> rdd3.toDebugString
res9: String = 
(100) ShuffledRDD[2] at reduceByKey at <console>:25 []
  +-(100) MapPartitionsRDD[1] at map at <console>:23 []
      |   ParallelCollectionRDD[0] at parallelize at <console>:21 []
// successfully unpersisted, also not visible on disk
scala> rdd2.persist(StorageLevel.DISK_ONLY)
scala> rdd3.collect()
scala> rdd2.toDebugString
res18: String = 
(100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x Replicated]
  |   ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk Serialized 
1x Replicated]
scala> rdd3.toDebugString
res19: String = 
(100) ShuffledRDD[2] at reduceByKey at <console>:25 []
  +-(100) MapPartitionsRDD[1] at map at <console>:23 []
      |   ParallelCollectionRDD[0] at parallelize at <console>:21 []
// 2) The data is not visible on disk through the find command mentioned 
previously, and it is also not reported by toDebugString (no line starting 
with CachedPartitions, even though [Disk Serialized 1x Replicated] is 
shown). It does work when you call the action on the persisted RDD itself:
scala> rdd2.collect()
scala> rdd2.toDebugString
res21: String = 
(100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x Replicated]
  |        CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; 
DiskSize: 802.0 B
  |   ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk Serialized 
1x Replicated]
scala> rdd3.toDebugString
res22: String = 
(100) ShuffledRDD[2] at reduceByKey at <console>:25 []
  +-(100) MapPartitionsRDD[1] at map at <console>:23 []
      |       CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; 
DiskSize: 802.0 B
      |   ParallelCollectionRDD[0] at parallelize at <console>:21 []
// Data appears on disk again (using the find command mentioned previously), 
and the line with CachedPartitions is back in the .toDebugString output
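
As a complement to the find and toDebugString checks in the trace above, the 
materialization state can presumably also be queried from inside the shell. 
The following is a minimal sketch and not part of the original report: 
storageSummary is a hypothetical helper name, and the code assumes it is 
pasted into spark-shell, where sc is the SparkContext. It relies on 
SparkContext.getRDDStorageInfo (a developer API that only lists RDDs with at 
least one cached partition) and RDD.getStorageLevel.

```scala
// Assumption: run inside spark-shell, where `sc` is already defined.
import org.apache.spark.rdd.RDD

// Hypothetical helper (not part of Spark): report the storage level that was
// requested for an RDD next to what the block manager actually holds for it.
def storageSummary(rdd: RDD[_]): String = {
  val requested = rdd.getStorageLevel.description
  // getRDDStorageInfo only returns RDDs that have cached partitions,
  // so a missing entry means nothing is materialized for this RDD.
  sc.getRDDStorageInfo.find(_.id == rdd.id) match {
    case Some(info) =>
      s"RDD ${rdd.id}: level=[$requested], " +
        s"cached=${info.numCachedPartitions}/${info.numPartitions}, " +
        s"diskSize=${info.diskSize} B"
    case None =>
      s"RDD ${rdd.id}: level=[$requested], no cached partitions reported"
  }
}
```

Calling storageSummary(rdd2) after each rdd3.collect() in the trace would show 
whether the requested level (DISK_ONLY) is actually backed by cached 
partitions, without depending on how toDebugString renders the lineage.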




> Persist on RDD fails the second time if the action is called on a child RDD 
> without showing a FAILED message
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-7002
>                 URL: https://issues.apache.org/jira/browse/SPARK-7002
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Spark Shell
>    Affects Versions: 1.3.0
>         Environment: Platform: Power8
> OS: Ubuntu 14.10
> Java: java-8-openjdk-ppc64el
>            Reporter: Tom Hubregtsen
>              Labels: disk, persist, unpersist


