Looks like I'm wrong, since I tried that exact snippet and it worked. So to be clear: in the part where I do batchDF.write.parquet, that is not the exact code I'm using.
I'm using a custom write function that does something similar to write.parquet but has some added functionality. Somehow my custom write function isn't working correctly. Is batchDF a static dataframe, though?

Thanks

On Wed, Feb 5, 2020 at 6:13 PM Ruijing Li <liruijin...@gmail.com> wrote:

> Hi all,
>
> I tried with foreachBatch but got an error. Is this expected?
>
> Code is
>
> df.writeStream.trigger(Trigger.Once).foreachBatch { (batchDF, batchId) =>
>   batchDF.write.parquet(hdfsPath)
> }
> .option("checkpointLocation", anotherHdfsPath)
> .start()
>
> Exception is: Queries with streaming sources must be executed with
> writeStream.start()
>
> But I thought foreachBatch would treat the batchDF as a static dataframe?
>
> Thanks,
> RJ
>
> On Wed, Feb 5, 2020 at 12:48 AM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>
>> Hi Burak,
>>
>> I am not quite used to streaming, but was almost thinking along the same
>> lines :) It makes a lot of sense to me now.
>>
>> Regards,
>> Gourav
>>
>> On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> Do you really want to build all of that and open yourself to bugs when
>>> you can just use foreachBatch? Here are your options:
>>>
>>> 1. Build it yourself
>>>
>>> // Read offsets from some store
>>> prevOffsets = readOffsets()
>>> latestOffsets = getOffsets()
>>>
>>> df = spark.read.format("kafka").option("startOffsets", prevOffsets)
>>>   .option("endOffsets", latestOffsets).load()
>>> batchLogic(df)
>>>
>>> saveOffsets(latestOffsets)
>>>
>>> 2. Structured Streaming + Trigger.Once + foreachBatch
>>>
>>> spark.readStream.format("kafka").load().writeStream
>>>   .foreachBatch((df, batchId) => batchLogic(df)).trigger("once").start()
>>>
>>> With Option (1), you're going to have to (re)solve:
>>>  a) Tracking and consistency of offsets
>>>  b) Potential topic partition mismatches
>>>  c) Offsets that may have aged out due to retention
>>>  d) Re-execution of jobs and data consistency. What if your job fails as
>>> you're committing the offsets at the end, but the data was already stored?
>>> Will your getOffsets method return the same offsets?
>>>
>>> I'd rather not solve problems that other people have solved for me, but
>>> ultimately the decision is yours to make.
>>>
>>> Best,
>>> Burak
>>>
>>> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li <liruijin...@gmail.com> wrote:
>>>
>>>> Thanks Anil, I think that's the approach I will take.
>>>>
>>>> Hi Burak,
>>>>
>>>> That was a possibility to think about, but my team has custom dataframe
>>>> writer functions we would like to use; unfortunately, they were written
>>>> with static dataframes in mind. I do see there is a foreachBatch write
>>>> mode, but my thinking was that at that point it was easier to read from
>>>> kafka through batch mode.
>>>>
>>>> Thanks,
>>>> RJ
>>>>
>>>> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz <brk...@gmail.com> wrote:
>>>>
>>>>> Hi Ruijing,
>>>>>
>>>>> Why do you not want to use structured streaming here? This is exactly
>>>>> why structured streaming + Trigger.Once was built, just so that you
>>>>> don't build that solution yourself.
>>>>> You also get exactly-once semantics if you use the built-in sinks.
>>>>>
>>>>> Best,
>>>>> Burak
>>>>>
>>>>> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni <anil...@gmail.com> wrote:
>>>>>
>>>>>> Hi Ruijing,
>>>>>>
>>>>>> We did the below things to read Kafka in batch from spark:
>>>>>>
>>>>>> 1) Maintain the start offset (could be db, file etc)
>>>>>> 2) Get the end offset dynamically when the job executes.
>>>>>> 3) Pass the start and end offsets
>>>>>> 4) Overwrite the start offset with the end offset. (Should be done
>>>>>> post processing the data)
>>>>>>
>>>>>> Currently, to make it work in batch mode, you need to maintain the
>>>>>> state information of the offsets externally.
>>>>>>
>>>>>> Thanks
>>>>>> Anil
>>>>>>
>>>>>> -Sent from my mobile
>>>>>> http://anilkulkarni.com/
>>>>>>
>>>>>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li <liruijin...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> My use case is to read from a single kafka topic using a batch spark
>>>>>>> sql job (not structured streaming, ideally). I want this batch job,
>>>>>>> every time it starts, to get the last offset it stopped at, start
>>>>>>> reading from there until it catches up to the latest offset, store
>>>>>>> the result, and stop the job. Given the dataframe has a partition and
>>>>>>> offset column, my first thought for offset management is to groupBy
>>>>>>> partition and agg the max offset, then store it in HDFS. Next time
>>>>>>> the job runs, it will read and start from this max offset using
>>>>>>> startingOffsets.
>>>>>>>
>>>>>>> However, I was wondering if this will work. If the kafka producer
>>>>>>> failed to send an offset and later decides to resend it, I will have
>>>>>>> skipped it since I'm starting from the max offset sent. How does
>>>>>>> spark structured streaming know to continue onwards - does it keep a
>>>>>>> state of all offsets seen? If so, how can I replicate this for batch
>>>>>>> without missing data? Any help would be appreciated.
>>>>>>>
>>>>>>> --
>>>>>>> Cheers,
>>>>>>> Ruijing Li
>>>>>>>
>>>>>> --
>>>> Cheers,
>>>> Ruijing Li
>>>>
>>> --
> Cheers,
> Ruijing Li
>
--
Cheers,
Ruijing Li
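
A minimal Scala sketch of option (1) above, the batch read with externally tracked offsets (Anil's steps 1 to 4), assuming Spark's built-in Kafka source. The broker address, topic name, output path, and the literal offsets JSON are placeholders, and persisting the computed next offsets (HDFS file, database row, etc.) is left to the job:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.{col, max}

  object KafkaBatchWithOffsets {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().appName("kafka-batch-offsets").getOrCreate()

      // Offsets saved by the previous run, in the JSON shape the Kafka source
      // expects; use "earliest" on the very first run. (Placeholder literal.)
      val startingOffsets = """{"myTopic":{"0":1200,"1":890}}"""

      // Read only the slice between the saved offsets and whatever is latest now.
      val df = spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092") // placeholder broker
        .option("subscribe", "myTopic")                    // placeholder topic
        .option("startingOffsets", startingOffsets)
        .option("endingOffsets", "latest")
        .load()
        .persist() // avoid hitting Kafka twice for the write and the offset agg below

      // Batch logic / custom parquet writer goes here.
      df.write.mode("append").parquet("hdfs:///path/to/output") // placeholder path

      // Only after the data is safely written, record where the next run should
      // start. startingOffsets is inclusive, so store max(offset) + 1 per
      // partition to avoid re-reading the last record of this run.
      val nextOffsets = df
        .groupBy(col("topic"), col("partition"))
        .agg((max(col("offset")) + 1).as("nextStartingOffset"))
      nextOffsets.show(truncate = false) // persist this (HDFS/DB) instead of just showing it

      df.unpersist()
      spark.stop()
    }
  }

As Burak notes above, this is the approach where you also have to handle failed runs yourself: if the job dies after the parquet write but before the offsets are saved, the next run will reprocess that slice.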
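
And a sketch of option (2), Structured Streaming with Trigger.Once and foreachBatch, roughly what the snippet at the top of the thread does. Inside foreachBatch the batchDF argument is a static DataFrame, so existing batch writers can be reused on it; referencing the original streaming DataFrame from inside the batch function is one way to hit the "Queries with streaming sources must be executed with writeStream.start()" error. Broker, topic, and paths are placeholders; the batch function is passed as an explicitly typed value, which sidesteps the foreachBatch overload ambiguity some Scala 2.12 builds report.

  import org.apache.spark.sql.{DataFrame, SparkSession}
  import org.apache.spark.sql.streaming.Trigger

  object KafkaTriggerOnce {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().appName("kafka-trigger-once").getOrCreate()

      val stream = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092") // placeholder broker
        .option("subscribe", "myTopic")                    // placeholder topic
        .load()

      // batchDF here is a plain static DataFrame, so batch writers
      // (write.parquet, custom writers, ...) can be reused; only use batchDF,
      // never the streaming `stream` DataFrame, inside this function.
      val writeBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
        batchDF.write.mode("append").parquet("hdfs:///path/to/output") // placeholder path
      }

      stream.writeStream
        .trigger(Trigger.Once())
        .option("checkpointLocation", "hdfs:///path/to/checkpoints") // offsets tracked here
        .foreachBatch(writeBatch)
        .start()
        .awaitTermination() // processes what is available, commits the checkpoint, then stops
    }
  }

With this option the checkpoint location does the offset bookkeeping, so no external offset store is needed.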