Yay! Glad you could figure it out!

On Sat, Feb 15, 2020 at 7:41 AM Ruijing Li <liruijin...@gmail.com> wrote:
> Thought I'd update this thread. I figured out my issue with foreachBatch
> and structured streaming: I did a count() before write(), so my streaming
> query branched into 2. I am now using Trigger.Once and structured
> streaming to handle checkpointing instead of doing it myself. Thanks all
> for your help!
>
> On Wed, Feb 5, 2020 at 7:07 PM Ruijing Li <liruijin...@gmail.com> wrote:
>
>> Looks like I'm wrong, since I tried that exact snippet and it worked.
>>
>> So to be clear, in the part where I do batchDF.write.parquet, that is
>> not the exact code I'm using.
>>
>> I'm using a custom write function that does something similar to
>> write.parquet but has some added functionality. Somehow my custom write
>> function isn't working correctly.
>>
>> Is batchDF a static dataframe though?
>>
>> Thanks
>>
>> On Wed, Feb 5, 2020 at 6:13 PM Ruijing Li <liruijin...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I tried with foreachBatch but got an error. Is this expected?
>>>
>>> Code is
>>>
>>> df.writeStream
>>>   .trigger(Trigger.Once)
>>>   .foreachBatch { (batchDF, batchId) =>
>>>     batchDF.write.parquet(hdfsPath)
>>>   }
>>>   .option("checkpointLocation", anotherHdfsPath)
>>>   .start()
>>>
>>> Exception is: Queries with streaming sources must be executed with
>>> writeStream.start()
>>>
>>> But I thought foreachBatch would treat batchDF as a static dataframe?
>>>
>>> Thanks,
>>> RJ
>>>
>>> On Wed, Feb 5, 2020 at 12:48 AM Gourav Sengupta
>>> <gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi Burak,
>>>>
>>>> I am not quite used to streaming, but I was almost thinking along the
>>>> same lines :) It makes a lot of sense to me now.
>>>>
>>>> Regards,
>>>> Gourav
>>>>
>>>> On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz <brk...@gmail.com> wrote:
>>>>
>>>>> Do you really want to build all of that and open yourself to bugs
>>>>> when you can just use foreachBatch? Here are your options:
>>>>>
>>>>> 1.
>>>>> Build it yourself:
>>>>>
>>>>> // Read offsets from some store
>>>>> prevOffsets = readOffsets()
>>>>> latestOffsets = getOffsets()
>>>>>
>>>>> df = spark.read.format("kafka")
>>>>>   .option("startingOffsets", prevOffsets)
>>>>>   .option("endingOffsets", latestOffsets)
>>>>>   .load()
>>>>> batchLogic(df)
>>>>>
>>>>> saveOffsets(latestOffsets)
>>>>>
>>>>> 2. Structured Streaming + Trigger.Once + foreachBatch:
>>>>>
>>>>> spark.readStream.format("kafka").load()
>>>>>   .writeStream
>>>>>   .foreachBatch((df, batchId) => batchLogic(df))
>>>>>   .trigger(Trigger.Once)
>>>>>   .start()
>>>>>
>>>>> With option (1), you're going to have to (re)solve:
>>>>> a) Tracking and consistency of offsets
>>>>> b) Potential topic-partition mismatches
>>>>> c) Offsets that may have aged out due to retention
>>>>> d) Re-execution of jobs and data consistency. What if your job fails
>>>>> as you're committing the offsets at the end, but the data was already
>>>>> stored? Will your getOffsets method return the same offsets?
>>>>>
>>>>> I'd rather not solve problems that other people have solved for me,
>>>>> but ultimately the decision is yours to make.
>>>>>
>>>>> Best,
>>>>> Burak
>>>>>
>>>>> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li <liruijin...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Anil, I think that's the approach I will take.
>>>>>>
>>>>>> Hi Burak,
>>>>>>
>>>>>> That was a possibility to think about, but my team has custom
>>>>>> dataframe writer functions we would like to use; unfortunately, they
>>>>>> were written with static dataframes in mind. I do see there is a
>>>>>> foreachBatch write mode, but my thinking was that at that point it
>>>>>> would be easier to read from kafka through batch mode.
>>>>>>
>>>>>> Thanks,
>>>>>> RJ
>>>>>>
>>>>>> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz <brk...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Ruijing,
>>>>>>>
>>>>>>> Why do you not want to use structured streaming here?
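A minimal sketch of what option (1) entails, in plain Python with no Spark or Kafka: the store, topic, and sink are dict/list stand-ins, and every name is hypothetical. It shows the offset bookkeeping and, in the comments, the crash window behind pitfall (d):

```python
# Mock of the "build it yourself" option. The offset store is a dict and
# the "topic" is a dict of partition -> list of records, standing in for
# real external storage and a real Kafka consumer.

def read_offsets(store):
    # Where the previous run left off (stored as an exclusive end offset).
    return dict(store)

def get_latest_offsets(topic):
    # End offsets at job start: one past the last record in each partition.
    return {p: len(records) for p, records in topic.items()}

def run_batch(topic, store, sink):
    prev = read_offsets(store)
    latest = get_latest_offsets(topic)
    # Read only the new slice of each partition, analogous to passing
    # start/end offsets to a batch read.
    for p, records in topic.items():
        sink.extend(records[prev.get(p, 0):latest[p]])
    # Crash window: if the job dies HERE, the data is already in the sink
    # but the offsets were never saved, so the next run re-reads the same
    # slice and writes duplicates (pitfall d).
    store.clear()
    store.update(latest)

topic = {0: ["a", "b"], 1: ["c"]}
store = {}   # persisted offsets (a db or file in real life)
sink = []    # the downstream store

run_batch(topic, store, sink)
run_batch(topic, store, sink)  # no new data: second run adds nothing
```

Making the final two steps (write data, save offsets) atomic, or the writes idempotent, is exactly the part that is easy to get wrong by hand.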
>>>>>>> This is exactly why structured streaming + Trigger.Once was built:
>>>>>>> so that you don't have to build that solution yourself. You also
>>>>>>> get exactly-once semantics if you use the built-in sinks.
>>>>>>>
>>>>>>> Best,
>>>>>>> Burak
>>>>>>>
>>>>>>> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni <anil...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Ruijing,
>>>>>>>>
>>>>>>>> We did the below things to read Kafka in batch from Spark:
>>>>>>>>
>>>>>>>> 1) Maintain the start offset (could be a db, a file, etc.)
>>>>>>>> 2) Get the end offset dynamically when the job executes.
>>>>>>>> 3) Pass the start and end offsets.
>>>>>>>> 4) Overwrite the start offset with the end offset (should be done
>>>>>>>> after processing the data).
>>>>>>>>
>>>>>>>> Currently, to make it work in batch mode, you need to maintain the
>>>>>>>> state information of the offsets externally.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Anil
>>>>>>>>
>>>>>>>> -Sent from my mobile
>>>>>>>> http://anilkulkarni.com/
>>>>>>>>
>>>>>>>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li <liruijin...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> My use case is to read from a single kafka topic using a batch
>>>>>>>>> spark sql job (not structured streaming, ideally). I want this
>>>>>>>>> batch job, every time it starts, to get the last offset it
>>>>>>>>> stopped at, read from there until it has caught up to the latest
>>>>>>>>> offset, store the result, and stop. Given that the dataframe has
>>>>>>>>> a partition and an offset column, my first thought for offset
>>>>>>>>> management is to groupBy partition and agg the max offset, then
>>>>>>>>> store it in HDFS. The next time the job runs, it will read this
>>>>>>>>> and start from the max offset using startingOffsets.
>>>>>>>>>
>>>>>>>>> However, I was wondering if this will work.
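The groupBy-partition / max-offset bookkeeping described in the last message can be sketched in plain Python (no Spark; the rows and topic name are made up). One detail worth flagging: the Kafka source's startingOffsets is an inclusive start, so the value to persist should be max + 1, otherwise the last record of each partition is re-read on the next run:

```python
# Plain-Python sketch of "groupBy partition, agg max offset". `rows` stands
# in for the (partition, offset) columns of the batch just read; the JSON
# string stands in for a small state file in HDFS.
import json

def max_offsets_by_partition(rows):
    # rows: iterable of (partition, offset) pairs.
    maxima = {}
    for partition, offset in rows:
        if partition not in maxima or offset > maxima[partition]:
            maxima[partition] = offset
    return maxima

def to_starting_offsets(maxima, topic):
    # startingOffsets JSON is inclusive, so persist max + 1 to avoid
    # re-reading the last record of each partition next run.
    return json.dumps({topic: {str(p): o + 1 for p, o in maxima.items()}})

rows = [(0, 5), (0, 7), (1, 3), (1, 9), (0, 6)]
maxima = max_offsets_by_partition(rows)
starting = to_starting_offsets(maxima, "mytopic")
```

The resulting JSON has the shape the Kafka source expects for its offset options, i.e. topic -> partition (as a string key) -> offset.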
>>>>>>>>> If the kafka producer failed to send an offset and later decides
>>>>>>>>> to resend it, I will have skipped it, since I'm starting from the
>>>>>>>>> max offset sent. How does spark structured streaming know to
>>>>>>>>> continue onwards: does it keep a state of all offsets seen? If
>>>>>>>>> so, how can I replicate this for batch without missing data? Any
>>>>>>>>> help would be appreciated.
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Cheers,
>>>>>>>>> Ruijing Li
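For reference, the pattern the thread eventually settles on (Trigger.Once plus foreachBatch wrapping a custom batch writer) can be mocked in plain Python. Nothing below is Spark API; it only illustrates the contract: the user function receives a static micro-batch plus a batchId, and a writer that keys its output on batchId stays idempotent if a batch is re-delivered after a restart:

```python
# Mock of the foreachBatch contract. On recovery the same batchId may be
# delivered again, so an idempotent writer keys its output on batchId.

class Sink:
    def __init__(self):
        self.data = {}  # batch_id -> rows: output keyed by batch id

    def write(self, batch_id, rows):
        if batch_id in self.data:
            return      # replayed batch after a restart: skip it
        self.data[batch_id] = list(rows)

def run_once(batches, batch_fn):
    # Stand-in for foreachBatch(batch_fn) with a run-once trigger: process
    # whatever is available, one micro-batch at a time, then stop.
    for batch_id, rows in enumerate(batches):
        batch_fn(rows, batch_id)

sink = Sink()
batch_fn = lambda rows, batch_id: sink.write(batch_id, rows)

run_once([["a", "b"], ["c"]], batch_fn)
run_once([["a", "b"], ["c"]], batch_fn)  # replay: no duplicates written
```

Inside the batch function the data is static, so a custom writer built for static dataframes (count() included) can run there; the branching problem from earlier in the thread arises only when extra actions are applied to the streaming dataframe itself.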