I am testing checkpoint to understand how it works. My code is as follows:


scala> val data = sc.parallelize(List("a", "b", "c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:15


scala> sc.setCheckpointDir("/tmp/checkpoint")
15/11/25 18:09:07 WARN spark.SparkContext: Checkpoint directory must be non-local if Spark is running on a cluster: /tmp/checkpoint1


scala> data.checkpoint


scala> val temp = data.map(item => (item, 1))
temp: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:17


scala> temp.checkpoint


scala> temp.count


But I found that only the temp RDD is checkpointed in the /tmp/checkpoint directory; the data RDD is not checkpointed!
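One way to double-check this from the shell (I am assuming isCheckpointed and getCheckpointFile report whether an RDD has actually been checkpointed yet):

data.isCheckpointed      // expected false here, since nothing for data appears under /tmp/checkpoint
temp.isCheckpointed      // expected true after temp.count
temp.getCheckpointFile   // should point somewhere under /tmp/checkpoint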
I then looked at the doCheckpoint function in the org.apache.spark.rdd.RDD class:


  private[spark] def doCheckpoint(): Unit = {
    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (checkpointData.isDefined) {
          checkpointData.get.checkpoint()
        } else {
          dependencies.foreach(_.rdd.doCheckpoint())
        }
      }
    }
  }


From the code above, only the last RDD (in my case temp) will be checkpointed. My question: is this deliberately designed, or is it a bug?
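If I read doCheckpoint correctly, it is invoked on the final RDD of a job when an action runs, and it stops recursing into dependencies as soon as it finds a marked RDD. So I am assuming the following would materialize both checkpoints, by giving each marked RDD an action of its own (a minimal sketch, not yet verified on a cluster):

sc.setCheckpointDir("/tmp/checkpoint")

val data = sc.parallelize(List("a", "b", "c"))
data.checkpoint()
data.count()   // data is the last RDD of this job, so it should be checkpointed here

val temp = data.map(item => (item, 1))
temp.checkpoint()
temp.count()   // temp is the last RDD of this job, so it should be checkpointed here

If that is the intended design, then marking an RDD with checkpoint() without ever running an action that ends at it simply has no effect, which would explain what I am seeing.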


Thank you.