Yep, that is correct.  You can also use the query ID, which is a GUID that
is stored in the checkpoint and preserved across restarts, if you want to
distinguish the batches from different streams.

sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)

This was added recently
<https://github.com/apache/spark/commit/2d968a07d211688a9c588deb859667dd8b653b27>
though.
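
For what it's worth, the check-and-commit pattern discussed in this thread
can be sketched roughly as below. This is only an illustration under stated
assumptions, not Spark code: it uses Python's built-in sqlite3 module as a
stand-in for the real storage, and the table names (results,
committed_batches) and functions (init_store, last_committed_batch,
add_batch) are made up for the example. The query_id argument stands for
the value read from QUERY_ID_KEY above, and batch_id for the batchId that is
passed to the sink.

```python
import sqlite3


def init_store(conn):
    # Hypothetical schema: one table for the data, one for the commit marker.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS results "
        "(query_id TEXT, batch_id INTEGER, value TEXT)"
    )
    conn.execute(
        "CREATE TABLE IF NOT EXISTS committed_batches "
        "(query_id TEXT PRIMARY KEY, last_batch_id INTEGER NOT NULL)"
    )
    conn.commit()


def last_committed_batch(conn, query_id):
    # Returns -1 when nothing has been committed yet for this query.
    row = conn.execute(
        "SELECT last_batch_id FROM committed_batches WHERE query_id = ?",
        (query_id,),
    ).fetchone()
    return row[0] if row else -1


def add_batch(conn, query_id, batch_id, rows):
    """Write one batch; return False if it was already committed."""
    # "with conn" commits on success and rolls back if an exception is raised,
    # so the data and the batch id are stored together or not at all.
    with conn:
        if batch_id <= last_committed_batch(conn, query_id):
            return False  # replayed batch: ignore it
        conn.executemany(
            "INSERT INTO results (query_id, batch_id, value) VALUES (?, ?, ?)",
            [(query_id, batch_id, value) for value in rows],
        )
        conn.execute(
            "INSERT OR REPLACE INTO committed_batches (query_id, last_batch_id) "
            "VALUES (?, ?)",
            (query_id, batch_id),
        )
    return True
```

Calling add_batch twice with the same query_id and batch_id writes the rows
once; the second call returns False and leaves both tables untouched.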

On Thu, Sep 14, 2017 at 3:40 AM, Dmitry Naumenko <dm.naume...@gmail.com>
wrote:

> Ok. So since I can get repeated batch ids, I guess I can just store the
> last committed batch id in my storage (in the same transaction as the
> data) and initialize the custom sink with the right batch id when the
> application restarts. After this, just ignore a batch if current
> batchId <= latestBatchId.
>
> Dmitry
>
>
> 2017-09-13 22:12 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>
>> I think the right way to look at this is the batchId is just a proxy for
>> offsets that is agnostic to what type of source you are reading from (or
>> how many sources there are).  We might call into a custom sink with the
>> same batchId more than once, but it will always contain the same data
>> (there is no race condition, since this is stored in a write-ahead log).
>> As long as you check/commit the batch id in the same transaction as the
>> data you will get exactly once.
>>
>> On Wed, Sep 13, 2017 at 1:25 AM, Dmitry Naumenko <dm.naume...@gmail.com>
>> wrote:
>>
>>> Thanks, I see.
>>>
>>> However, I guess reading from the checkpoint directory might be less
>>> efficient compared to just preserving offsets in the Dataset.
>>>
>>> I have one more question about operation idempotence (I hope it helps
>>> others get a clear picture).
>>>
>>> If I read offsets from an RDBMS on restart and manually specify starting
>>> offsets on the Kafka source, is it still possible that, in case of a
>>> failure, a duplicate batch id will reach the custom sink?
>>>
>>> Previously, with DStreams, you would just read offsets from storage on
>>> start and write them into the DB in one transaction with the data, and
>>> that was enough for "exactly-once". Please correct me if I made a mistake
>>> here. So will the same strategy work with Structured Streaming?
>>>
>>> I guess that in the case of Structured Streaming, Spark will commit the
>>> batch offset to the checkpoint directory, and there can be a race
>>> condition where you commit your data with offsets into the DB but Spark
>>> fails to commit the batch id, and some kind of automatic retry happens.
>>> If this is true, is it possible to disable this automatic retry, so I can
>>> still use the unified API for batch/streaming with my own retry logic
>>> (which is basically: ignore intermediate data, re-read from Kafka, and
>>> retry processing and load)?
>>>
>>> Dmitry
>>>
>>>
>>> 2017-09-12 22:43 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>>
>>>> In the checkpoint directory there is a file /offsets/$batchId that
>>>> holds the offsets serialized as JSON.  I would not consider this a public
>>>> stable API though.
>>>>
>>>> Really the only important thing to get exactly once is that you must
>>>> ensure whatever operation you are doing downstream is idempotent with
>>>> respect to the batchId.  For example, if you are writing to an RDBMS you
>>>> could have a table that records the batch ID and update that in the same
>>>> transaction as you append the results of the batch.  Before trying to
>>>> append you should check that batch ID and make sure you have not already
>>>> committed.
>>>>
>>>> On Tue, Sep 12, 2017 at 11:48 AM, Dmitry Naumenko <
>>>> dm.naume...@gmail.com> wrote:
>>>>
>>>>> Thanks for the response, Michael
>>>>>
>>>>> >  You should still be able to get exactly once processing by using
>>>>> the batchId that is passed to the Sink.
>>>>>
>>>>> Could you explain this in more detail, please? Is there some kind of
>>>>> offset manager API that works as get-offset by batch id lookup table?
>>>>>
>>>>> Dmitry
>>>>>
>>>>> 2017-09-12 20:29 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>>>>
>>>>>> I think that we are going to have to change the Sink API as part of
>>>>>> SPARK-20928 <https://issues-test.apache.org/jira/browse/SPARK-20928>,
>>>>>> which is why I linked these tickets together.  I'm still targeting an
>>>>>> initial version for Spark 2.3 which should happen sometime towards the
>>>>>> end of the year.
>>>>>>
>>>>>> There are some misconceptions in that Stack Overflow answer that I
>>>>>> can correct.  Until we improve the Source API, you should still be able
>>>>>> to get exactly-once processing by using the batchId that is passed to
>>>>>> the Sink. We guarantee that the offsets present at any given batch
>>>>>> ID will be the same across retries by recording this information in the
>>>>>> checkpoint's WAL. The checkpoint does not use Java serialization (like
>>>>>> DStreams do) and can be used even after upgrading Spark.
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 12, 2017 at 12:45 AM, Dmitry Naumenko <
>>>>>> dm.naume...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks, Cody
>>>>>>>
>>>>>>> Unfortunately, it seems there is no active development right
>>>>>>> now. Maybe I can step in and help with it somehow?
>>>>>>>
>>>>>>> Dmitry
>>>>>>>
>>>>>>> 2017-09-11 21:01 GMT+03:00 Cody Koeninger <c...@koeninger.org>:
>>>>>>>
>>>>>>>> https://issues-test.apache.org/jira/browse/SPARK-18258
>>>>>>>>
>>>>>>>> On Mon, Sep 11, 2017 at 7:15 AM, Dmitry Naumenko <
>>>>>>>> dm.naume...@gmail.com> wrote:
>>>>>>>> > Hi all,
>>>>>>>> >
>>>>>>>> > It started as a discussion in
>>>>>>>> > https://stackoverflow.com/questions/46153105/how-to-get-kafka-offsets-with-spark-structured-streaming-api.
>>>>>>>> >
>>>>>>>> > So the problem is that there is no support in the public API to
>>>>>>>> > obtain the Kafka (or Kinesis) offsets. For example, if you want to
>>>>>>>> > save offsets in external storage in a custom sink, you should:
>>>>>>>> > 1) preserve topic, partition and offset across all transform
>>>>>>>> > operations of the Dataset (based on the hard-coded Kafka schema)
>>>>>>>> > 2) make a manual group by partition/offset with aggregate max offset
>>>>>>>> >
>>>>>>>> > Structured Streaming doc says "Every streaming source is assumed to
>>>>>>>> > have offsets", so why is it not a part of the public API? What do you
>>>>>>>> > think about supporting it?
>>>>>>>> >
>>>>>>>> > Dmitry
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
