Thought I'd update this thread. I figured out my issue with foreachBatch and
structured streaming: I was doing a count() before the write(), so my
streaming query plan branched into two. I am now using Trigger.Once with
structured streaming and letting it handle checkpointing instead of doing it
myself. Thanks all for your help!
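
For anyone who hits the same thing, here is a minimal sketch of the kind of
pattern that works (not my exact job; df is the streaming DataFrame from
readStream, the paths are placeholders, and persist() is just one way to
safely reuse batchDF when you need more than one action on it):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// An explicitly typed function value sidesteps the foreachBatch overload
// ambiguity that some Scala 2.12 builds hit with plain lambdas.
val processBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
  batchDF.persist()                                      // reuse the batch safely
  batchDF.write.mode("append").parquet("/data/output")   // placeholder path
  println(s"batch $batchId wrote ${batchDF.count()} rows")
  batchDF.unpersist()
}

df.writeStream
  .trigger(Trigger.Once())
  .foreachBatch(processBatch)
  .option("checkpointLocation", "/checkpoints/my-job")   // placeholder path
  .start()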

On Wed, Feb 5, 2020 at 7:07 PM Ruijing Li <liruijin...@gmail.com> wrote:

> Looks like I’m wrong, since I tried that exact snippet and it worked
>
> So to be clear, in the part where I do batchDF.write.parquet, that is not
> the exact code I’m using.
>
> I'm using a custom write function that does something similar to
> write.parquet but has some added functionality. Somehow my custom write
> function isn't working correctly.
>
>  Is batchDF a static dataframe though?
>
> Thanks
>
> On Wed, Feb 5, 2020 at 6:13 PM Ruijing Li <liruijin...@gmail.com> wrote:
>
>> Hi all,
>>
>> I tried with foreachBatch but got an error. Is this expected?
>>
>> Code is
>>
>> df.writeStream
>>   .trigger(Trigger.Once)
>>   .foreachBatch { (batchDF, batchId) =>
>>     batchDF.write.parquet(hdfsPath)
>>   }
>>   .option("checkpointLocation", anotherHdfsPath)
>>   .start()
>>
>> Exception is: Queries with streaming sources must be executed with
>> writeStream.start()
>>
>> But I thought foreachBatch would treat the batchDF as a static DataFrame?
>>
>> Thanks,
>> RJ
>>
>> On Wed, Feb 5, 2020 at 12:48 AM Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi Burak,
>>>
>>> I am not that used to streaming, but was almost thinking along the same
>>> lines :) It makes a lot of sense to me now.
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz <brk...@gmail.com> wrote:
>>>
>>>> Do you really want to build all of that and open yourself to bugs when
>>>> you can just use foreachBatch? Here are your options:
>>>>
>>>> 1. Build it yourself
>>>>
>>>> // Read offsets from some store
>>>> val prevOffsets = readOffsets()
>>>> val latestOffsets = getOffsets()
>>>>
>>>> val df = spark.read.format("kafka")
>>>>   .option("startingOffsets", prevOffsets)
>>>>   .option("endingOffsets", latestOffsets)
>>>>   .load()
>>>> batchLogic(df)
>>>>
>>>> saveOffsets(latestOffsets)
>>>>
>>>> 2. Structured Streaming + Trigger.Once + foreachBatch
>>>>
>>>> spark.readStream.format("kafka").load()
>>>>   .writeStream.foreachBatch((df, batchId) => batchLogic(df))
>>>>   .trigger(Trigger.Once()).start()
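>>>>
>>>> Spelled out a little more (just a sketch; the topic, servers, and paths
>>>> are placeholders): batchLogic is an ordinary function over a static
>>>> DataFrame, so existing batch code can be reused inside it.
>>>>
>>>> import org.apache.spark.sql.DataFrame
>>>> import org.apache.spark.sql.streaming.Trigger
>>>>
>>>> def batchLogic(df: DataFrame): Unit =
>>>>   df.write.mode("append").parquet("/data/output")          // placeholder
>>>>
>>>> // Typed function value avoids foreachBatch overload ambiguity
>>>> val runBatch: (DataFrame, Long) => Unit = (df, _) => batchLogic(df)
>>>>
>>>> spark.readStream
>>>>   .format("kafka")
>>>>   .option("kafka.bootstrap.servers", "broker:9092")        // placeholder
>>>>   .option("subscribe", "my-topic")                         // placeholder
>>>>   .load()
>>>>   .writeStream
>>>>   .foreachBatch(runBatch)
>>>>   .trigger(Trigger.Once())
>>>>   .option("checkpointLocation", "/checkpoints/kafka-once") // placeholder
>>>>   .start()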
>>>>
>>>> With Option (1), you're going to have to (re)solve:
>>>>  a) Tracking and consistency of offsets
>>>>  b) Potential topic partition mismatches
>>>>  c) Offsets that may have aged out due to retention
>>>>  d) Re-execution of jobs and data consistency. What if your job fails
>>>> as you're committing the offsets in the end, but the data was already
>>>> stored? Will your getOffsets method return the same offsets?
>>>>
>>>> I'd rather not solve problems that other people have solved for me, but
>>>> ultimately the decision is yours to make.
>>>>
>>>> Best,
>>>> Burak
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li <liruijin...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Anil, I think that’s the approach I will take.
>>>>>
>>>>> Hi Burak,
>>>>>
>>>>> That was a possibility I thought about, but my team has custom
>>>>> DataFrame writer functions we would like to use, and unfortunately they
>>>>> were written with static DataFrames in mind. I do see there is a
>>>>> foreachBatch write mode, but my thinking was that at that point it would
>>>>> be easier to read from Kafka in batch mode.
>>>>>
>>>>> Thanks,
>>>>> RJ
>>>>>
>>>>> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz <brk...@gmail.com> wrote:
>>>>>
>>>>>> Hi Ruijing,
>>>>>>
>>>>>> Why do you not want to use structured streaming here? This is exactly
>>>>>> why structured streaming + Trigger.Once was built, just so that you don't
>>>>>> build that solution yourself.
>>>>>> You also get exactly-once semantics if you use the built-in sinks.
>>>>>>
>>>>>> Best,
>>>>>> Burak
>>>>>>
>>>>>> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni <anil...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Ruijing,
>>>>>>>
>>>>>>> We did the below things to read Kafka in batch from spark:
>>>>>>>
>>>>>>> 1) Maintain the start offset (could be a DB, a file, etc.)
>>>>>>> 2) Get the end offset dynamically when the job executes.
>>>>>>> 3) Pass the start and end offsets to the Kafka batch read.
>>>>>>> 4) Overwrite the start offset with the end offset (this should be done
>>>>>>> after processing the data).
>>>>>>>
>>>>>>> Currently to make it work in batch mode, you need to maintain the
>>>>>>> state information of the offsets externally.
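>>>>>>>
>>>>>>> A rough sketch of those four steps (the topic, servers, and the
>>>>>>> load/save helpers below are placeholders, not a particular
>>>>>>> implementation):
>>>>>>>
>>>>>>> val startOffsets = loadSavedOffsets()   // step 1: from a DB, HDFS file, ...
>>>>>>> val endOffsets   = fetchLatestOffsets() // step 2: e.g. via the Kafka consumer API
>>>>>>>
>>>>>>> val df = spark.read
>>>>>>>   .format("kafka")
>>>>>>>   .option("kafka.bootstrap.servers", "broker:9092")  // placeholder
>>>>>>>   .option("subscribe", "my-topic")                   // placeholder
>>>>>>>   .option("startingOffsets", startOffsets)  // step 3: JSON like {"my-topic":{"0":42}}
>>>>>>>   .option("endingOffsets", endOffsets)
>>>>>>>   .load()
>>>>>>>
>>>>>>> processAndStore(df)       // your batch logic (placeholder)
>>>>>>>
>>>>>>> saveOffsets(endOffsets)   // step 4: only after the write has succeeded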
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Anil
>>>>>>>
>>>>>>> -Sent from my mobile
>>>>>>> http://anilkulkarni.com/
>>>>>>>
>>>>>>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li <liruijin...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> My use case is to read from a single Kafka topic using a batch Spark
>>>>>>>> SQL job (not structured streaming, ideally). Every time this batch job
>>>>>>>> starts, I want it to pick up the last offset it stopped at, read until
>>>>>>>> it has caught up to the latest offset, store the result, and stop the
>>>>>>>> job. Given that the dataframe has partition and offset columns, my
>>>>>>>> first thought for offset management is to groupBy partition and agg the
>>>>>>>> max offset, then store it in HDFS. The next time the job runs, it will
>>>>>>>> read this value and start from that max offset using startingOffsets.
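>>>>>>>>
>>>>>>>> For concreteness, a sketch of that bookkeeping (names are placeholders;
>>>>>>>> the Kafka source exposes topic, partition, and offset columns, and
>>>>>>>> startingOffsets is inclusive, so the next run should start at max + 1):
>>>>>>>>
>>>>>>>> import org.apache.spark.sql.functions.{col, max}
>>>>>>>>
>>>>>>>> // df is the batch DataFrame read from Kafka
>>>>>>>> val nextStarts = df.groupBy(col("topic"), col("partition"))
>>>>>>>>   .agg(max(col("offset")).as("maxOffset"))
>>>>>>>>   .collect()
>>>>>>>>   .map(r => (r.getString(0), r.getInt(1), r.getLong(2) + 1))
>>>>>>>>
>>>>>>>> // Build the startingOffsets JSON for the next run,
>>>>>>>> // e.g. {"my-topic":{"0":43,"1":106}}
>>>>>>>> val offsetsJson = nextStarts.groupBy(_._1).map { case (topic, rows) =>
>>>>>>>>   val parts = rows.map { case (_, p, o) => "\"" + p + "\":" + o }.mkString(",")
>>>>>>>>   "\"" + topic + "\":{" + parts + "}"
>>>>>>>> }.mkString("{", ",", "}")
>>>>>>>>
>>>>>>>> // Persist offsetsJson somewhere durable and pass it as
>>>>>>>> // .option("startingOffsets", offsetsJson) on the next run.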
>>>>>>>>
>>>>>>>> However, I was wondering if this will work. If the Kafka producer
>>>>>>>> failed an offset and later decides to resend it, I will have skipped it
>>>>>>>> since I'm starting from the max offset sent. How does Spark structured
>>>>>>>> streaming know to continue onwards - does it keep a state of all
>>>>>>>> offsets seen? If so, how can I replicate this for batch without missing
>>>>>>>> data? Any help would be appreciated.
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Cheers,
>>>>>>>> Ruijing Li
>>>>>>>>
>>>>>>> --
>>>>> Cheers,
>>>>> Ruijing Li
>>>>>
>>>> --
>> Cheers,
>> Ruijing Li
>>
> --
> Cheers,
> Ruijing Li
>
-- 
Cheers,
Ruijing Li
