[jira] [Comment Edited] (SPARK-6808) Checkpointing after zipPartitions results in NODE_LOCAL execution
[ https://issues.apache.org/jira/browse/SPARK-6808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14493769#comment-14493769 ]

Liang-Chi Hsieh edited comment on SPARK-6808 at 4/14/15 8:18 AM:
-----------------------------------------------------------------

I think that is because `CheckpointRDD.getPreferredLocations` can only return host-level preferred locations. That is reasonable, because they are the locations of file blocks. When an RDD is asked for its `preferredLocations`, it first consults its checkpointRDD's preferred locations by calling the checkpointRDD's `getPreferredLocations`. Only if it has no checkpointRDD does it call its own `getPreferredLocations`.

I am not sure whether this is a bug or intended behavior.

was (Author: viirya):
I think that is because `CheckpointRDD.getPreferredLocations` can only return host-level preferred locations. That is reasonable, because they are the locations of file blocks. When an RDD is asked for its `preferredLocations`, it first consults its checkpointRDD's preferred locations by calling `getPreferredLocations`. Only if it has no checkpointRDD does it call its own `getPreferredLocations`.

I am not sure whether this is a bug or intended behavior.

Checkpointing after zipPartitions results in NODE_LOCAL execution
-----------------------------------------------------------------

Key: SPARK-6808
URL: https://issues.apache.org/jira/browse/SPARK-6808
Project: Spark
Issue Type: Bug
Components: GraphX, Spark Core
Affects Versions: 1.2.1, 1.3.0
Environment: EC2 Ubuntu r3.8xlarge machines
Reporter: Xinghao Pan

I'm encountering a weird issue where a simple iterative zipPartitions is PROCESS_LOCAL before checkpointing, but turns NODE_LOCAL for all iterations after checkpointing.
Here's an example snippet of code:

    var R : RDD[(Long,Int)] = sc.parallelize((0 until numPartitions), numPartitions)
      .mapPartitions(_ => new Array[(Long,Int)](1000).map(i => (0L,0)).toSeq.iterator).cache()
    sc.setCheckpointDir(checkpointDir)
    var iteration = 0
    while (iteration < 50) {
      R = R.zipPartitions(R)((x,y) => x).cache()
      if ((iteration+1) % checkpointIter == 0) R.checkpoint()
      R.foreachPartition(_ => {})
      iteration += 1
    }

I've also tried unpersisting the old RDDs and increasing spark.locality.wait, but neither helps.

Strangely, adding a simple identity map

    R = R.map(x => x).cache()

after the zipPartitions appears to partially mitigate the issue.

The problem was originally triggered when I attempted to checkpoint after doing joinVertices in GraphX, but the above example shows that the issue is in Spark core too.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
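The lookup order described in the comment above can be illustrated with a small self-contained model. This is only a sketch, not Spark source: `ToyRDD`, `Location`, and `bestLocality` are hypothetical names introduced here, and the mapping from locations to locality levels is deliberately simplified (an executor-level location is assumed to allow PROCESS_LOCAL, a host-only location at best NODE_LOCAL).

```scala
object PreferredLocationsSketch {
  // Hypothetical location type: a host, optionally narrowed to one executor.
  final case class Location(host: String, executorId: Option[String] = None)

  // Toy stand-in for an RDD; partitions are identified by their index.
  class ToyRDD(ownLocations: Int => Seq[Location]) {
    // Set once the (toy) RDD has been checkpointed.
    var checkpointRDD: Option[ToyRDD] = None

    def getPreferredLocations(split: Int): Seq[Location] = ownLocations(split)

    // Order described in the comment: ask the checkpoint RDD first,
    // and fall back to the RDD's own locations only if there is none.
    final def preferredLocations(split: Int): Seq[Location] =
      checkpointRDD.map(_.getPreferredLocations(split))
        .getOrElse(getPreferredLocations(split))
  }

  // Simplifying assumption: executor-level info permits PROCESS_LOCAL,
  // host-only info permits at best NODE_LOCAL.
  def bestLocality(locs: Seq[Location]): String =
    if (locs.exists(_.executorId.isDefined)) "PROCESS_LOCAL"
    else if (locs.nonEmpty) "NODE_LOCAL"
    else "ANY"

  def main(args: Array[String]): Unit = {
    // Before checkpointing: the RDD reports an executor-level location.
    val rdd = new ToyRDD(_ => Seq(Location("host1", Some("exec-3"))))
    println(bestLocality(rdd.preferredLocations(0))) // PROCESS_LOCAL

    // After checkpointing: the checkpoint RDD knows only the file block's host.
    rdd.checkpointRDD = Some(new ToyRDD(_ => Seq(Location("host1"))))
    println(bestLocality(rdd.preferredLocations(0))) // NODE_LOCAL
  }
}
```

In this model the RDD's own, finer-grained locations are never consulted once a checkpoint RDD exists, which would match the reported switch from PROCESS_LOCAL to NODE_LOCAL. Whether the real scheduler behaves exactly this way is not established in this thread.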