I think the right way to look at this is that the batchId is just a proxy for offsets that is agnostic to what type of source you are reading from (or how many sources there are). We might call into a custom sink with the same batchId more than once, but it will always contain the same data (there is no race condition, since this is stored in a write-ahead log). As long as you check/commit the batchId in the same transaction as the data, you will get exactly-once.
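A minimal sketch of that check-and-commit, assuming a JDBC store (the table names, JDBC URL, and sink wiring are illustrative, and the Sink trait is an internal, non-stable API):

    // Hypothetical sketch, not a supported API: an idempotent JDBC sink keyed on batchId.
    // Registration through a StreamSinkProvider is omitted for brevity.
    import java.sql.DriverManager
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.streaming.Sink

    class IdempotentJdbcSink(url: String) extends Sink {
      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        val rows = data.collect()                     // small batches assumed for the sketch
        val conn = DriverManager.getConnection(url)
        try {
          conn.setAutoCommit(false)
          // Was this batch already committed? If so, this call is a retry -- skip it.
          val check = conn.prepareStatement("SELECT 1 FROM committed_batches WHERE batch_id = ?")
          check.setLong(1, batchId)
          if (!check.executeQuery().next()) {
            val insert = conn.prepareStatement("INSERT INTO results(value) VALUES (?)")
            rows.foreach { r => insert.setString(1, r.getString(0)); insert.executeUpdate() }
            val mark = conn.prepareStatement("INSERT INTO committed_batches(batch_id) VALUES (?)")
            mark.setLong(1, batchId)
            mark.executeUpdate()
          }
          conn.commit()                               // data and batch id commit atomically
        } finally {
          conn.close()                                // if anything above threw, nothing was committed
        }
      }
    }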
On Wed, Sep 13, 2017 at 1:25 AM, Dmitry Naumenko <dm.naume...@gmail.com> wrote:

> Thanks, I see.
>
> However, I guess reading from the checkpoint directory might be less efficient than just preserving the offsets in the Dataset.
>
> I have one more question about operation idempotence (hope it helps others get a clear picture).
>
> If I read offsets from an RDBMS on restart and manually specify starting offsets on the Kafka source, is it still possible that, in case of a failure, a duplicate batch id will go to the Custom Sink?
>
> Previously, with DStreams, you would just read the offsets from storage on start and write them into the DB in one transaction with the data, and that was enough for "exactly-once". Please correct me if I made a mistake here. So will the same strategy work with Structured Streaming?
>
> I guess that in the case of Structured Streaming, Spark will commit the batch offsets to a checkpoint directory, and there can be a race condition where you commit your data with offsets into the DB, but Spark fails to commit the batch id, and some kind of automatic retry happens. If this is true, is it possible to disable this automatic retry, so I can still use the unified batch/streaming API with my own retry logic (which is basically: ignore intermediate data, re-read from Kafka, and retry the processing and load)?
>
> Dmitry
>
> 2017-09-12 22:43 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>
>> In the checkpoint directory there is a file /offsets/$batchId that holds the offsets serialized as JSON. I would not consider this a public, stable API though.
>>
>> Really the only important thing to get exactly-once is that you must ensure whatever operation you are doing downstream is idempotent with respect to the batchId. For example, if you are writing to an RDBMS you could have a table that records the batch ID and update it in the same transaction as you append the results of the batch. Before trying to append, you should check that batch ID and make sure you have not already committed.
>>
>> On Tue, Sep 12, 2017 at 11:48 AM, Dmitry Naumenko <dm.naume...@gmail.com> wrote:
>>
>>> Thanks for the response, Michael.
>>>
>>> > You should still be able to get exactly once processing by using the batchId that is passed to the Sink.
>>>
>>> Could you explain this in more detail, please? Is there some kind of offset manager API that works as a get-offsets-by-batch-id lookup table?
>>>
>>> Dmitry
>>>
>>> 2017-09-12 20:29 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>>
>>>> I think that we are going to have to change the Sink API as part of SPARK-20928 <https://issues-test.apache.org/jira/browse/SPARK-20928>, which is why I linked these tickets together. I'm still targeting an initial version for Spark 2.3, which should happen sometime towards the end of the year.
>>>>
>>>> There are some misconceptions in that Stack Overflow answer that I can correct. Until we improve the Source API, you should still be able to get exactly-once processing by using the batchId that is passed to the Sink. We guarantee that the offsets present at any given batch ID will be the same across retries by recording this information in the checkpoint's WAL. The checkpoint does not use Java serialization (like DStreams does) and can be used even after upgrading Spark.
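For reference, seeding the Kafka source from offsets kept in an external store, as Dmitry describes above, might look roughly like this; the topic, brokers, and offsets JSON are illustrative, and the startingOffsets option only applies when the query starts without an existing checkpoint (otherwise the checkpointed offsets win):

    // Rough sketch: start the Kafka source from offsets kept in your own store.
    // In the offsets JSON, -2 means "earliest" and -1 means "latest".
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("offsets-sketch").getOrCreate()
    val storedOffsets = """{"events":{"0":1234,"1":5678,"2":-2}}"""   // read from the RDBMS in practice

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .option("startingOffsets", storedOffsets)   // ignored once a checkpoint exists
      .load()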
>>>>
>>>> On Tue, Sep 12, 2017 at 12:45 AM, Dmitry Naumenko <dm.naume...@gmail.com> wrote:
>>>>
>>>>> Thanks, Cody.
>>>>>
>>>>> Unfortunately, it seems there is no active development right now. Maybe I can step in and help with it somehow?
>>>>>
>>>>> Dmitry
>>>>>
>>>>> 2017-09-11 21:01 GMT+03:00 Cody Koeninger <c...@koeninger.org>:
>>>>>
>>>>>> https://issues-test.apache.org/jira/browse/SPARK-18258
>>>>>>
>>>>>> On Mon, Sep 11, 2017 at 7:15 AM, Dmitry Naumenko <dm.naume...@gmail.com> wrote:
>>>>>> > Hi all,
>>>>>> >
>>>>>> > It started as a discussion in https://stackoverflow.com/questions/46153105/how-to-get-kafka-offsets-with-spark-structured-streaming-api.
>>>>>> >
>>>>>> > So the problem is that there is no support in the public API for obtaining the Kafka (or Kinesis) offsets. For example, if you want to save offsets in external storage in a Custom Sink, you have to:
>>>>>> > 1) preserve the topic, partition and offset across all transform operations of the Dataset (based on the hard-coded Kafka schema)
>>>>>> > 2) do a manual group-by on topic/partition with an aggregate max of the offset
>>>>>> >
>>>>>> > The Structured Streaming doc says "Every streaming source is assumed to have offsets", so why isn't it part of the public API? What do you think about supporting it?
>>>>>> >
>>>>>> > Dmitry
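As a reference for the workaround described in the original message, a rough sketch, assuming the query preserves the topic, partition and offset columns from the Kafka source's fixed schema (e.g. df.selectExpr("topic", "partition", "offset", "CAST(value AS STRING) AS value")) and that the reduction runs on the batch handed to the custom sink:

    // Sketch only: reduce a batch that still carries the Kafka metadata columns
    // to the highest offset per topic/partition, ready to store alongside the data.
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.max

    def latestOffsets(batch: DataFrame): DataFrame =
      batch.groupBy("topic", "partition")
           .agg(max("offset").as("maxOffset"))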