Ok. So since I can get repeated batch ids, I guess I can just store the last committed batch id in my storage (in the same transaction as the data) and initialize the custom sink with the right batch id when the application restarts. After that, just ignore a batch if its batchId <= latestBatchId.
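To make the idea concrete, here is a minimal JDBC sketch of that check-and-commit, assuming a hypothetical sink_commits table that stores the last committed batch id per sink and a results table for the data (the table names, the sinkName key and the (String, String) row shape are made up for illustration; nothing here is a Spark API):

import java.sql.Connection

object IdempotentBatchWriter {

  /** Last committed batch id for this sink, or -1 if nothing was committed yet. */
  def lastCommittedBatchId(conn: Connection, sinkName: String): Long = {
    val ps = conn.prepareStatement(
      "SELECT last_batch_id FROM sink_commits WHERE sink_name = ?")
    ps.setString(1, sinkName)
    val rs = ps.executeQuery()
    try { if (rs.next()) rs.getLong(1) else -1L } finally { rs.close(); ps.close() }
  }

  /** Writes one micro-batch and records its batch id in the same transaction.
    * A re-delivered batch (batchId <= last committed id) is simply skipped. */
  def commitBatch(conn: Connection, sinkName: String, batchId: Long,
                  rows: Seq[(String, String)]): Unit = {
    conn.setAutoCommit(false)
    if (batchId <= lastCommittedBatchId(conn, sinkName)) {
      conn.rollback() // duplicate delivery of an already committed batch: ignore it
    } else {
      try {
        val insert = conn.prepareStatement(
          "INSERT INTO results (key, value) VALUES (?, ?)")
        rows.foreach { case (k, v) =>
          insert.setString(1, k); insert.setString(2, v); insert.addBatch()
        }
        insert.executeBatch()
        insert.close()

        // Record the batch id in the *same* transaction as the data.
        val update = conn.prepareStatement(
          "UPDATE sink_commits SET last_batch_id = ? WHERE sink_name = ?")
        update.setLong(1, batchId)
        update.setString(2, sinkName)
        if (update.executeUpdate() == 0) {
          val ins = conn.prepareStatement(
            "INSERT INTO sink_commits (sink_name, last_batch_id) VALUES (?, ?)")
          ins.setString(1, sinkName)
          ins.setLong(2, batchId)
          ins.executeUpdate()
          ins.close()
        }
        update.close()

        conn.commit() // data and batch id become visible atomically
      } catch {
        case e: Exception => conn.rollback(); throw e
      }
    }
  }
}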
Dmitry

2017-09-13 22:12 GMT+03:00 Michael Armbrust <[email protected]>:

> I think the right way to look at this is that the batchId is just a proxy for offsets that is agnostic to what type of source you are reading from (or how many sources there are). We might call into a custom sink with the same batchId more than once, but it will always contain the same data (there is no race condition, since this is stored in a write-ahead log). As long as you check/commit the batch id in the same transaction as the data, you will get exactly once.
>
> On Wed, Sep 13, 2017 at 1:25 AM, Dmitry Naumenko <[email protected]> wrote:
>
>> Thanks, I see.
>>
>> However, I guess reading from the checkpoint directory might be less efficient compared to just preserving offsets in the Dataset.
>>
>> I have one more question about operation idempotence (hope it helps others to have a clear picture).
>>
>> If I read offsets from the RDBMS on restart and manually specify the starting offsets for the Kafka source, is it still possible that, in case of a failure, a duplicate batch id will go to the Custom Sink?
>>
>> Previously with DStreams, you would just read offsets from storage on start and write them into the DB in one transaction with the data, and that was enough for "exactly-once". Please correct me if I made a mistake here. So will the same strategy work with Structured Streaming?
>>
>> I guess that, in the case of Structured Streaming, Spark will commit batch offsets to a checkpoint directory, and there can be a race condition where you commit your data with offsets into the DB, but Spark fails to commit the batch id, and some kind of automatic retry happens. If this is true, is it possible to disable this automatic retry, so I can still use the unified API for batch/streaming with my own retry logic (which is, basically: ignore intermediate data, re-read from Kafka, and retry the processing and load)?
>>
>> Dmitry
>>
>> 2017-09-12 22:43 GMT+03:00 Michael Armbrust <[email protected]>:
>>
>>> In the checkpoint directory there is a file /offsets/$batchId that holds the offsets serialized as JSON. I would not consider this a public stable API though.
>>>
>>> Really the only important thing to get exactly once is that you must ensure whatever operation you are doing downstream is idempotent with respect to the batchId. For example, if you are writing to an RDBMS, you could have a table that records the batch ID and update it in the same transaction as you append the results of the batch. Before trying to append, you should check that batch ID and make sure you have not already committed.
>>>
>>> On Tue, Sep 12, 2017 at 11:48 AM, Dmitry Naumenko <[email protected]> wrote:
>>>
>>>> Thanks for the response, Michael
>>>>
>>>> > You should still be able to get exactly once processing by using the batchId that is passed to the Sink.
>>>>
>>>> Could you explain this in more detail, please? Is there some kind of offset manager API that works as a get-offset-by-batch-id lookup table?
>>>>
>>>> Dmitry
>>>>
>>>> 2017-09-12 20:29 GMT+03:00 Michael Armbrust <[email protected]>:
>>>>
>>>>> I think that we are going to have to change the Sink API as part of SPARK-20928 <https://issues-test.apache.org/jira/browse/SPARK-20928>, which is why I linked these tickets together. I'm still targeting an initial version for Spark 2.3, which should happen sometime towards the end of the year.
>>>>>
>>>>> There are some misconceptions in that Stack Overflow answer that I can correct. Until we improve the Source API, you should still be able to get exactly once processing by using the batchId that is passed to the Sink. We guarantee that the offsets present at any given batch ID will be the same across retries by recording this information in the checkpoint's WAL. The checkpoint does not use Java serialization (like DStreams does) and can be used even after upgrading Spark.
>>>>>
>>>>> On Tue, Sep 12, 2017 at 12:45 AM, Dmitry Naumenko <[email protected]> wrote:
>>>>>
>>>>>> Thanks, Cody
>>>>>>
>>>>>> Unfortunately, it seems there is no active development right now. Maybe I can step in and help with it somehow?
>>>>>>
>>>>>> Dmitry
>>>>>>
>>>>>> 2017-09-11 21:01 GMT+03:00 Cody Koeninger <[email protected]>:
>>>>>>
>>>>>>> https://issues-test.apache.org/jira/browse/SPARK-18258
>>>>>>>
>>>>>>> On Mon, Sep 11, 2017 at 7:15 AM, Dmitry Naumenko <[email protected]> wrote:
>>>>>>> > Hi all,
>>>>>>> >
>>>>>>> > It started as a discussion in https://stackoverflow.com/questions/46153105/how-to-get-kafka-offsets-with-spark-structured-streaming-api.
>>>>>>> >
>>>>>>> > So the problem is that there is no support in the Public API for obtaining the Kafka (or Kinesis) offsets. For example, if you want to save offsets to external storage in a Custom Sink, you have to:
>>>>>>> > 1) preserve topic, partition and offset across all transform operations of the Dataset (based on the hard-coded Kafka schema)
>>>>>>> > 2) make a manual group-by on partition with an aggregate max offset
>>>>>>> >
>>>>>>> > The Structured Streaming doc says "Every streaming source is assumed to have offsets", so why is it not part of the Public API? What do you think about supporting it?
>>>>>>> >
>>>>>>> > Dmitry
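For reference, a rough, hypothetical sketch of the workaround described in the quoted question above (the broker address, topic name, column aliases and the maxOffsets helper are made up for illustration; the topic/partition/offset columns come from the documented Kafka source schema):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.max

object KafkaOffsetsByHand {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-offsets-by-hand").getOrCreate()

    // Kafka source; broker address and topic name are placeholders.
    val kafka = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .load()

    // 1) Carry topic/partition/offset along with the payload through every transform.
    val withOffsets = kafka.selectExpr(
      "CAST(value AS STRING) AS payload", "topic", "partition", "offset")

    // 2) For the DataFrame of a single micro-batch (e.g. inside a custom sink),
    //    the highest offset per topic/partition is what gets stored with the data.
    def maxOffsets(batch: DataFrame): DataFrame =
      batch.groupBy("topic", "partition").agg(max("offset").as("max_offset"))
  }
}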
