I think the right way to look at this is that the batchId is just a proxy
for offsets that is agnostic to what type of source you are reading from
(or how many sources there are).  We might call into a custom sink with
the same batchId more than once, but it will always contain the same data
(there is no race condition, since this is stored in a write-ahead log).
As long as you check/commit the batch id in the same transaction as the
data, you will get exactly-once semantics.
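
A minimal sketch of that check/commit-in-one-transaction pattern (table
names, schema and the row type are hypothetical, and plain JDBC is used
only for illustration):

  import java.sql.{Connection, DriverManager}

  object IdempotentBatchWriter {

    // Returns true if the batch was written, false if it was already committed.
    def writeBatch(jdbcUrl: String, batchId: Long, rows: Seq[(String, Long)]): Boolean = {
      val conn: Connection = DriverManager.getConnection(jdbcUrl)
      try {
        conn.setAutoCommit(false)

        // 1) Check whether this batchId was already committed.
        val check = conn.prepareStatement("SELECT 1 FROM batch_commits WHERE batch_id = ?")
        check.setLong(1, batchId)
        if (check.executeQuery().next()) {
          conn.rollback()
          return false // duplicate delivery of the same batch: safe to skip
        }

        // 2) Append the results of the batch.
        val insert = conn.prepareStatement("INSERT INTO results(key, value) VALUES (?, ?)")
        rows.foreach { case (k, v) =>
          insert.setString(1, k)
          insert.setLong(2, v)
          insert.addBatch()
        }
        insert.executeBatch()

        // 3) Record the batchId in the same transaction, then commit both together.
        val mark = conn.prepareStatement("INSERT INTO batch_commits(batch_id) VALUES (?)")
        mark.setLong(1, batchId)
        mark.executeUpdate()

        conn.commit()
        true
      } catch {
        case e: Exception => conn.rollback(); throw e
      } finally {
        conn.close()
      }
    }
  }

Because the batchId check and the data append commit or roll back together,
a retried batch either finds its id already recorded and skips the write, or
writes everything again as if for the first time.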

On Wed, Sep 13, 2017 at 1:25 AM, Dmitry Naumenko <dm.naume...@gmail.com>
wrote:

> Thanks, I see.
>
> However, I guess reading from the checkpoint directory might be less
> efficient compared to just preserving the offsets in the Dataset.
>
> I have one more question about operation idempotence (hope it helps others
> to get a clear picture).
>
> If I read the offsets from an RDBMS on restart and manually specify the
> starting offsets on the Kafka source, is it still possible that, in case of
> a failure, a duplicate batch id will go to the custom sink?
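>
> (What I have in mind is roughly this; the option value is only an
> illustration, in practice I would build the JSON from the offsets stored
> in the DB, and `spark` is the SparkSession:)
>
>   val offsetsJson = """{"my-topic":{"0":1234,"1":5678}}""" // loaded from the RDBMS
>
>   val df = spark.readStream
>     .format("kafka")
>     .option("kafka.bootstrap.servers", "broker:9092")
>     .option("subscribe", "my-topic")
>     .option("startingOffsets", offsetsJson)
>     .load()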
>
> Previously, with DStreams, you would just read the offsets from storage on
> start and write them into the DB in one transaction with the data, and that
> was enough for "exactly-once". Please correct me if I am mistaken here. Will
> the same strategy work with Structured Streaming?
>
> I guess that, in the case of Structured Streaming, Spark commits the batch
> offsets to the checkpoint directory, and there can be a race condition where
> you commit your data with offsets into the DB but Spark fails to commit the
> batch id, and some kind of automatic retry happens. If this is true, is it
> possible to disable this automatic retry, so that I can still use the
> unified API for batch/streaming with my own retry logic (which is basically:
> ignore the intermediate data, re-read from Kafka, and retry the processing
> and load)?
>
> Dmitry
>
>
> 2017-09-12 22:43 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>
>> In the checkpoint directory there is a file /offsets/$batchId that holds
>> the offsets serialized as JSON.  I would not consider this a stable public
>> API, though.
>>
>> Really, the only important thing for getting exactly-once is that you must
>> ensure that whatever operation you are doing downstream is idempotent with
>> respect to the batchId.  For example, if you are writing to an RDBMS you
>> could have a table that records the batch ID and update it in the same
>> transaction as you append the results of the batch.  Before trying to
>> append, you should check that batch ID and make sure you have not already
>> committed it.
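>>
>> For example, the bookkeeping side can be as small as this (a sketch; the
>> table and column names are made up, and CREATE TABLE IF NOT EXISTS is not
>> supported by every RDBMS):
>>
>>   def createCommitTable(jdbcUrl: String): Unit = {
>>     val conn = java.sql.DriverManager.getConnection(jdbcUrl)
>>     try {
>>       // One row per committed batch; the primary key makes it impossible
>>       // to record the same batchId twice, even under concurrent retries.
>>       conn.createStatement().executeUpdate(
>>         "CREATE TABLE IF NOT EXISTS batch_commits (batch_id BIGINT PRIMARY KEY)")
>>     } finally {
>>       conn.close()
>>     }
>>   }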
>>
>> On Tue, Sep 12, 2017 at 11:48 AM, Dmitry Naumenko <dm.naume...@gmail.com>
>> wrote:
>>
>>> Thanks for the response, Michael
>>>
>>> > You should still be able to get exactly-once processing by using the
>>> > batchId that is passed to the Sink.
>>>
>>> Could you explain this in more detail, please? Is there some kind of
>>> offset-manager API that works as a get-offset-by-batch-id lookup table?
>>>
>>> Dmitry
>>>
>>> 2017-09-12 20:29 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>>
>>>> I think that we are going to have to change the Sink API as part of
>>>> SPARK-20928 <https://issues-test.apache.org/jira/browse/SPARK-20928>,
>>>> which is why I linked these tickets together.  I'm still targeting an
>>>> initial version for Spark 2.3, which should happen sometime towards the
>>>> end of the year.
>>>>
>>>> There are some misconceptions in that Stack Overflow answer that I can
>>>> correct.  Until we improve the Source API, you should still be able to get
>>>> exactly-once processing by using the batchId that is passed to the Sink.
>>>> We guarantee that the offsets present at any given batch ID will be the
>>>> same across retries by recording this information in the checkpoint's WAL.
>>>> The checkpoint does not use Java serialization (as DStreams does) and can
>>>> be used even after upgrading Spark.
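>>>>
>>>> Concretely, a custom sink keys its writes off that batchId, roughly like
>>>> this (a sketch against the current internal Sink trait in
>>>> org.apache.spark.sql.execution.streaming, which may change as part of
>>>> SPARK-20928; the transactional helper is left abstract):
>>>>
>>>>   import org.apache.spark.sql.DataFrame
>>>>   import org.apache.spark.sql.execution.streaming.Sink
>>>>
>>>>   class IdempotentJdbcSink(jdbcUrl: String) extends Sink {
>>>>     override def addBatch(batchId: Long, data: DataFrame): Unit = {
>>>>       // The same batchId may be offered again after a failure, but it
>>>>       // always carries the same data, so the write only needs to be
>>>>       // idempotent with respect to batchId.
>>>>       writeTransactionally(batchId, data)
>>>>     }
>>>>
>>>>     // Check the recorded batchId, append the data and record the batchId,
>>>>     // all in one transaction against the target database (not shown here).
>>>>     private def writeTransactionally(batchId: Long, data: DataFrame): Unit = ???
>>>>   }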
>>>>
>>>>
>>>> On Tue, Sep 12, 2017 at 12:45 AM, Dmitry Naumenko <
>>>> dm.naume...@gmail.com> wrote:
>>>>
>>>>> Thanks, Cody
>>>>>
>>>>> Unfortunately, it seems there is no active development on it right now.
>>>>> Maybe I can step in and help with it somehow?
>>>>>
>>>>> Dmitry
>>>>>
>>>>> 2017-09-11 21:01 GMT+03:00 Cody Koeninger <c...@koeninger.org>:
>>>>>
>>>>>> https://issues-test.apache.org/jira/browse/SPARK-18258
>>>>>>
>>>>>> On Mon, Sep 11, 2017 at 7:15 AM, Dmitry Naumenko <
>>>>>> dm.naume...@gmail.com> wrote:
>>>>>> > Hi all,
>>>>>> >
>>>>>> > It started as a discussion in
>>>>>> > https://stackoverflow.com/questions/46153105/how-to-get-kafka-offsets-with-spark-structured-streaming-api.
>>>>>> >
>>>>>> > So the problem is that there is no support in the public API for
>>>>>> > obtaining the Kafka (or Kinesis) offsets. For example, if you want to
>>>>>> > save offsets to external storage in a custom Sink, you have to:
>>>>>> > 1) preserve topic, partition, and offset across all transform
>>>>>> > operations on the Dataset (based on the hard-coded Kafka schema);
>>>>>> > 2) do a manual group-by on topic/partition with an aggregate max
>>>>>> > offset (roughly as sketched below).
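>>>>>> >
>>>>>> > (Something like this; the topic/partition/offset columns come from the
>>>>>> > Kafka source schema, kafkaDf is the DataFrame returned by the Kafka
>>>>>> > source, and the rest is just an illustration:)
>>>>>> >
>>>>>> >   import org.apache.spark.sql.functions.max
>>>>>> >
>>>>>> >   // 1) carry topic/partition/offset through the query explicitly
>>>>>> >   val withOffsets = kafkaDf.selectExpr(
>>>>>> >     "topic", "partition", "offset", "CAST(value AS STRING) AS value")
>>>>>> >
>>>>>> >   // 2) recover the highest offset seen per topic/partition by hand
>>>>>> >   val maxOffsets = withOffsets
>>>>>> >     .groupBy("topic", "partition")
>>>>>> >     .agg(max("offset").as("max_offset"))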
>>>>>> >
>>>>>> > The Structured Streaming doc says "Every streaming source is assumed
>>>>>> > to have offsets", so why is it not a part of the public API? What do
>>>>>> > you think about supporting it?
>>>>>> >
>>>>>> > Dmitry
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
