OK. So since I can get repeated batch ids, I guess I can just store the
last committed batch id in my storage (in the same transaction as the
data) and initialize the custom sink with the right batch id when the
application restarts. After that, just ignore a batch if its batchId <=
latestBatchId.
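
Something like this minimal sketch (the Sink trait here is Spark's internal
API; loadLastCommittedBatchId / commitDataAndBatchId are placeholders for my
own JDBC code, and wiring the sink up through a StreamSinkProvider is
omitted):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.streaming.Sink

    // Minimal sketch of the idea above. The two helpers are placeholders for
    // my storage layer; commitDataAndBatchId must write the batch's rows and
    // the batch id in ONE transaction.
    class IdempotentJdbcSink(jdbcUrl: String) extends Sink {

      // Initialized once from storage when the query (re)starts.
      private var latestBatchId: Long = loadLastCommittedBatchId(jdbcUrl)

      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        if (batchId <= latestBatchId) {
          // Already committed before the failure -- ignore the replayed batch.
        } else {
          commitDataAndBatchId(jdbcUrl, batchId, data) // single transaction
          latestBatchId = batchId
        }
      }

      private def loadLastCommittedBatchId(url: String): Long = ???
      private def commitDataAndBatchId(url: String, id: Long, data: DataFrame): Unit = ???
    }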

Dmitry

2017-09-13 22:12 GMT+03:00 Michael Armbrust <mich...@databricks.com>:

> I think the right way to look at this is that the batchId is just a proxy for
> offsets that is agnostic to what type of source you are reading from (or
> how many sources there are).  We might call into a custom sink with the
> same batchId more than once, but it will always contain the same data
> (there is no race condition, since this is stored in a write-ahead log).
> As long as you check/commit the batch id in the same transaction as the
> data you will get exactly once.
>
> On Wed, Sep 13, 2017 at 1:25 AM, Dmitry Naumenko <dm.naume...@gmail.com>
> wrote:
>
>> Thanks, I see.
>>
>> However, I guess reading from the checkpoint directory might be less
>> efficient compared to just preserving the offsets in the Dataset.
>>
>> I have one more question about operation idempotence (hope it helps others
>> to get a clear picture).
>>
>> If I read the offsets from an RDBMS on restart and manually specify the
>> starting offsets on the Kafka Source, is it still possible that, in case of
>> a failure, a duplicate batch id will go to the Custom Sink?
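
On the source side I mean something like this (broker, topic and offset
values are placeholders; in practice the offsets would be read from the DB):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("kafka-startingOffsets").getOrCreate()

    // startingOffsets takes a JSON map of topic -> partition -> offset
    // (it is only applied when the query starts without an existing checkpoint).
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")          // placeholder
      .option("subscribe", "events")                               // placeholder
      .option("startingOffsets", """{"events":{"0":1234,"1":5678}}""")
      .load()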
>>
>> Previously with DStreams, you would just read the offsets from storage on
>> start and write them into the DB in one transaction with the data, and that
>> was enough for "exactly-once". Please correct me if I made a mistake here. So
>> will the same strategy work with Structured Streaming?
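
For comparison, the DStream pattern I have in mind is roughly the following
sketch (broker, topic and offset values are placeholders):

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._

    val conf = new SparkConf().setAppName("dstream-exactly-once")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",                 // placeholder
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my-group",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    // Offsets read back from the DB on start (values here are made up).
    val fromOffsets = Map(new TopicPartition("events", 0) -> 1234L)

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("events"), kafkaParams, fromOffsets))

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... write the processed results AND offsetRanges to the DB
      //     in a single transaction ...
    }

    ssc.start()
    ssc.awaitTermination()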
>>
>> I guess that with Structured Streaming, Spark will commit the batch
>> offsets to a checkpoint directory, and there can be a race condition where
>> you commit your data with offsets into the DB, but Spark fails to
>> commit the batch id, and some kind of automatic retry happens. If this is
>> true, is it possible to disable this automatic retry, so I can still use
>> the unified batch/streaming API with my own retry logic (which is
>> basically: just ignore the intermediate data, re-read from Kafka, and retry
>> the processing and load)?
>>
>> Dmitry
>>
>>
>> 2017-09-12 22:43 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>
>>> In the checkpoint directory there is a file /offsets/$batchId that holds
>>> the offsets serialized as JSON.  I would not consider this a public stable
>>> API though.
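
For example, it can be peeked at like this (checkpoint path and batch id are
placeholders; the file format is internal and may change):

    import scala.io.Source

    // The offsets committed for a given batch live under
    // <checkpointLocation>/offsets/<batchId>. Useful for debugging only.
    val checkpointDir = "/tmp/my-query-checkpoint"   // placeholder
    val batchId = 42L                                // placeholder
    val source = Source.fromFile(s"$checkpointDir/offsets/$batchId")
    try source.getLines().foreach(println)
    finally source.close()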
>>>
>>> Really the only important thing to get exactly once is that you must
>>> ensure whatever operation you are doing downstream is idempotent with
>>> respect to the batchId.  For example, if you are writing to an RDBMS you
>>> could have a table that records the batch ID and update that in the same
>>> transaction as you append the results of the batch.  Before trying to
>>> append you should check that batch ID and make sure you have not already
>>> committed.
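
For example, a minimal sketch of that with plain JDBC (table and column names
are illustrative):

    import java.sql.{Connection, DriverManager}

    // Idempotent commit: record the batch id and the batch's rows in ONE
    // transaction, skipping the write entirely if this batch id was already
    // committed. Illustrative schema:
    //   batch_commits(batch_id BIGINT PRIMARY KEY)
    //   results(value VARCHAR)
    def commitBatch(jdbcUrl: String, batchId: Long, rows: Seq[String]): Unit = {
      val conn: Connection = DriverManager.getConnection(jdbcUrl)
      try {
        conn.setAutoCommit(false)

        val check = conn.prepareStatement(
          "SELECT 1 FROM batch_commits WHERE batch_id = ?")
        check.setLong(1, batchId)
        val alreadyCommitted = check.executeQuery().next()

        if (!alreadyCommitted) {
          val insert = conn.prepareStatement("INSERT INTO results (value) VALUES (?)")
          rows.foreach { r => insert.setString(1, r); insert.addBatch() }
          insert.executeBatch()

          val mark = conn.prepareStatement(
            "INSERT INTO batch_commits (batch_id) VALUES (?)")
          mark.setLong(1, batchId)
          mark.executeUpdate()
        }
        conn.commit()   // data and batch id become visible atomically
      } catch {
        case e: Exception => conn.rollback(); throw e
      } finally {
        conn.close()
      }
    }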
>>>
>>> On Tue, Sep 12, 2017 at 11:48 AM, Dmitry Naumenko <dm.naume...@gmail.com
>>> > wrote:
>>>
>>>> Thanks for response, Michael
>>>>
>>>> >  You should still be able to get exactly once processing by using
>>>> the batchId that is passed to the Sink.
>>>>
>>>> Could you explain this in more detail, please? Is there some kind of
>>>> offset manager API that works as a get-offset-by-batch-id lookup table?
>>>>
>>>> Dmitry
>>>>
>>>> 2017-09-12 20:29 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>>>
>>>>> I think that we are going to have to change the Sink API as part of
>>>>> SPARK-20928 <https://issues-test.apache.org/jira/browse/SPARK-20928>,
>>>>> which is why I linked these tickets together.  I'm still targeting an
>>>>> initial version for Spark 2.3 which should happen sometime towards the end
>>>>> of the year.
>>>>>
>>>>> There are some misconceptions in that Stack Overflow answer that I can
>>>>> correct.  Until we improve the Source API, you should still be able to get
>>>>> exactly once processing by using the batchId that is passed to the
>>>>> Sink. We guarantee that the offsets present at any given batch ID
>>>>> will be the same across retries by recording this information in the
>>>>> checkpoint's WAL. The checkpoint does not use java serialization (like
>>>>> DStreams does) and can be used even after upgrading Spark.
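
For reference, the hook in question is the internal Sink trait, which in
Spark 2.x looks roughly like this:

    import org.apache.spark.sql.DataFrame

    // org.apache.spark.sql.execution.streaming.Sink (internal API in Spark 2.x)
    trait Sink {
      // `data` contains the same records for a given `batchId`,
      // even if the batch is replayed after a failure.
      def addBatch(batchId: Long, data: DataFrame): Unit
    }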
>>>>>
>>>>>
>>>>> On Tue, Sep 12, 2017 at 12:45 AM, Dmitry Naumenko <
>>>>> dm.naume...@gmail.com> wrote:
>>>>>
>>>>>> Thanks, Cody
>>>>>>
>>>>>> Unfortunately, it seems there is no active development right
>>>>>> now. Maybe I can step in and help with it somehow?
>>>>>>
>>>>>> Dmitry
>>>>>>
>>>>>> 2017-09-11 21:01 GMT+03:00 Cody Koeninger <c...@koeninger.org>:
>>>>>>
>>>>>>> https://issues-test.apache.org/jira/browse/SPARK-18258
>>>>>>>
>>>>>>> On Mon, Sep 11, 2017 at 7:15 AM, Dmitry Naumenko <
>>>>>>> dm.naume...@gmail.com> wrote:
>>>>>>> > Hi all,
>>>>>>> >
>>>>>>> > It started as a discussion in
>>>>>>> > https://stackoverflow.com/questions/46153105/how-to-get-kafka-offsets-with-spark-structured-streaming-api.
>>>>>>> >
>>>>>>> > So the problem is that there is no support in the public API to obtain
>>>>>>> > the Kafka (or Kinesis) offsets. For example, if you want to save offsets
>>>>>>> > in external storage in a Custom Sink, you have to:
>>>>>>> > 1) preserve topic, partition and offset across all transform operations
>>>>>>> > of the Dataset (based on the hard-coded Kafka schema)
>>>>>>> > 2) do a manual group-by on topic/partition with an aggregate max offset
>>>>>>> >
>>>>>>> > The Structured Streaming doc says "Every streaming source is assumed to
>>>>>>> > have offsets", so why is it not part of the public API? What do you think
>>>>>>> > about supporting it?
>>>>>>> >
>>>>>>> > Dmitry
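
For illustration, the two steps listed above look roughly like this sketch
(broker and topic names are placeholders):

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.max

    val spark = SparkSession.builder().appName("kafka-offsets").getOrCreate()

    // 1) Keep topic/partition/offset (columns exposed by the Kafka source)
    //    alongside the payload through all transformations:
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder
      .option("subscribe", "events")                        // placeholder
      .load()
      .selectExpr("CAST(value AS STRING) AS value", "topic", "partition", "offset")

    // 2) Per micro-batch (i.e. on the DataFrame handed to the custom sink),
    //    aggregate the max offset for each topic/partition before persisting:
    def maxOffsets(batchData: DataFrame): DataFrame =
      batchData.groupBy("topic", "partition").agg(max("offset").as("maxOffset"))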
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
