Awesome, thanks for explaining it.

On Wed, 19 Aug 2020 at 16:29, Russell Spitzer <> wrote:

> Spark decides at runtime whether it can use the checkpoint, so you'll be
> able to see it in the UI but not in the plan: you are looking at the plan
> before the job is actually running, and only while the job runs does Spark
> check whether it can use the checkpoint in the lineage.
> Here is a two-stage job, for example:
> scala> val x = sc.parallelize(Seq("foo","bar"))
> x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24
> scala> val y = x.repartition(3)
> y: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at repartition at <console>:25
> scala> y.checkpoint
> scala> y.count
> res12: Long = 2
> [image: image.png]
> [image: image.png]
> scala> y.count
> res13: Long = 2
> [image: image.png]
> Notice that we were able to skip the first stage: when Stage 11 looked for
> its dependencies, it found a checkpointed version of the repartitioned data,
> so it didn't need to repartition again. This turns my two-stage job into a
> two-stage job with one stage skipped, in effect a one-stage job.
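Russell's point about the skipped stage can be sketched as a toy model in plain Scala. This is not Spark's scheduler: `ToyJob`, `stagesRun` and the two named stages are hypothetical, and for brevity the checkpoint is captured from the first action's output, whereas real Spark writes checkpoint files in a separate job after the action.

```scala
import scala.collection.mutable

// Toy model (hypothetical, not Spark's API): a "repartition" stage feeding a
// "count" stage; once a checkpoint is materialized, later jobs skip stage 1.
final class ToyJob(input: Seq[String], numPartitions: Int) {
  val stagesRun: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
  private var markedForCheckpoint = false
  private var checkpointed: Option[Vector[Vector[String]]] = None

  // Like RDD.checkpoint(): only marks; nothing is written yet.
  def checkpoint(): Unit = { markedForCheckpoint = true }

  // Stage 1: round-robin the input into numPartitions buckets.
  private def repartition(): Vector[Vector[String]] = {
    stagesRun += "stage 1: repartition"
    Vector.tabulate(numPartitions) { p =>
      input.zipWithIndex.collect { case (s, i) if i % numPartitions == p => s }.toVector
    }
  }

  // Stage 2: the action. The check for a checkpoint happens here, at run time.
  def count(): Long = {
    val partitions = checkpointed.getOrElse(repartition())
    stagesRun += "stage 2: count"
    // Simplification: keep this job's output as the checkpoint.
    if (markedForCheckpoint && checkpointed.isEmpty) checkpointed = Some(partitions)
    partitions.map(_.size.toLong).sum
  }
}

val job = new ToyJob(Seq("foo", "bar"), numPartitions = 3)
job.checkpoint()
val firstCount = job.count()  // runs stage 1 and stage 2
val secondCount = job.count() // stage 1 is skipped: data comes from the checkpoint
println(job.stagesRun.mkString("\n"))
```

Both counts return 2, but `stagesRun` records the repartition stage only once, mirroring the "1 stage skipped" that Russell describes in the UI.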
> On Wed, Aug 19, 2020 at 9:07 AM Ivan Petrov <> wrote:
>> I did it and saw the lineage change.
>> BEFORE calling the action: no success.
>> Job$ - isCheckpointed? false, getCheckpointFile: None
>> Job$ - recordsRDD.toDebugString:
>> (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>>  |  MapPartitionsRDD[6] at map at  Job.scala:111 []
>>  |  MapPartitionsRDD[5] at map at ....scala:40 []
>>  |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
>>  +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
>>     |  MapPartitionsRDD[2] at map at ...:66 []
>> AFTER calling the action: nice, it works!
>> Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
>> Job$ - recordsRDD.toDebugString:
>> (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>> The lineage now contains only one stage, but I want to get rid of it too.
>> This stage happens right before the checkpoint. Will Spark try to re-run it
>> in case of a task failure AFTER the checkpoint?
>> My expectation is that Spark will read directly from the checkpoint dir; it
>> doesn't have to do anything with the previous MapPartitionsRDD[7] at map at
>> Job.scala:112.
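Ivan's expectation matches how Spark's `RDD.iterator` treats a checkpointed RDD: once the checkpoint has been materialized, partitions are read back from it (via Spark's internal `computeOrReadCheckpoint`) instead of running the RDD's own `compute`. The toy below sketches that branch in plain Scala; `ToyRDD`, `computeCalls` and the partition function are hypothetical, and for brevity it saves the action's output as the checkpoint rather than running Spark's separate checkpoint-writing job.

```scala
// Toy model (hypothetical, not Spark's classes) of the read path for a
// checkpointed RDD: compute only while no materialized checkpoint exists.
final class ToyRDD[T](numPartitions: Int, computeFn: Int => Seq[T]) {
  var computeCalls = 0
  private var markedForCheckpoint = false
  private var checkpointData: Option[Map[Int, Seq[T]]] = None

  def checkpoint(): Unit = { markedForCheckpoint = true }
  def isCheckpointed: Boolean = checkpointData.isDefined

  // Same idea as Spark's computeOrReadCheckpoint.
  def iterator(split: Int): Seq[T] = checkpointData match {
    case Some(saved) => saved(split) // read the saved partition back
    case None =>
      computeCalls += 1
      computeFn(split) // run the user's transformation
  }

  // An action: evaluate every partition, then materialize the checkpoint.
  def collect(): Seq[T] = {
    val parts = (0 until numPartitions).map(p => p -> iterator(p)).toMap
    if (markedForCheckpoint && checkpointData.isEmpty) checkpointData = Some(parts)
    (0 until numPartitions).flatMap(p => parts(p))
  }
}

val rdd = new ToyRDD[Int](2, p => Seq(p * 10, p * 10 + 1))
rdd.checkpoint()
val collected = rdd.collect() // computes both partitions, then checkpoints
val callsAfterAction = rdd.computeCalls
val reread = rdd.iterator(1)  // e.g. a retried downstream task reading partition 1
val callsAfterRetry = rdd.computeCalls
```

The compute counter stays at 2 after the re-read, which is the behavior Ivan wants for his non-idempotent map.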
>> On Wed, 19 Aug 2020 at 16:01, Russell Spitzer <> wrote:
>>> Checkpoint is lazy and needs an action to actually do the work. The
>>> method just marks the RDD, as noted in the doc you posted.
>>> Call an action twice; the second run should use the checkpoint.
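The difference between marking and materializing can be sketched with another plain-Scala toy (hypothetical names `ToyLineage` and `records`; not Spark's implementation): `checkpoint()` only sets a flag, so `isCheckpointed` stays false and the debug string still shows the full lineage until an action runs, which is what Ivan's first output shows.

```scala
// Toy model (hypothetical, not Spark code): checkpoint() marks the RDD,
// and only the next action materializes the checkpoint and cuts the lineage.
final class ToyLineage(name: String, parents: List[String], records: Seq[String]) {
  private var marked = false
  private var materialized = false

  def checkpoint(): Unit = { marked = true }
  def isCheckpointed: Boolean = materialized

  def count(): Long = {
    val n = records.size.toLong // the "job" runs here
    if (marked) materialized = true
    n
  }

  // Before materialization: the full chain. Afterwards: this RDD only.
  def toDebugString: String =
    if (materialized) name else (name :: parents).mkString(" <- ")
}

val lineage = new ToyLineage("MapPartitionsRDD[7]",
  List("MapPartitionsRDD[6]", "ShuffledRDD[4]"), Seq("a", "b"))
lineage.checkpoint()
val flagBeforeAction = lineage.isCheckpointed // still false: only marked
val debugBeforeAction = lineage.toDebugString
lineage.count()                               // the action materializes it
val flagAfterAction = lineage.isCheckpointed
val debugAfterAction = lineage.toDebugString
```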
>>> On Wed, Aug 19, 2020, 8:49 AM Ivan Petrov <> wrote:
>>>> I think it returns Unit... it won't work.
>>>> [image: image.png]
>>>> I found another way to make it work: I called an action after the checkpoint.
>>>> val recordsRDD = convertToRecords(anotherRDD)
>>>> recordsRDD.checkpoint()
>>>> log.info("checkpoint done") // `log` is hypothetical; the original logger call was lost in the archive
>>>> recordsRDD.count() // (!!!)
>>>> log.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>>>> log.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")
>>>> Output:
>>>> Job$ - checkpoint done (!!!)
>>>> Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
>>>> Job$ - recordsRDD.toDebugString:
>>>> (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>>>> But it still has a single MapPartitionsRDD in the lineage. The lineage
>>>> became shorter, but I don't want Spark to rebuild the RDD from
>>>> MapPartitionsRDD; I want it to take the data directly from the checkpoint
>>>> dir. MapPartitionsRDD has non-idempotent id generation, and I don't want
>>>> to call it twice in case of a downstream task failure.
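Ivan's non-idempotent id generation raises a related caveat. The scaladoc for `RDD.checkpoint()` strongly recommends persisting the RDD in memory first, because otherwise saving it to a file requires recomputation: the checkpoint files are written by a separate job after the action. A plain-Scala toy (hypothetical `ToyCheckpointedRDD`, `persisted` flag and id counter; not Spark code) shows what that means for generated ids. In real Spark, calling `persist()` (or `cache()`) before `checkpoint()` plays the role of the flag, although cached blocks can still be lost, for example on eviction or executor loss.

```scala
// Toy model (hypothetical, not Spark code): after the user's action, a
// second job writes the checkpoint, recomputing unless a cached copy exists.
final class ToyCheckpointedRDD(records: Seq[String], persisted: Boolean) {
  private var nextId = 0L // stands in for non-idempotent id generation
  private var cache: Option[Seq[(Long, String)]] = None
  var checkpointFile: Option[Seq[(Long, String)]] = None

  private def compute(): Seq[(Long, String)] =
    records.map { r => nextId += 1; (nextId, r) }

  private def getOrCompute(): Seq[(Long, String)] = cache.getOrElse {
    val data = compute()
    if (persisted) cache = Some(data)
    data
  }

  // The user's action, followed by the job that writes the checkpoint.
  def collectThenCheckpoint(): Seq[(Long, String)] = {
    val seenByAction = getOrCompute()
    checkpointFile = Some(getOrCompute())
    seenByAction
  }
}

val unpersisted = new ToyCheckpointedRDD(Seq("a", "b"), persisted = false)
val unpersistedSeen = unpersisted.collectThenCheckpoint()
// ids 1, 2 seen by the action, but ids 3, 4 written to the checkpoint

val persistedRdd = new ToyCheckpointedRDD(Seq("a", "b"), persisted = true)
val persistedSeen = persistedRdd.collectThenCheckpoint()
// ids 1, 2 in both places
```

Without the cache, everything that later reads the checkpoint sees ids that the first action never produced.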
>>>> On Wed, 19 Aug 2020 at 14:47, Jacob Lynn <> wrote:
>>>>> Hi Ivan,
>>>>> Unlike cache/persist, checkpoint does not operate in-place but
>>>>> requires the result to be assigned to a new variable. In your case:
>>>>> val recordsRDD = convertToRecords(anotherRDD).checkpoint()
>>>>> Best,
>>>>> Jacob
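Jacob's advice fits `Dataset.checkpoint()`, which returns a new, checkpointed `Dataset[T]` (eager by default), whereas `RDD.checkpoint()` returns `Unit` and marks the RDD in place, which is why Ivan found that the assignment does not work. The two shapes, sketched with hypothetical plain-Scala stand-ins rather than Spark's classes:

```scala
// Hypothetical stand-ins (not Spark's classes) for the two API shapes.
final class MarkInPlaceRDD(val data: Seq[Int]) {
  var markedForCheckpoint = false
  // Shape of RDD.checkpoint(): mutates this object, returns Unit.
  def checkpoint(): Unit = { markedForCheckpoint = true }
}

final class ReturnNewDataset(val data: Seq[Int], val checkpointed: Boolean = false) {
  // Shape of Dataset.checkpoint(): returns a new object; the receiver is unchanged.
  def checkpoint(): ReturnNewDataset = new ReturnNewDataset(data, checkpointed = true)
}

val rddLike = new MarkInPlaceRDD(Seq(1, 2))
rddLike.checkpoint()                     // keep using `rddLike` itself afterwards
val dsLike = new ReturnNewDataset(Seq(1, 2))
val dsCheckpointed = dsLike.checkpoint() // use the returned value, as Jacob suggests
```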
>>>>> On Wed, 19 Aug 2020 at 14:39, Ivan Petrov <> wrote:
>>>>>> Hi!
>>>>>> It seems like I'm doing something wrong. I call .checkpoint() on an RDD,
>>>>>> but it's not checkpointed...
>>>>>> What am I doing wrong?
>>>>>> val recordsRDD = convertToRecords(anotherRDD)
>>>>>> recordsRDD.checkpoint()
>>>>>> log.info("checkpoint done") // `log` is hypothetical; the original logger call was lost in the archive
>>>>>> log.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>>>>>> log.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")
>>>>>> Output:
>>>>>> Job$ - checkpoint done (!!!)
>>>>>> But then.....
>>>>>> Job$ - isCheckpointed? false, getCheckpointFile: None
>>>>>> Job$ - recordsRDD.toDebugString:
>>>>>> (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>>>>>>  |  MapPartitionsRDD[6] at map at  Job.scala:111 []
>>>>>>  |  MapPartitionsRDD[5] at map at ....scala:40 []
>>>>>>  |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
>>>>>>  +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
>>>>>>     |  MapPartitionsRDD[2] at map at ...:66 []
