Thought I'd update this thread. I figured out my issue with foreachBatch and structured streaming: I was calling count() before write(), so my streaming query branched into two. I am now letting structured streaming with Trigger.Once handle checkpointing instead of doing it myself. Thanks all for your help!
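In case it helps anyone who hits the same error, here is roughly the shape of the working query. This is only a minimal sketch: the broker, topic, and paths are placeholders, and a plain parquet write stands in for our custom write function.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-trigger-once").getOrCreate()

// Streaming read; the checkpoint tracks which offsets have already been processed.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
  .option("subscribe", "my-topic")                   // placeholder
  .load()

val query = df.writeStream
  .trigger(Trigger.Once) // process whatever is new since the last run, then stop
  .option("checkpointLocation", "hdfs:///checkpoints/my-topic") // placeholder
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // batchDF is a static DataFrame, so actions are fine in here; calling an action
    // on the streaming df outside foreachBatch is what raises the
    // "Queries with streaming sources must be executed with writeStream.start()" error.
    val rows = batchDF.count()
    println(s"Batch $batchId contains $rows rows")
    batchDF.write.mode("append").parquet("hdfs:///output/my-topic") // placeholder
  }
  .start()

query.awaitTermination()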
On Wed, Feb 5, 2020 at 7:07 PM Ruijing Li <liruijin...@gmail.com> wrote:

> Looks like I'm wrong, since I tried that exact snippet and it worked.
>
> So to be clear, in the part where I do batchDF.write.parquet, that is not
> the exact code I'm using.
>
> I'm using a custom write function that does something similar to write.parquet
> but has some added functionality. Somehow my custom write function isn't
> working correctly.
>
> Is batchDF a static dataframe though?
>
> Thanks
>
> On Wed, Feb 5, 2020 at 6:13 PM Ruijing Li <liruijin...@gmail.com> wrote:
>
>> Hi all,
>>
>> I tried with foreachBatch but got an error. Is this expected?
>>
>> Code is:
>>
>> df.writeStream.trigger(Trigger.Once).foreachBatch { (batchDF, batchId) =>
>>   batchDF.write.parquet(hdfsPath)
>> }
>> .option("checkpointLocation", anotherHdfsPath)
>> .start()
>>
>> Exception is: Queries with streaming sources must be executed with
>> writeStream.start()
>>
>> But I thought foreachBatch would treat the batchDF as a static dataframe?
>>
>> Thanks,
>> RJ
>>
>> On Wed, Feb 5, 2020 at 12:48 AM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>
>>> Hi Burak,
>>>
>>> I am not quite used to streaming, but was almost thinking along the same
>>> lines :) makes a lot of sense to me now.
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz <brk...@gmail.com> wrote:
>>>
>>>> Do you really want to build all of that and open yourself to bugs when
>>>> you can just use foreachBatch? Here are your options:
>>>>
>>>> 1. Build it yourself
>>>>
>>>> // Read offsets from some store
>>>> prevOffsets = readOffsets()
>>>> latestOffsets = getOffsets()
>>>>
>>>> df = spark.read.format("kafka").option("startingOffsets", prevOffsets)
>>>>   .option("endingOffsets", latestOffsets).load()
>>>> batchLogic(df)
>>>>
>>>> saveOffsets(latestOffsets)
>>>>
>>>> 2. Structured Streaming + Trigger.Once + foreachBatch
>>>>
>>>> spark.readStream.format("kafka").load().writeStream
>>>>   .foreachBatch((df, batchId) => batchLogic(df)).trigger(Trigger.Once).start()
>>>>
>>>> With Option (1), you're going to have to (re)solve:
>>>> a) Tracking and consistency of offsets
>>>> b) Potential topic partition mismatches
>>>> c) Offsets that may have aged out due to retention
>>>> d) Re-execution of jobs and data consistency. What if your job fails
>>>> as you're committing the offsets at the end, but the data was already
>>>> stored? Will your getOffsets method return the same offsets?
>>>>
>>>> I'd rather not solve problems that other people have solved for me, but
>>>> ultimately the decision is yours to make.
>>>>
>>>> Best,
>>>> Burak
>>>>
>>>> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li <liruijin...@gmail.com> wrote:
>>>>
>>>>> Thanks Anil, I think that's the approach I will take.
>>>>>
>>>>> Hi Burak,
>>>>>
>>>>> That was a possibility to think about, but my team has custom
>>>>> dataframe writer functions we would like to use; unfortunately they were
>>>>> written with static dataframes in mind. I do see there is a foreachBatch
>>>>> write mode, but my thinking was that at that point it was easier to read
>>>>> from Kafka through batch mode.
>>>>>
>>>>> Thanks,
>>>>> RJ
>>>>>
>>>>> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz <brk...@gmail.com> wrote:
>>>>>
>>>>>> Hi Ruijing,
>>>>>>
>>>>>> Why do you not want to use structured streaming here? This is exactly
>>>>>> why structured streaming + Trigger.Once was built, just so that you don't
>>>>>> build that solution yourself.
>>>>>> You also get exactly-once semantics if you use the built-in sinks.
>>>>>>
>>>>>> Best,
>>>>>> Burak
>>>>>>
>>>>>> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni <anil...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Ruijing,
>>>>>>>
>>>>>>> We did the below things to read Kafka in batch from Spark:
>>>>>>>
>>>>>>> 1) Maintain the start offset (could be a db, a file, etc.)
>>>>>>> 2) Get the end offset dynamically when the job executes.
>>>>>>> 3) Pass the start and end offsets.
>>>>>>> 4) Overwrite the start offset with the end offset (should be done
>>>>>>> after processing the data).
>>>>>>>
>>>>>>> Currently, to make it work in batch mode, you need to maintain the
>>>>>>> state information of the offsets externally.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Anil
>>>>>>>
>>>>>>> -Sent from my mobile
>>>>>>> http://anilkulkarni.com/
>>>>>>>
>>>>>>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li <liruijin...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> My use case is to read from a single Kafka topic using a batch Spark
>>>>>>>> SQL job (not structured streaming, ideally). Every time it starts, I want
>>>>>>>> this batch job to pick up the last offset it stopped at, read from there
>>>>>>>> until it catches up to the latest offset, store the result, and stop.
>>>>>>>> Given that the dataframe has a partition and offset column, my first
>>>>>>>> thought for offset management is to groupBy partition and agg the max
>>>>>>>> offset, then store it in HDFS. Next time the job runs, it will read and
>>>>>>>> start from this max offset using startingOffsets.
>>>>>>>>
>>>>>>>> However, I was wondering if this will work. If the Kafka producer
>>>>>>>> failed on an offset and later decides to resend it, I will have skipped
>>>>>>>> it, since I'm starting from the max offset sent. How does Spark
>>>>>>>> structured streaming know to continue onwards? Does it keep a state of
>>>>>>>> all offsets seen? If so, how can I replicate this for batch without
>>>>>>>> missing data? Any help would be appreciated.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Cheers,
>>>>>>>> Ruijing Li
>>>>>>>>
>>>>>>> --
>>>>> Cheers,
>>>>> Ruijing Li
>>>>>
>>>> --
>> Cheers,
>> Ruijing Li
>>
> --
> Cheers,
> Ruijing Li
>
--
Cheers,
Ruijing Li
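For reference, here is a rough sketch of the manual batch approach described above (Anil's steps / Burak's option 1). Everything in it is illustrative: the broker, topic, the helpers readOffsets/saveOffsets/batchLogic, and the offset-store format are assumptions rather than a tested implementation, and Burak's caveats about offset tracking still apply.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.max

object KafkaBatchWithManualOffsets {

  // Hypothetical helpers: persist per-partition offsets somewhere durable (a small
  // HDFS file, a DB row, ...) in the JSON form the Kafka source accepts,
  // e.g. {"my-topic":{"0":43,"1":18}}.
  def readOffsets(): String = ???
  def saveOffsets(offsetsJson: String): Unit = ???

  def batchLogic(df: DataFrame): Unit = ???  // your existing static-DataFrame write logic

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-batch-manual-offsets").getOrCreate()

    // Steps 1 and 2: start from the previously stored offsets, end at whatever is
    // latest when the job runs. Cache so the write and the offset computation below
    // see the same slice of the topic.
    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
      .option("subscribe", "my-topic")                   // placeholder
      .option("startingOffsets", readOffsets())
      .option("endingOffsets", "latest")
      .load()
      .cache()

    // Step 3: process the slice with the existing batch logic.
    batchLogic(df)

    // Step 4: only after the write succeeds, record where the next run should start.
    // startingOffsets are read inclusively, so store (max offset processed) + 1 per partition.
    val parts = df.groupBy("partition").agg(max("offset").as("lastOffset")).collect().map { r =>
      "\"" + r.getAs[Int]("partition") + "\":" + (r.getAs[Long]("lastOffset") + 1)
    }
    saveOffsets("{\"my-topic\":{" + parts.mkString(",") + "}}")
  }
}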