I am testing checkpointing to understand how it works. My code is as follows:
scala> val data = sc.parallelize(List("a", "b", "c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
parallelize at <console>:15
scala> sc.setCheckpointDir("/tmp/checkpoint")
15/11/25 18:09:07 WARN spark.SparkContext: Checkpoint directory must be
non-local if Spark is running on a cluster: /tmp/checkpoint1
scala> data.checkpoint
scala> val temp = data.map(item => (item, 1))
temp: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at
<console>:17
scala> temp.checkpoint
scala> temp.count
But I found that only the temp RDD is checkpointed in the /tmp/checkpoint
directory; the data RDD is not checkpointed! I then looked at the doCheckpoint
function in the org.apache.spark.rdd.RDD class:
private[spark] def doCheckpoint(): Unit = {
  RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false,
      ignoreParent = true) {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        checkpointData.get.checkpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }
}
From the code above, it seems that only the last RDD of the job (temp, in my
case) gets checkpointed, because the recursion into the dependencies stops as
soon as an RDD with checkpointData defined is found. My question: is this
deliberately designed, or is it a bug?
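For what it's worth, a minimal sketch of a workaround (just continuing my
session above; untested, based only on my reading of doCheckpoint) would be to
run an action in which data itself is the final RDD:

// Untested sketch, continuing the shell session above.
// Running a job whose final RDD is `data` calls data.doCheckpoint() directly,
// and since data.checkpoint was already called, checkpointData is defined.
data.count()          // should materialize and checkpoint `data`
data.isCheckpointed   // expected: true
temp.isCheckpointed   // true, it was already checkpointed by temp.count

If that is right, an RDD marked for checkpointing is only written out when a
job reaches it through doCheckpoint without first hitting another RDD that is
itself marked for checkpointing.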
Thank you.