Hi all,

I tried with forEachBatch but got an error. Is this expected?
Code is:

df.writeStream
  .trigger(Trigger.Once)
  .forEachBatch { (batchDF, batchId) =>
    batchDF.write.parquet(hdfsPath)
  }
  .option("checkPointLocation", anotherHdfsPath)
  .start()

Exception is:

Queries with streaming sources must be executed with writeStream.start()

But I thought forEachBatch would treat the batchDF as a static dataframe?

Thanks,
RJ
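For reference, a minimal self-contained sketch of the Trigger.Once + foreachBatch pattern discussed in the thread below (not a diagnosis of the exception above). The broker address, topic name, and HDFS paths are placeholders:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-trigger-once").getOrCreate()

// Placeholder broker, topic, and paths; substitute real values.
val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "mytopic")
  .load()
  .writeStream
  .trigger(Trigger.Once())
  .option("checkpointLocation", "hdfs:///checkpoints/mytopic-job")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Inside foreachBatch the micro-batch is a plain (static) DataFrame,
    // so the regular batch writer can be used here.
    batchDF.write.mode("append").parquet("hdfs:///data/mytopic")
  }
  .start()

query.awaitTermination()

The checkpoint location is what lets the query resume from the last committed Kafka offsets on the next Trigger.Once run.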
On Wed, Feb 5, 2020 at 12:48 AM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> Hi Burak,
>
> I am not quite used to streaming, but was almost thinking along the same
> lines :) makes a lot of sense to me now.
>
> Regards,
> Gourav
>
> On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz <brk...@gmail.com> wrote:
>
>> Do you really want to build all of that and open yourself to bugs when
>> you can just use foreachBatch? Here are your options:
>>
>> 1. Build it yourself
>>
>> // Read offsets from some store
>> prevOffsets = readOffsets()
>> latestOffsets = getOffsets()
>>
>> df = spark.read.format("kafka")
>>   .option("startingOffsets", prevOffsets)
>>   .option("endingOffsets", latestOffsets)
>>   .load()
>> batchLogic(df)
>>
>> saveOffsets(latestOffsets)
>>
>> 2. Structured Streaming + Trigger.Once + foreachBatch
>>
>> spark.readStream.format("kafka").load()
>>   .writeStream
>>   .foreachBatch((df, batchId) => batchLogic(df))
>>   .trigger(Trigger.Once())
>>   .start()
>>
>> With Option (1), you're going to have to (re)solve:
>> a) Tracking and consistency of offsets
>> b) Potential topic partition mismatches
>> c) Offsets that may have aged out due to retention
>> d) Re-execution of jobs and data consistency. What if your job fails as
>> you're committing the offsets at the end, but the data was already stored?
>> Will your getOffsets method return the same offsets?
>>
>> I'd rather not solve problems that other people have solved for me, but
>> ultimately the decision is yours to make.
>>
>> Best,
>> Burak
>>
>> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li <liruijin...@gmail.com> wrote:
>>
>>> Thanks Anil, I think that's the approach I will take.
>>>
>>> Hi Burak,
>>>
>>> That was a possibility to think about, but my team has custom dataframe
>>> writer functions we would like to use; unfortunately, they were written with
>>> static dataframes in mind. I do see there is a foreachBatch write mode, but
>>> my thinking was that at that point it would be easier to read from Kafka in
>>> batch mode.
>>>
>>> Thanks,
>>> RJ
>>>
>>> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz <brk...@gmail.com> wrote:
>>>
>>>> Hi Ruijing,
>>>>
>>>> Why do you not want to use Structured Streaming here? This is exactly
>>>> why Structured Streaming + Trigger.Once was built, just so that you don't
>>>> have to build that solution yourself.
>>>> You also get exactly-once semantics if you use the built-in sinks.
>>>>
>>>> Best,
>>>> Burak
>>>>
>>>> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni <anil...@gmail.com> wrote:
>>>>
>>>>> Hi Ruijing,
>>>>>
>>>>> We did the below things to read Kafka in batch from Spark:
>>>>>
>>>>> 1) Maintain the start offset (could be a db, a file, etc.)
>>>>> 2) Get the end offset dynamically when the job executes.
>>>>> 3) Pass the start and end offsets to the job.
>>>>> 4) Overwrite the start offset with the end offset (should be done
>>>>> after processing the data).
>>>>>
>>>>> Currently, to make it work in batch mode, you need to maintain the
>>>>> state information of the offsets externally.
>>>>>
>>>>> Thanks,
>>>>> Anil
>>>>>
>>>>> -Sent from my mobile
>>>>> http://anilkulkarni.com/
>>>>>
>>>>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li <liruijin...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> My use case is to read from a single Kafka topic using a batch Spark
>>>>>> SQL job (not Structured Streaming, ideally). I want this batch job, every
>>>>>> time it starts, to get the last offset it stopped at, start reading from
>>>>>> there until it has caught up to the latest offset, store the result, and
>>>>>> stop the job. Given that the dataframe has a partition and an offset
>>>>>> column, my first thought for offset management is to group by partition
>>>>>> and aggregate the max offset, then store that in HDFS. The next time the
>>>>>> job runs, it will read this and start from the stored max offset using
>>>>>> startingOffsets.
>>>>>>
>>>>>> However, I was wondering if this will work. If the Kafka producer
>>>>>> failed to send an offset and later decides to resend it, I will have
>>>>>> skipped it, since I'm starting from the max offset seen. How does Spark
>>>>>> Structured Streaming know to continue onwards - does it keep a state of
>>>>>> all offsets seen? If so, how can I replicate this for batch without
>>>>>> missing data? Any help would be appreciated.
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> Ruijing Li
>>>>>
>>> --
>>> Cheers,
>>> Ruijing Li
>>
--
Cheers,
Ruijing Li
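For completeness, a rough self-contained sketch of the manual batch approach Anil and Burak describe above, assuming the per-partition offsets are persisted externally as a JSON string (for example in an HDFS file). readOffsets and saveOffsets are hypothetical helpers, and the broker, topic, and paths are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

object KafkaBatchSketch {
  // Hypothetical helpers: persist the offsets JSON wherever is convenient
  // (HDFS file, database row, etc.). Not part of any Spark API.
  def readOffsets(): Option[String] = ???
  def saveOffsets(json: String): Unit = ???

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-batch-sketch").getOrCreate()

    // Resume from the previously stored offsets, or from the beginning.
    val startingOffsets = readOffsets().getOrElse("earliest")

    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
      .option("subscribe", "mytopic")                     // placeholder
      .option("startingOffsets", startingOffsets)
      .option("endingOffsets", "latest")
      .load()
      .cache()

    // Business logic goes here; this just writes the raw batch out.
    df.write.mode("append").parquet("hdfs:///data/mytopic")

    // Record where to resume. startingOffsets is inclusive, so store
    // max(offset) + 1 for every partition that had data in this run.
    val parts = df.groupBy("partition").agg(max("offset").as("maxOffset")).collect()
    val nextOffsets = parts
      .map(r => "\"" + r.getInt(0) + "\":" + (r.getLong(1) + 1L))
      .mkString("{\"mytopic\":{", ",", "}}")
    saveOffsets(nextOffsets)

    spark.stop()
  }
}

Note that this sketch runs into exactly the issues Burak lists: a partition with no new records in a run drops out of the stored offsets, and a failure between the write and saveOffsets means the next run reprocesses the same range, so the downstream write needs to be idempotent. That is why the thread above leans toward letting Structured Streaming track offsets through its checkpoint instead.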