Awesome, thanks for explaining it.
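For anyone who lands on this thread later, here is a minimal sketch of the whole pattern discussed below. It is meant to be pasted into spark-shell (which provides sc); the checkpoint path is a placeholder I picked, and in production you would point it at a reliable store such as HDFS.

```scala
// Paste into spark-shell (sc is provided there); the checkpoint path
// is a placeholder -- use a reliable store such as HDFS in production.
sc.setCheckpointDir("/tmp/spark-checkpoints")

val x = sc.parallelize(Seq("foo", "bar"))
val y = x.repartition(3)

y.checkpoint()               // only marks the RDD; nothing is written yet
assert(!y.isCheckpointed)

y.count()                    // first action materializes the checkpoint
assert(y.isCheckpointed)
println(y.getCheckpointFile) // Some(<path under the checkpoint dir>)

y.count()                    // later actions reuse the checkpointed data,
                             // so the repartition stage is skipped
```

Note that checkpoint() returns Unit and only marks the RDD; the files are written by the first action that runs afterwards, which is the behavior the thread below works through.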

On Wed, Aug 19, 2020 at 4:29 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:

> It determines whether it can use the checkpoint at runtime, so you'll be
> able to see it in the UI but not in the plan, since you are looking at the
> plan before the job actually runs, and that is when Spark checks whether it
> can use the checkpoint in the lineage.
>
> Here is a two stage job for example:
>
> *scala> val x = sc.parallelize(Seq("foo","bar"))*
> *x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at
> parallelize at <console>:24*
>
> *scala> val y = x.repartition(3)*
> *y: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at repartition
> at <console>:25*
>
> *scala> y.checkpoint*
>
> *scala> y.count*
> *res12: Long = 2*
>
> [image: image.png]
>
> [image: image.png]
>
> *scala> y.count*
> *res13: Long = 2*
>
> [image: image.png]
>
> Notice that we were able to skip the first stage: when Stage 11
> looked for its dependencies, it found a checkpointed version of the
> partitioned data, so it didn't need to repartition again. This turns my
> 2-stage job into a 2-stage job with 1 stage skipped, in effect a
> 1-stage job.
>
>
>
> On Wed, Aug 19, 2020 at 9:07 AM Ivan Petrov <capacyt...@gmail.com> wrote:
>
>> I did it and I see the lineage change.
>>
>> BEFORE calling the action: no success.
>>
>> Job$ - isCheckpointed? false, getCheckpointFile: None
>> Job$ - recordsRDD.toDebugString:
>> (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>>  |  MapPartitionsRDD[6] at map at  Job.scala:111 []
>>  |  MapPartitionsRDD[5] at map at ....scala:40 []
>>  |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
>>  +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
>>     |  MapPartitionsRDD[2] at map at ...:66 []
>>
>> AFTER calling the action: nice, it works!
>> Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
>>     Job$ - recordsRDD.toDebugString:
>>     (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>>
>> The lineage now contains only one stage, but I want to get rid of it
>> too. This stage happens right before the checkpoint. Will Spark try to
>> re-run it in case of a task failure AFTER the checkpoint?
>> My expectation is that Spark will read directly from the checkpoint
>> dir; it shouldn't have to do anything with the previous
>> MapPartitionsRDD[7] at map at Job.scala:112.
>>
>> On Wed, Aug 19, 2020 at 4:01 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:
>>
>>> Checkpoint is lazy and needs an action to actually do the work. The
>>> method just marks the RDD, as noted in the doc you posted.
>>>
>>> Call an action twice. The second run should use the checkpoint.
>>>
>>>
>>>
>>> On Wed, Aug 19, 2020, 8:49 AM Ivan Petrov <capacyt...@gmail.com> wrote:
>>>
>>>> I think it returns Unit... that won't work.
>>>> [image: image.png]
>>>>
>>>> I found another way to make it work: I called an action after the checkpoint.
>>>> val recordsRDD = convertToRecords(anotherRDD)
>>>>     recordsRDD.checkpoint()
>>>>     logger.info("checkpoint done")
>>>>     recordsRDD.count() // (!!!)
>>>>     logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed},
>>>> getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>>>>     logger.info(s"recordsRDD.toDebugString:
>>>> \n${recordsRDD.toDebugString}")
>>>>
>>>>     Output:
>>>>     Job$ - checkpoint done (!!!)
>>>>
>>>>     Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
>>>>     Job$ - recordsRDD.toDebugString:
>>>>     (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>>>>
>>>> But it still has a single MapPartitionsRDD in the lineage. The
>>>> lineage became shorter, but I don't want Spark to rebuild the RDD from
>>>> MapPartitionsRDD; I want it to take the data directly from the
>>>> checkpoint dir.
>>>> MapPartitionsRDD has non-idempotent id generation, so I don't want to
>>>> run it twice in case of a downstream task failure.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Aug 19, 2020 at 2:47 PM, Jacob Lynn <abebopare...@gmail.com> wrote:
>>>>
>>>>> Hi Ivan,
>>>>>
>>>>> Unlike cache/persist, checkpoint does not operate in-place but
>>>>> requires the result to be assigned to a new variable. In your case:
>>>>>
>>>>> val recordsRDD = convertToRecords(anotherRDD).checkpoint()
>>>>>
>>>>> Best,
>>>>> Jacob
>>>>>
>>>>> On Wed, Aug 19, 2020 at 2:39 PM, Ivan Petrov <capacyt...@gmail.com> wrote:
>>>>>
>>>>>> Hi!
>>>>>> Seems like I'm doing something wrong. I call .checkpoint() on the RDD,
>>>>>> but it's not checkpointed...
>>>>>> What am I doing wrong?
>>>>>>
>>>>>> val recordsRDD = convertToRecords(anotherRDD)
>>>>>> recordsRDD.checkpoint()
>>>>>> logger.info("checkpoint done")
>>>>>>
>>>>>> logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed},
>>>>>> getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>>>>>> logger.info(s"recordsRDD.toDebugString:
>>>>>> \n${recordsRDD.toDebugString}")
>>>>>>
>>>>>> Output:
>>>>>> Job$ - checkpoint done (!!!)
>>>>>>
>>>>>> But then...
>>>>>> Job$ - isCheckpointed? false, getCheckpointFile: None
>>>>>> Job$ - recordsRDD.toDebugString:
>>>>>> (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>>>>>>  |  MapPartitionsRDD[6] at map at  Job.scala:111 []
>>>>>>  |  MapPartitionsRDD[5] at map at ....scala:40 []
>>>>>>  |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
>>>>>>  +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
>>>>>>     |  MapPartitionsRDD[2] at map at ...:66 []
>>>>>>
>>>>>
