In the checkpoint directory there is a file /offsets/$batchId that holds
the offsets serialized as JSON.  I would not consider this a stable public
API, though.
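
For example, for a query reading a single Kafka topic, a batch's file looks
roughly like this (again, this is an internal format that may change across
versions, and the topic name and numbers below are made up):

$ cat <checkpoint-dir>/offsets/42
v1
{"batchWatermarkMs":0,"batchTimestampMs":1505232000000}
{"my-topic":{"0":123450,"1":123401}}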

Really, the only important thing for exactly-once is that you must ensure
whatever operation you are doing downstream is idempotent with respect to
the batchId.  For example, if you are writing to an RDBMS you could have a
table that records the batch ID, and update it in the same transaction in
which you append the results of the batch.  Before trying to append, you
should check that batch ID and make sure you have not already committed it.
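
To make that concrete, here is a minimal sketch against the current
(internal) Sink API using plain JDBC, assuming a two-column (string, long)
Dataset.  The table names, row layout, and the collect() are just for
illustration, not production code:

import java.sql.DriverManager

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Idempotent sink: a batch's rows and its batch id are committed in one
// transaction, so a replayed batch is skipped.
class IdempotentJdbcSink(url: String) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val rows = data.collect()  // toy example; a real sink would write per partition
    val conn = DriverManager.getConnection(url)
    try {
      conn.setAutoCommit(false)
      // Skip the batch if it has already been committed.
      val check = conn.prepareStatement(
        "SELECT 1 FROM batch_commits WHERE batch_id = ?")
      check.setLong(1, batchId)
      if (!check.executeQuery().next()) {
        val insert = conn.prepareStatement(
          "INSERT INTO results (k, v) VALUES (?, ?)")
        rows.foreach { r =>
          insert.setString(1, r.getString(0))
          insert.setLong(2, r.getLong(1))
          insert.addBatch()
        }
        insert.executeBatch()
        val mark = conn.prepareStatement(
          "INSERT INTO batch_commits (batch_id) VALUES (?)")
        mark.setLong(1, batchId)
        mark.executeUpdate()
        conn.commit()  // results and batch id become visible atomically
      }
    } finally {
      conn.close()  // closing without commit rolls the transaction back
    }
  }
}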

On Tue, Sep 12, 2017 at 11:48 AM, Dmitry Naumenko <dm.naume...@gmail.com>
wrote:

> Thanks for response, Michael
>
> > You should still be able to get exactly-once processing by using the
> > batchId that is passed to the Sink.
>
> Could you explain this in more detail, please? Is there some kind of
> offset-manager API that works as a get-offset-by-batch-id lookup table?
>
> Dmitry
>
> 2017-09-12 20:29 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>
>> I think that we are going to have to change the Sink API as part of
>> SPARK-20928 <https://issues-test.apache.org/jira/browse/SPARK-20928>,
>> which is why I linked these tickets together.  I'm still targeting an
>> initial version for Spark 2.3 which should happen sometime towards the end
>> of the year.
>>
>> There are some misconceptions in that Stack Overflow answer that I can
>> correct.  Until we improve the Source API, you should still be able to get
>> exactly-once processing by using the batchId that is passed to the Sink.
>> We guarantee that the offsets present at any given batch ID will be the
>> same across retries by recording this information in the checkpoint's WAL.
>> The checkpoint does not use Java serialization (as DStreams does) and can
>> be used even after upgrading Spark.
>>
>>
>> On Tue, Sep 12, 2017 at 12:45 AM, Dmitry Naumenko <dm.naume...@gmail.com>
>> wrote:
>>
>>> Thanks, Cody
>>>
>>> Unfortunately, it seems there is no active development right now.
>>> Maybe I can step in and help with it somehow?
>>>
>>> Dmitry
>>>
>>> 2017-09-11 21:01 GMT+03:00 Cody Koeninger <c...@koeninger.org>:
>>>
>>>> https://issues-test.apache.org/jira/browse/SPARK-18258
>>>>
>>>> On Mon, Sep 11, 2017 at 7:15 AM, Dmitry Naumenko <dm.naume...@gmail.com>
>>>> wrote:
>>>> > Hi all,
>>>> >
>>>> > It started as a discussion in
>>>> > https://stackoverflow.com/questions/46153105/how-to-get-kafka-offsets-with-spark-structured-streaming-api.
>>>> >
>>>> > So the problem is that there is no support in the public API for
>>>> > obtaining the Kafka (or Kinesis) offsets. For example, if you want to
>>>> > save offsets in external storage in a custom Sink, you have to:
>>>> > 1) preserve topic, partition and offset across all transform operations
>>>> > on the Dataset (based on the hard-coded Kafka schema)
>>>> > 2) do a manual group-by on partition/offset and aggregate the max offset
>>>> >
>>>> > The Structured Streaming doc says "Every streaming source is assumed to
>>>> > have offsets", so why is this not part of the public API? What do you
>>>> > think about supporting it?
>>>> >
>>>> > Dmitry
>>>>
>>>
>>>
>>
>
