Yep, that is correct. You can also use the query ID, which is a GUID that is stored in the checkpoint and preserved across restarts, if you want to distinguish the batches from different streams.
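For example, a custom sink could key its commit bookkeeping by (query ID, batch ID). A minimal sketch against the internal Sink trait (org.apache.spark.sql.execution.streaming.Sink); the class name and the commit logic are hypothetical:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, StreamExecution}

// Hypothetical sink that records commits per (queryId, batchId) so that
// batches from different streaming queries never collide in storage.
class PerQueryIdempotentSink(sqlContext: SQLContext) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // The query ID is exposed as a SparkContext local property
    // (only on builds that include the commit linked below).
    val queryId =
      sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)

    // ... check/commit (queryId, batchId) in the same transaction as `data`,
    // e.g. with the JDBC pattern sketched further down in this thread ...
  }
}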
The query ID can be read via sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY); this was added only recently, though:
<https://github.com/apache/spark/commit/2d968a07d211688a9c588deb859667dd8b653b27>

On Thu, Sep 14, 2017 at 3:40 AM, Dmitry Naumenko <dm.naume...@gmail.com> wrote:

> Ok. So since I can get repeated batch ids, I guess I can just store the
> last committed batch id in my storage (in the same transaction as the
> data) and initialize the custom sink with the right batch id when the
> application restarts. After that, just ignore a batch if the current
> batchId <= latestBatchId.
>
> Dmitry
>
> 2017-09-13 22:12 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>
>> I think the right way to look at this is that the batchId is just a proxy
>> for offsets that is agnostic to what type of source you are reading from
>> (or how many sources there are). We might call into a custom sink with
>> the same batchId more than once, but it will always contain the same data
>> (there is no race condition, since this is stored in a write-ahead log).
>> As long as you check/commit the batch id in the same transaction as the
>> data, you will get exactly once.
>>
>> On Wed, Sep 13, 2017 at 1:25 AM, Dmitry Naumenko <dm.naume...@gmail.com> wrote:
>>
>>> Thanks, I see.
>>>
>>> However, I guess reading from the checkpoint directory might be less
>>> efficient than just preserving the offsets in the Dataset.
>>>
>>> I have one more question about operation idempotence (I hope it helps
>>> others get a clear picture).
>>>
>>> If I read the offsets from the RDBMS on restart and manually specify the
>>> starting offsets on the Kafka source, is it still possible that, in case
>>> of a failure, a duplicate batch id will go to the custom sink?
>>>
>>> Previously, with DStreams, you would just read the offsets from storage
>>> on start and write them into the DB in one transaction with the data,
>>> and that was enough for "exactly-once". Please correct me if I made a
>>> mistake here. Will the same strategy work with Structured Streaming?
>>>
>>> I guess that with Structured Streaming, Spark commits the batch offsets
>>> to the checkpoint directory, and there can be a race condition where you
>>> commit your data with offsets into the DB, but Spark fails to commit the
>>> batch id, and some kind of automatic retry happens. If this is true, is
>>> it possible to disable this automatic retry, so I can still use the
>>> unified API for batch/streaming with my own retry logic (which is
>>> basically: ignore the intermediate data, re-read from Kafka, and retry
>>> the processing and load)?
>>>
>>> Dmitry
>>>
>>> 2017-09-12 22:43 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>>
>>>> In the checkpoint directory there is a file /offsets/$batchId that
>>>> holds the offsets serialized as JSON. I would not consider this a
>>>> public, stable API, though.
>>>>
>>>> Really, the only important thing to get exactly once is that you must
>>>> ensure that whatever operation you are doing downstream is idempotent
>>>> with respect to the batchId. For example, if you are writing to an
>>>> RDBMS, you could have a table that records the batch ID and update it
>>>> in the same transaction as you append the results of the batch. Before
>>>> trying to append, you should check that batch ID and make sure you have
>>>> not already committed.
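To make the check-and-commit pattern described just above concrete, here is a rough JDBC sketch. The stream_commits and batch_results tables and the (key, value) row shape are hypothetical, and collecting the batch to the driver is only reasonable for small batches:

import java.sql.DriverManager
import org.apache.spark.sql.DataFrame

// Hypothetical helper: append one batch and record its batchId in the same
// RDBMS transaction, skipping any batch that was already committed.
def commitBatchExactlyOnce(jdbcUrl: String, batchId: Long, data: DataFrame): Unit = {
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    conn.setAutoCommit(false)

    // 1. Find the last batch that was committed to this store.
    val rs = conn.createStatement()
      .executeQuery("SELECT max(batch_id) FROM stream_commits")
    rs.next()
    val maxSeen = rs.getLong(1)
    val lastCommitted = if (rs.wasNull()) -1L else maxSeen

    if (batchId > lastCommitted) {
      // 2. Append the batch results and the batch id in ONE transaction.
      val insert = conn.prepareStatement(
        "INSERT INTO batch_results (key, value) VALUES (?, ?)")
      data.collect().foreach { row =>
        insert.setString(1, row.getString(0))
        insert.setString(2, row.getString(1))
        insert.executeUpdate()
      }
      conn.createStatement()
        .executeUpdate(s"INSERT INTO stream_commits (batch_id) VALUES ($batchId)")
      conn.commit()
    } else {
      // The batch was already committed on a previous attempt; since a
      // retried batchId always carries the same data, it is safe to skip it.
      conn.rollback()
    }
  } finally {
    conn.close()
  }
}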
>>>>
>>>> On Tue, Sep 12, 2017 at 11:48 AM, Dmitry Naumenko <dm.naume...@gmail.com> wrote:
>>>>
>>>>> Thanks for the response, Michael.
>>>>>
>>>>> > You should still be able to get exactly once processing by using
>>>>> > the batchId that is passed to the Sink.
>>>>>
>>>>> Could you explain this in more detail, please? Is there some kind of
>>>>> offset-manager API that works as a get-offset-by-batch-id lookup table?
>>>>>
>>>>> Dmitry
>>>>>
>>>>> 2017-09-12 20:29 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>>>>
>>>>>> I think that we are going to have to change the Sink API as part of
>>>>>> SPARK-20928 <https://issues-test.apache.org/jira/browse/SPARK-20928>,
>>>>>> which is why I linked these tickets together. I'm still targeting an
>>>>>> initial version for Spark 2.3, which should happen sometime towards
>>>>>> the end of the year.
>>>>>>
>>>>>> There are some misconceptions in that Stack Overflow answer that I can
>>>>>> correct. Until we improve the Source API, you should still be able to
>>>>>> get exactly-once processing by using the batchId that is passed to the
>>>>>> Sink. We guarantee that the offsets present at any given batch ID will
>>>>>> be the same across retries by recording this information in the
>>>>>> checkpoint's WAL. The checkpoint does not use Java serialization (as
>>>>>> DStreams does) and can be used even after upgrading Spark.
>>>>>>
>>>>>> On Tue, Sep 12, 2017 at 12:45 AM, Dmitry Naumenko <dm.naume...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks, Cody.
>>>>>>>
>>>>>>> Unfortunately, it seems there is no active development right now.
>>>>>>> Maybe I can step in and help with it somehow?
>>>>>>>
>>>>>>> Dmitry
>>>>>>>
>>>>>>> 2017-09-11 21:01 GMT+03:00 Cody Koeninger <c...@koeninger.org>:
>>>>>>>
>>>>>>>> https://issues-test.apache.org/jira/browse/SPARK-18258
>>>>>>>>
>>>>>>>> On Mon, Sep 11, 2017 at 7:15 AM, Dmitry Naumenko <dm.naume...@gmail.com> wrote:
>>>>>>>> > Hi all,
>>>>>>>> >
>>>>>>>> > It started as a discussion in
>>>>>>>> > https://stackoverflow.com/questions/46153105/how-to-get-kafka-offsets-with-spark-structured-streaming-api.
>>>>>>>> >
>>>>>>>> > So the problem is that there is no support in the public API to
>>>>>>>> > obtain the Kafka (or Kinesis) offsets. For example, if you want to
>>>>>>>> > save the offsets to external storage in a custom sink, you have to:
>>>>>>>> > 1) preserve the topic, partition, and offset across all transform
>>>>>>>> > operations on the Dataset (based on the hard-coded Kafka schema), and
>>>>>>>> > 2) do a manual group-by on partition with an aggregate max offset.
>>>>>>>> >
>>>>>>>> > The Structured Streaming doc says "Every streaming source is assumed
>>>>>>>> > to have offsets", so why is this not part of the public API? What do
>>>>>>>> > you think about supporting it?
>>>>>>>> >
>>>>>>>> > Dmitry
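For reference, a rough sketch of the workaround described in 1) and 2) above; the broker address and topic are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder().appName("kafka-offsets-workaround").getOrCreate()

// The Kafka source exposes topic/partition/offset as columns of its
// hard-coded schema, so they can be carried along by hand.
val withOffsets = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "events")                       // placeholder
  .load()
  .selectExpr("topic", "partition", "offset", "CAST(value AS STRING) AS value")

// Manual aggregation of the highest offset seen per topic/partition.
val maxOffsets = withOffsets
  .groupBy("topic", "partition")
  .agg(max("offset").as("max_offset"))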