Yay! Glad you could figure it out!

On Sat, Feb 15, 2020 at 7:41 AM Ruijing Li <liruijin...@gmail.com> wrote:

> Just updating this thread: I figured out my issue with foreachBatch and
> structured streaming. I was doing a count() before write(), so my streaming
> query branched into two. I am now using Trigger.Once and structured
> streaming to handle checkpointing instead of doing it myself.
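>
> For anyone hitting the same thing, a rough sketch of the shape that works
> for me now. The paths and the row-count logging are placeholders, not my
> actual code; the point is that count() runs on the batch DataFrame inside
> foreachBatch, so the streaming query itself is never branched:
>
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.streaming.Trigger
>
> df.writeStream
>   .trigger(Trigger.Once)
>   .option("checkpointLocation", checkpointPath)
>   .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>     batchDF.persist()                        // batchDF is static inside foreachBatch
>     val rows = batchDF.count()               // safe: does not branch the stream
>     batchDF.write.mode("append").parquet(outputPath)
>     println(s"batch $batchId wrote $rows rows")
>     batchDF.unpersist()
>   }
>   .start()
>   .awaitTermination()
>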
> Thanks all for your help!
>
> On Wed, Feb 5, 2020 at 7:07 PM Ruijing Li <liruijin...@gmail.com> wrote:
>
>> Looks like I’m wrong, since I tried that exact snippet and it worked.
>>
>> So to be clear, in the part where I do batchDF.write.parquet, that is not
>> the exact code I’m using.
>>
>> I’m using a custom write function that does something similar to
>> write.parquet but has some added functionality. Somehow my custom write
>> function isn’t working correctly.
>>
>> Is batchDF a static DataFrame though?
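>>
>> To make that concrete, the custom write call has roughly this shape
>> (customWrite here is a stand-in for our internal writer, not the real
>> code):
>>
>> import org.apache.spark.sql.DataFrame
>>
>> def customWrite(staticDF: DataFrame, path: String): Unit = {
>>   // our extra functionality lives here; it was written assuming a
>>   // static DataFrame
>>   staticDF.write.mode("append").parquet(path)
>> }
>>
>> df.writeStream
>>   .trigger(Trigger.Once)
>>   .foreachBatch { (batchDF, batchId) =>
>>     customWrite(batchDF, hdfsPath)   // this is the part that fails for me
>>   }
>>   .option("checkpointLocation", anotherHdfsPath)
>>   .start()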
>>
>> Thanks
>>
>> On Wed, Feb 5, 2020 at 6:13 PM Ruijing Li <liruijin...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I tried foreachBatch but got an error. Is this expected?
>>>
>>> Code is
>>>
>>> df.writeStream
>>>   .trigger(Trigger.Once)
>>>   .foreachBatch { (batchDF, batchId) =>
>>>     batchDF.write.parquet(hdfsPath)
>>>   }
>>>   .option("checkpointLocation", anotherHdfsPath)
>>>   .start()
>>>
>>> Exception is: Queries with streaming sources must be executed with
>>> writeStream.start()
>>>
>>> But I thought foreachBatch would treat batchDF as a static DataFrame?
>>>
>>> Thanks,
>>> RJ
>>>
>>> On Wed, Feb 5, 2020 at 12:48 AM Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi Burak,
>>>>
>>>> I am not too familiar with streaming, but I was almost thinking along
>>>> the same lines :) It makes a lot of sense to me now.
>>>>
>>>> Regards,
>>>> Gourav
>>>>
>>>> On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz <brk...@gmail.com> wrote:
>>>>
>>>>> Do you really want to build all of that and open yourself to bugs when
>>>>> you can just use foreachBatch? Here are your options:
>>>>>
>>>>> 1. Build it yourself
>>>>>
>>>>> // Read offsets from some store
>>>>> prevOffsets = readOffsets()
>>>>> latestOffsets = getOffsets()
>>>>>
>>>>> df = spark.read.format("kafka").option("startingOffsets",
>>>>> prevOffsets).option("endingOffsets", latestOffsets).load()
>>>>> batchLogic(df)
>>>>>
>>>>> saveOffsets(latestOffsets)
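>>>>>
>>>>> If you did go that route, readOffsets/saveOffsets could be as small as
>>>>> a JSON string kept on HDFS. A rough sketch, with a made-up path; the
>>>>> string is whatever you would pass as startingOffsets:
>>>>>
>>>>> import org.apache.hadoop.fs.{FileSystem, Path}
>>>>>
>>>>> val offsetsPath = new Path("/data/_offsets/mytopic.json")
>>>>> val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
>>>>>
>>>>> def readOffsets(): String =
>>>>>   if (fs.exists(offsetsPath))
>>>>>     scala.io.Source.fromInputStream(fs.open(offsetsPath)).mkString
>>>>>   else "earliest"   // first run: start from the beginning
>>>>>
>>>>> def saveOffsets(json: String): Unit = {
>>>>>   val out = fs.create(offsetsPath, true)   // true = overwrite
>>>>>   out.write(json.getBytes("UTF-8"))
>>>>>   out.close()
>>>>> }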
>>>>>
>>>>> 2. Structured Streaming + Trigger.Once + foreachBatch
>>>>>
>>>>> spark.readStream.format("kafka").load().writeStream.foreachBatch((df,
>>>>> batchId) => batchLogic(df)).trigger(Trigger.Once).start()
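>>>>>
>>>>> Spelled out a bit (topic, brokers and the checkpoint path are
>>>>> placeholders; the checkpointLocation is what tracks the Kafka offsets
>>>>> for you between runs):
>>>>>
>>>>> spark.readStream
>>>>>   .format("kafka")
>>>>>   .option("kafka.bootstrap.servers", brokers)
>>>>>   .option("subscribe", "mytopic")
>>>>>   .load()
>>>>>   .writeStream
>>>>>   .trigger(Trigger.Once)
>>>>>   .option("checkpointLocation", "/chk/mytopic")
>>>>>   .foreachBatch { (df, batchId) => batchLogic(df) }
>>>>>   .start()
>>>>>   .awaitTermination()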
>>>>>
>>>>> With Option (1), you're going to have to (re)solve:
>>>>>  a) Tracking and consistency of offsets
>>>>>  b) Potential topic partition mismatches
>>>>>  c) Offsets that may have aged out due to retention
>>>>>  d) Re-execution of jobs and data consistency. What if your job fails
>>>>> as you're committing the offsets at the end, but the data was already
>>>>> stored? Will your getOffsets method return the same offsets?
>>>>>
>>>>> I'd rather not solve problems that other people have solved for me,
>>>>> but ultimately the decision is yours to make.
>>>>>
>>>>> Best,
>>>>> Burak
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li <liruijin...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Anil, I think that’s the approach I will take.
>>>>>>
>>>>>> Hi Burak,
>>>>>>
>>>>>> That was a possibility to think about, but my team has custom
>>>>>> dataframe writer functions we would like to use; unfortunately, they
>>>>>> were written with static dataframes in mind. I do see there is a
>>>>>> foreachBatch write mode, but my thinking was that at that point it
>>>>>> would be easier to read from Kafka in batch mode.
>>>>>>
>>>>>> Thanks,
>>>>>> RJ
>>>>>>
>>>>>> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz <brk...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Ruijing,
>>>>>>>
>>>>>>> Why do you not want to use structured streaming here? This is
>>>>>>> exactly why structured streaming + Trigger.Once was built, just so
>>>>>>> that you don't build that solution yourself.
>>>>>>> You also get exactly once semantics if you use the built in sinks.
>>>>>>>
>>>>>>> Best,
>>>>>>> Burak
>>>>>>>
>>>>>>> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni <anil...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Ruijing,
>>>>>>>>
>>>>>>>> We did the following to read Kafka in batch from Spark:
>>>>>>>>
>>>>>>>> 1) Maintain the start offset (could be db, file etc)
>>>>>>>> 2) Get the end offset dynamically when the job executes.
>>>>>>>> 3) Pass the start and end offsets
>>>>>>>> 4) Overwrite the start offset with the end offset. (Should be done
>>>>>>>> post processing the data)
>>>>>>>>
>>>>>>>> Currently, to make it work in batch mode, you need to maintain the
>>>>>>>> offset state externally.
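>>>>>>>>
>>>>>>>> In code it looks roughly like this (loadStartOffsets,
>>>>>>>> saveStartOffsets, maxOffsetsRead, processAndStore and brokers are
>>>>>>>> placeholders for whatever store and logic you use; startingOffsets
>>>>>>>> and endingOffsets are the Kafka source's batch options):
>>>>>>>>
>>>>>>>> // 1) start offsets from your external store, e.g. """{"mytopic":{"0":1234}}"""
>>>>>>>> val start = loadStartOffsets()
>>>>>>>>
>>>>>>>> // 2) + 3) end offsets resolved at run time, both passed to the reader
>>>>>>>> val df = spark.read
>>>>>>>>   .format("kafka")
>>>>>>>>   .option("kafka.bootstrap.servers", brokers)
>>>>>>>>   .option("subscribe", "mytopic")
>>>>>>>>   .option("startingOffsets", start)
>>>>>>>>   .option("endingOffsets", "latest")
>>>>>>>>   .load()
>>>>>>>>
>>>>>>>> processAndStore(df)
>>>>>>>>
>>>>>>>> // 4) only after the data is safely stored, overwrite the start offsets
>>>>>>>> saveStartOffsets(maxOffsetsRead(df))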
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Anil
>>>>>>>>
>>>>>>>> -Sent from my mobile
>>>>>>>> http://anilkulkarni.com/
>>>>>>>>
>>>>>>>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li <liruijin...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> My use case is to read from a single Kafka topic using a batch
>>>>>>>>> Spark SQL job (not structured streaming, ideally). Every time the
>>>>>>>>> job starts, I want it to pick up the last offset it stopped at,
>>>>>>>>> read from there until it has caught up to the latest offset, store
>>>>>>>>> the result, and stop. Given that the dataframe has a partition and
>>>>>>>>> an offset column, my first thought for offset management is to
>>>>>>>>> groupBy partition and agg the max offset, then store it in HDFS.
>>>>>>>>> The next time the job runs, it will read that value and start from
>>>>>>>>> this max offset using startingOffsets.
>>>>>>>>>
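>>>>>>>>> Concretely, I was thinking of something like this (the topic name
>>>>>>>>> is a placeholder, and I believe the saved value needs to be
>>>>>>>>> max(offset) + 1, since startingOffsets is inclusive):
>>>>>>>>>
>>>>>>>>> import org.apache.spark.sql.functions.max
>>>>>>>>>
>>>>>>>>> // df is the batch read from Kafka, which has partition and offset columns
>>>>>>>>> val parts = df.groupBy("partition").agg(max("offset").as("maxOffset"))
>>>>>>>>>   .collect()
>>>>>>>>>   .map(r => "\"" + r.getInt(0) + "\":" + (r.getLong(1) + 1))
>>>>>>>>>
>>>>>>>>> // e.g. {"mytopic":{"0":1235,"1":5679}} - written to HDFS and passed
>>>>>>>>> // as startingOffsets on the next run
>>>>>>>>> val nextStartJson = "{\"mytopic\":{" + parts.mkString(",") + "}}"
>>>>>>>>>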
>>>>>>>>> However, I was wondering if this will work. If the Kafka producer
>>>>>>>>> fails to send a message and later decides to resend it, I will have
>>>>>>>>> skipped it, since I’m starting from the max offset sent. How does
>>>>>>>>> Spark Structured Streaming know to continue onwards - does it keep
>>>>>>>>> a state of all offsets seen? If so, how can I replicate this for
>>>>>>>>> batch without missing data? Any help would be appreciated.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Cheers,
>>>>>>>>> Ruijing Li
>>>>>>>>>
>>>>>>>> --
>>>>>> Cheers,
>>>>>> Ruijing Li
>>>>>>
>>>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
>> Cheers,
>> Ruijing Li
>>
> --
> Cheers,
> Ruijing Li
>
