Hi Burak,

I am not quite used to streaming, but I was almost thinking along the same lines :) It makes a lot of sense to me now.
Regards,
Gourav

On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz <brk...@gmail.com> wrote:

> Do you really want to build all of that and open yourself up to bugs when
> you can just use foreachBatch? Here are your options:
>
> 1. Build it yourself
>
> // Read offsets from some store
> prevOffsets = readOffsets()
> latestOffsets = getOffsets()
>
> df = spark.read.format("kafka")
>   .option("startingOffsets", prevOffsets)
>   .option("endingOffsets", latestOffsets)
>   .load()
> batchLogic(df)
>
> saveOffsets(latestOffsets)
>
> 2. Structured Streaming + Trigger.Once + foreachBatch
>
> spark.readStream.format("kafka").load()
>   .writeStream
>   .foreachBatch((df, batchId) => batchLogic(df))
>   .trigger(Trigger.Once())
>   .start()
>
> With Option (1), you're going to have to (re)solve:
> a) Tracking and consistency of offsets
> b) Potential topic-partition mismatches
> c) Offsets that may have aged out due to retention
> d) Re-execution of jobs and data consistency. What if your job fails as
> you're committing the offsets at the end, but the data was already stored?
> Will your getOffsets method return the same offsets?
>
> I'd rather not solve problems that other people have solved for me, but
> ultimately the decision is yours to make.
>
> Best,
> Burak
>
> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li <liruijin...@gmail.com> wrote:
>
>> Thanks Anil, I think that's the approach I will take.
>>
>> Hi Burak,
>>
>> That was a possibility to think about, but my team has custom dataframe
>> writer functions we would like to use; unfortunately, they were written
>> with static dataframes in mind. I do see there is a foreachBatch write
>> mode, but my thinking was that at that point it would be easier to read
>> from Kafka in batch mode.
>>
>> Thanks,
>> RJ
>>
>> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> Hi Ruijing,
>>>
>>> Why do you not want to use structured streaming here? This is exactly
>>> why structured streaming + Trigger.Once was built, just so that you
>>> don't have to build that solution yourself.
>>> You also get exactly-once semantics if you use the built-in sinks.
>>>
>>> Best,
>>> Burak
>>>
>>> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni <anil...@gmail.com> wrote:
>>>
>>>> Hi Ruijing,
>>>>
>>>> We did the below things to read Kafka in batch from Spark:
>>>>
>>>> 1) Maintain the start offset (could be a db, a file, etc.)
>>>> 2) Get the end offset dynamically when the job executes.
>>>> 3) Pass the start and end offsets to the job.
>>>> 4) Overwrite the start offset with the end offset (should be done
>>>> after processing the data).
>>>>
>>>> Currently, to make this work in batch mode, you need to maintain the
>>>> offset state externally.
>>>>
>>>> Thanks
>>>> Anil
>>>>
>>>> -Sent from my mobile
>>>> http://anilkulkarni.com/
>>>>
>>>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li <liruijin...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> My use case is to read from a single Kafka topic using a batch Spark
>>>>> SQL job (not structured streaming, ideally). Every time the batch job
>>>>> starts, I want it to get the last offset it stopped at, read from there
>>>>> until it has caught up to the latest offset, store the result, and stop.
>>>>> Given that the dataframe has partition and offset columns, my first
>>>>> thought for offset management is to groupBy partition and agg the max
>>>>> offset, then store it in HDFS. The next time the job runs, it will read
>>>>> and start from this max offset using startingOffsets.
>>>>>
>>>>> However, I was wondering if this will work. If the Kafka producer
>>>>> fails on an offset and later decides to resend it, I will have skipped
>>>>> it, since I'm starting from the max offset sent. How does Spark
>>>>> structured streaming know to continue onwards - does it keep a state of
>>>>> all offsets seen? If so, how can I replicate this for batch without
>>>>> missing data? Any help would be appreciated.
>>>>>
>>>>> --
>>>>> Cheers,
>>>>> Ruijing Li
>>>>>
>>>> --
>> Cheers,
>> Ruijing Li
>>
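For anyone reading the archive later, here is a minimal, untested sketch of what Option (1) above (and Anil's four steps) could look like in Scala. The broker address, topic name, and the readOffsets / saveOffsets / batchLogic helpers are placeholders standing in for whatever offset store and writer functions the job already has:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max}

val spark = SparkSession.builder().appName("kafka-batch-offsets").getOrCreate()

// Offsets saved by the previous run, in the JSON shape the Kafka source
// expects, e.g. {"events":{"0":42,"1":17}}; fall back to "earliest" on the
// first run. readOffsets / saveOffsets / batchLogic are hypothetical helpers.
val startingOffsets: String = readOffsets().getOrElse("earliest")

val df: DataFrame = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092") // placeholder broker
  .option("subscribe", "events")                     // placeholder topic
  .option("startingOffsets", startingOffsets)
  .option("endingOffsets", "latest")
  .load()
  .cache() // read once, reuse below when computing the next offsets

batchLogic(df) // the team's existing writer for static dataframes

// The next run should start one past the highest offset read in each
// partition, because startingOffsets is inclusive.
val nextOffsets = df
  .groupBy(col("topic"), col("partition"))
  .agg((max(col("offset")) + 1).as("nextOffset"))
  .collect()

// Caveat (d) from the thread: if the job dies after batchLogic but before
// this line, the next run re-reads and re-writes the same data, so
// batchLogic needs to be idempotent.
saveOffsets(nextOffsets)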
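And a sketch of Option (2), the Trigger.Once + foreachBatch route Burak recommends, again with placeholder broker, topic, and checkpoint path. Inside foreachBatch each micro-batch arrives as an ordinary static DataFrame, which is why existing batch writer functions can be reused there:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-trigger-once").getOrCreate()

// batchLogic is the hypothetical existing writer for static dataframes.
def processBatch(batchDf: DataFrame, batchId: Long): Unit = batchLogic(batchDf)

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder broker
  .option("subscribe", "events")                       // placeholder topic
  .load()
  .writeStream
  .foreachBatch(processBatch _)
  .trigger(Trigger.Once())                             // process available data, then stop
  .option("checkpointLocation", "/path/to/checkpoint") // offsets tracked here for you
  .start()

query.awaitTermination()

The checkpoint directory is what takes points (a) through (d) above off your plate: offsets are committed there as part of each micro-batch, so a rerun after a failure picks up where the last successful batch ended.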