TD can correct me on this, but I believe checkpointing is done after a set
of jobs is submitted, not after they are completed.  If you fail while
processing the jobs, starting over from that checkpoint should put you in
the correct state.
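
That only holds if the output step can safely be re-run for the same batch
time. As a rough sketch of what I mean by that (plain JVM file operations,
not any Spark API; `IdempotentBatchSave`, `saveBatch`, `batchDir` and the
`part-00000` file name are made up for illustration), the idea is to derive
the output location from the batch time alone and replace it wholesale on
replay:

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

object IdempotentBatchSave {
  // Hypothetical naming scheme: the directory depends only on the batch time,
  // so a replayed batch always targets the same location.
  def batchDir(root: Path, batchTimeMs: Long): Path =
    root.resolve(s"batch-$batchTimeMs")

  // Delete a file or a directory tree; does nothing if the path is absent.
  private def deleteRecursively(f: File): Unit = {
    val children = f.listFiles() // null when f is not a directory
    if (children != null) children.foreach(deleteRecursively)
    f.delete()
  }

  // Write the batch into a temporary directory, then move it into place.
  // Calling this again with the same batch time replaces the earlier output
  // instead of duplicating it.
  def saveBatch(root: Path, batchTimeMs: Long, lines: Seq[String]): Path = {
    val target = batchDir(root, batchTimeMs)
    val tmp = root.resolve(s".tmp-batch-$batchTimeMs")
    deleteRecursively(tmp.toFile) // leftovers from a crashed attempt
    Files.createDirectories(tmp)
    Files.write(tmp.resolve("part-00000"),
      lines.mkString("\n").getBytes(StandardCharsets.UTF_8))
    // Not atomic as a whole: a crash between these two steps leaves no
    // output for the batch, which a replay from the checkpoint then redoes.
    deleteRecursively(target.toFile)
    Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE)
    target
  }
}
```

In a streaming job, something along these lines would sit inside foreachRDD,
keyed by time.milliseconds. With DataFrames, I believe
write.mode(SaveMode.Overwrite) gets you a similar effect, but check how it
behaves on your file system.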

In any case, are you actually observing a loss of messages when killing /
restarting a job?

On Wed, Sep 23, 2015 at 3:49 AM, Petr Novak <oss.mli...@gmail.com> wrote:

> Hi,
> I have 2 streams and checkpointing, with code based on the documentation.
> One stream transforms data from Kafka and saves it to Parquet files. The
> other uses the same input stream and runs updateStateByKey to compute some
> aggregations. There is no graceful shutdown.
>
> Both use roughly this code to save files:
>
> stream.foreachRDD { (rdd, time) =>
>   ...
>   rdd.toDF().write.save(...use time for the directory name...)
> }
>
> It is not idempotent at the moment but let's put this aside for now.
>
> The strange thing is that when I Ctrl+C the job, I can see a checkpoint
> file with the timestamp of the last batch, but there are no stream
> files/directories for this timestamp, or only one of the streams has data
> saved with a time aligned with the last checkpoint file. I would expect the
> checkpoint file to be created at the end of the batch, after both streams
> have successfully finished their saves. Otherwise I don't know what
> checkpointing is good for, except maybe cutting lineage. Is file saving
> asynchronous, and does Spark checkpointing not care about it?
>
> I actually need to checkpoint both streams atomically at the end of the
> batch. It seems to me that the Spark checkpointing facility is quite
> unusable in practice except for some simple scenarios, and everybody has to
> roll their own.
>
> Am I wrong? How can I use Spark checkpointing to checkpoint both streams
> after they have successfully saved their results to files? This is actually
> the reason why I think micro-batch streaming is nice: it has a clearly
> defined synchronization barrier. But it doesn't seem like checkpointing
> takes advantage of it.
>
> I can't ensure atomicity when saving multiple files for multiple streams,
> and that would require some further cleanup code on job restart. But at
> least I would like to have a guarantee that the existence of a checkpoint
> file signals that the batch with that timestamp finished successfully with
> all its RDD actions.
>
> Or is it expected to behave like this and I have something wrong in my
> code?
>
> Many thanks for any insights,
> Petr
>
>
