Awesome, thanks for explaining it.

Wed, Aug 19, 2020 at 16:29, Russell Spitzer <russell.spit...@gmail.com>:
> It determines whether it can use the checkpoint at runtime, so you'll be
> able to see it in the UI but not in the plan, since you are looking at the
> plan before the job is actually running, which is when it checks whether it
> can use the checkpoint in the lineage.
>
> Here is a two-stage job, for example:
>
> scala> val x = sc.parallelize(Seq("foo","bar"))
> x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24
>
> scala> val y = x.repartition(3)
> y: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at repartition at <console>:25
>
> scala> y.checkpoint
>
> scala> y.count
> res12: Long = 2
>
> [image: image.png]
>
> [image: image.png]
>
> scala> y.count
> res13: Long = 2
>
> [image: image.png]
>
> Notice that we were able to skip the first stage: when Stage 11 looked for
> its dependencies, it found a checkpointed version of the repartitioned data,
> so it didn't need to repartition again. This turns my two-stage job into a
> two-stage job with one stage skipped, effectively a one-stage job.
>
> On Wed, Aug 19, 2020 at 9:07 AM Ivan Petrov <capacyt...@gmail.com> wrote:
>
>> I did it and I do see the lineage change.
>>
>> BEFORE calling the action, no success:
>>
>> Job$ - isCheckpointed? false, getCheckpointFile: None
>> Job$ - recordsRDD.toDebugString:
>> (2) MapPartitionsRDD[7] at map at Job.scala:112 []
>>  |  MapPartitionsRDD[6] at map at Job.scala:111 []
>>  |  MapPartitionsRDD[5] at map at ....scala:40 []
>>  |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
>> +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
>>  |  MapPartitionsRDD[2] at map at ...:66 []
>>
>> AFTER calling the action, nice, it works!
>>
>> Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
>> Job$ - recordsRDD.toDebugString:
>> (2) MapPartitionsRDD[7] at map at Job.scala:112 []
>>
>> The lineage now contains only one stage, but I want to get rid of it too.
>> This stage happens right before the checkpoint.
>> Will Spark try to re-run it in case of a task failure AFTER the checkpoint?
>> My expectation is that Spark will read directly from the checkpoint dir;
>> it shouldn't have to do anything with the previous MapPartitionsRDD[7] at
>> map at Job.scala:112.
>>
>> Wed, Aug 19, 2020 at 16:01, Russell Spitzer <russell.spit...@gmail.com>:
>>
>>> Checkpoint is lazy and needs an action to actually do the work. The
>>> method just marks the RDD, as noted in the doc you posted.
>>>
>>> Call an action twice. The second run should use the checkpoint.
>>>
>>> On Wed, Aug 19, 2020, 8:49 AM Ivan Petrov <capacyt...@gmail.com> wrote:
>>>
>>>> I think it returns Unit... it won't work.
>>>> [image: image.png]
>>>>
>>>> I found another way to make it work: I called an action after the checkpoint.
>>>>
>>>> val recordsRDD = convertToRecords(anotherRDD)
>>>> recordsRDD.checkpoint()
>>>> logger.info("checkpoint done")
>>>> recordsRDD.count() // (!!!)
>>>> logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>>>> logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")
>>>>
>>>> Output:
>>>> Job$ - checkpoint done (!!!)
>>>>
>>>> Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
>>>> Job$ - recordsRDD.toDebugString:
>>>> (2) MapPartitionsRDD[7] at map at Job.scala:112 []
>>>>
>>>> But it still has a single MapPartitionsRDD in the lineage. The lineage
>>>> became shorter, but I don't want Spark to rebuild the RDD from
>>>> MapPartitionsRDD; I want it to take the data directly from the checkpoint
>>>> dir. MapPartitionsRDD has non-idempotent id generation, and I don't want
>>>> to run it twice in case of a downstream task failure.
>>>>
>>>> Wed, Aug 19, 2020 at 14:47, Jacob Lynn <abebopare...@gmail.com>:
>>>>
>>>>> Hi Ivan,
>>>>>
>>>>> Unlike cache/persist, checkpoint does not operate in-place but
>>>>> requires the result to be assigned to a new variable.
>>>>> In your case:
>>>>>
>>>>> val recordsRDD = convertToRecords(anotherRDD).checkpoint()
>>>>>
>>>>> Best,
>>>>> Jacob
>>>>>
>>>>> On Wed, 19 Aug 2020 at 14:39, Ivan Petrov <capacyt...@gmail.com> wrote:
>>>>>
>>>>>> Hi!
>>>>>> It seems like I'm doing something wrong. I call .checkpoint() on an
>>>>>> RDD, but it's not checkpointed...
>>>>>> What am I doing wrong?
>>>>>>
>>>>>> val recordsRDD = convertToRecords(anotherRDD)
>>>>>> recordsRDD.checkpoint()
>>>>>> logger.info("checkpoint done")
>>>>>>
>>>>>> logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>>>>>> logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")
>>>>>>
>>>>>> Output:
>>>>>> Job$ - checkpoint done (!!!)
>>>>>>
>>>>>> But then.....
>>>>>> Job$ - isCheckpointed? false, getCheckpointFile: None
>>>>>> Job$ - recordsRDD.toDebugString:
>>>>>> (2) MapPartitionsRDD[7] at map at Job.scala:112 []
>>>>>>  |  MapPartitionsRDD[6] at map at Job.scala:111 []
>>>>>>  |  MapPartitionsRDD[5] at map at ....scala:40 []
>>>>>>  |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
>>>>>> +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
>>>>>>  |  MapPartitionsRDD[2] at map at ...:66 []
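
[Editor's note] The pattern the thread converges on can be summarized in one sketch. This is a hedged illustration, not code from the thread: the checkpoint path and the RDD contents are made up, and it assumes a running SparkContext `sc` (e.g. in spark-shell). Note that `RDD.checkpoint()` returns Unit, so Jacob's one-liner above cannot work as written; the mark-then-act pattern below is what Russell describes.

```scala
// Checkpointing is lazy: checkpoint() only marks the RDD; the first action
// both computes the result and writes the checkpoint files.
// Assumes a live SparkContext `sc`; path is illustrative.
sc.setCheckpointDir("hdfs:///tmp/checkpoints")

val records = sc.parallelize(Seq("foo", "bar")).map(_.toUpperCase)

records.checkpoint()                 // returns Unit: just marks the RDD
assert(!records.isCheckpointed)      // nothing materialized yet

records.count()                      // action materializes the checkpoint
assert(records.isCheckpointed)       // lineage now truncated at the checkpoint

records.count()                      // later jobs read the checkpointed data
```

Regarding Ivan's remaining worry: as far as I understand, once `isCheckpointed` is true the RDD's dependencies are replaced by a dependency on the checkpointed data, so a later task failure triggers a re-read from the checkpoint directory rather than a re-run of the upstream transformations, even though the top-level MapPartitionsRDD name still appears in `toDebugString`.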