Thanks Anil, I think that’s the approach I will take.

Hi Burak,
That was a possibility to think about, but my team has custom dataframe
writer functions we would like to use; unfortunately, they were written with
static dataframes in mind. I do see there is a foreachBatch write mode, but
my thinking was that at that point it would be easier to read from Kafka in
batch mode. (I have put rough sketches of both approaches below the thread.)

Thanks,
RJ

On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz <brk...@gmail.com> wrote:

> Hi Ruijing,
>
> Why do you not want to use Structured Streaming here? This is exactly why
> Structured Streaming + Trigger.Once was built, just so that you don't have
> to build that solution yourself. You also get exactly-once semantics if
> you use the built-in sinks.
>
> Best,
> Burak
>
> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni <anil...@gmail.com> wrote:
>
>> Hi Ruijing,
>>
>> We did the below things to read Kafka in batch from Spark:
>>
>> 1) Maintain the start offset (could be a db, file, etc.)
>> 2) Get the end offset dynamically when the job executes.
>> 3) Pass the start and end offsets.
>> 4) Overwrite the start offset with the end offset (should be done after
>> processing the data).
>>
>> Currently, to make it work in batch mode, you need to maintain the state
>> information of the offsets externally.
>>
>> Thanks
>> Anil
>>
>> -Sent from my mobile
>> http://anilkulkarni.com/
>>
>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li <liruijin...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> My use case is to read from a single Kafka topic using a batch Spark SQL
>>> job (not Structured Streaming, ideally). Every time this batch job
>>> starts, I want it to pick up the last offset it stopped at, read from
>>> there until it has caught up to the latest offset, store the result, and
>>> stop the job. Given that the dataframe has partition and offset columns,
>>> my first thought for offset management is to groupBy partition, aggregate
>>> the max offset, and store it in HDFS. The next time the job runs, it will
>>> read this value and start from the max offset using startingOffsets.
>>>
>>> However, I was wondering if this will work. If the Kafka producer failed
>>> to send an offset and later decides to resend it, I will have skipped it,
>>> since I am starting from the max offset already seen. How does Spark
>>> Structured Streaming know to continue onwards - does it keep a state of
>>> all offsets seen? If so, how can I replicate this for batch without
>>> missing data? Any help would be appreciated.
>>>
>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>>
--
Cheers,
Ruijing Li
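
A rough sketch (not code from this thread) of the Trigger.Once approach Burak
suggests, combined with foreachBatch so an existing writer built for static
dataframes can be reused. The broker address, topic, paths, and
write_with_custom_logic are hypothetical placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-trigger-once").getOrCreate()

def write_with_custom_logic(df):
    # Stand-in for an existing writer that expects a static DataFrame.
    df.write.mode("append").parquet("hdfs:///data/output")

stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")
          .option("subscribe", "my-topic")
          .option("startingOffsets", "earliest")
          .load())

query = (stream.writeStream
         # foreachBatch hands each micro-batch over as a plain (static)
         # DataFrame, so the batch writer can be called unchanged.
         .foreachBatch(lambda batch_df, batch_id: write_with_custom_logic(batch_df))
         # Trigger.Once processes everything currently available, then stops.
         .trigger(once=True)
         # The checkpoint directory is where Spark tracks the offsets it has
         # already read between runs.
         .option("checkpointLocation", "hdfs:///checkpoints/my-topic-job")
         .start())

query.awaitTermination()

One caveat: foreachBatch is not one of the built-in sinks, so the exactly-once
guarantee Burak mentions does not come for free here; the custom writer needs
to be idempotent (batch_id can be used for deduplication).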
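
And a minimal sketch of the batch pattern Anil lists (steps 1-4), folding in
the per-partition max-offset bookkeeping from the original mail. The topic
name, broker, and HDFS offset-store path are hypothetical, and the first-run
handling is deliberately simplistic:

import json
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("kafka-batch-offsets").getOrCreate()

TOPIC = "my-topic"
OFFSET_PATH = "hdfs:///state/my-topic-offsets"   # step 1: external offset store

def load_start_offsets():
    # In the offsets JSON, -2 means "earliest" for a partition; used on the
    # first run, before any offsets have been stored.
    try:
        rows = spark.read.text(OFFSET_PATH).collect()
        return json.loads("".join(r.value for r in rows))
    except Exception:
        return {TOPIC: {"0": -2}}   # assumes a single partition 0; adjust per topic

start_offsets = load_start_offsets()

df = (spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", TOPIC)
      .option("startingOffsets", json.dumps(start_offsets))   # step 3: pass start
      .option("endingOffsets", "latest")    # step 2: end offsets taken at run time
      .load()
      .cache())   # cached because it is scanned again below for the offsets

# ... process and write df with the existing batch writer here ...

# Per-partition max offset actually read (the groupBy/agg idea from the
# original mail).
max_offsets = (df.groupBy("partition")
                 .agg(F.max("offset").alias("max_offset"))
                 .collect())

# startingOffsets is inclusive, so the next run should begin at max + 1.
# Carry forward the previous offsets for partitions that saw no new data.
new_start = {TOPIC: dict(start_offsets.get(TOPIC, {}))}
for r in max_offsets:
    new_start[TOPIC][str(r["partition"])] = r["max_offset"] + 1

# Step 4: overwrite the stored offsets only after processing has succeeded.
(spark.createDataFrame([(json.dumps(new_start),)], ["value"])
      .coalesce(1)
      .write.mode("overwrite")
      .text(OFFSET_PATH))

This offset bookkeeping is essentially what the streaming checkpoint does
automatically, which is Burak's point about not rebuilding it by hand.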