Hi Burak,

I am not quite used to streaming, but was almost thinking on the same lines
:) makes a lot of sense to me now.

Regards,
Gourav

On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz <brk...@gmail.com> wrote:

> Do you really want to build all of that and open yourself to bugs when you
> can just use foreachBatch? Here are your options:
>
> 1. Build it yourself
>
> // Read offsets from some store
> prevOffsets = readOffsets()
> latestOffsets = getOffsets()
>
> df = spark.read.format("kafka").option("startOffsets",
> prevOffsets).option("endOffsets", latestOffsets).load()
> batchLogic(df)
>
> saveOffsets(latestOffsets)
>
> 2. Structured Streaming + Trigger.Once + foreachBatch
>
> spark.readStream.format("kafka").load().writeStream.foreachBatch((df,
> batchId) => batchLogic(df)).trigger("once").start()
>
> With Option (1), you're going to have to (re)solve:
>  a) Tracking and consistency of offsets
>  b) Potential topic partition mismatches
>  c) Offsets that may have aged out due to retention
>  d) Re-execution of jobs and data consistency. What if your job fails as
> you're committing the offsets in the end, but the data was already stored?
> Will your getOffsets method return the same offsets?
>
> I'd rather not solve problems that other people have solved for me, but
> ultimately the decision is yours to make.
>
> Best,
> Burak
>
>
>
>
> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li <liruijin...@gmail.com> wrote:
>
>> Thanks Anil, I think that’s the approach I will take.
>>
>> Hi Burak,
>>
>> That was a possibility to think about, but my team has custom dataframe
>> writer functions we would like to use, unfortunately they were written for
>> static dataframes in mind. I do see there is a ForEachBatch write mode but
>> my thinking was at that point it was easier to read from kafka through
>> batch mode.
>>
>> Thanks,
>> RJ
>>
>> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> Hi Ruijing,
>>>
>>> Why do you not want to use structured streaming here? This is exactly
>>> why structured streaming + Trigger.Once was built, just so that you don't
>>> build that solution yourself.
>>> You also get exactly once semantics if you use the built in sinks.
>>>
>>> Best,
>>> Burak
>>>
>>> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni <anil...@gmail.com> wrote:
>>>
>>>> Hi Ruijing,
>>>>
>>>> We did the below things to read Kafka in batch from spark:
>>>>
>>>> 1) Maintain the start offset (could be db, file etc)
>>>> 2) Get the end offset dynamically when the job executes.
>>>> 3) Pass the start and end offsets
>>>> 4) Overwrite the start offset with the end offset. (Should be done post
>>>> processing the data)
>>>>
>>>> Currently to make it work in batch mode, you need to maintain the state
>>>> information of the offsets externally.
>>>>
>>>>
>>>> Thanks
>>>> Anil
>>>>
>>>> -Sent from my mobile
>>>> http://anilkulkarni.com/
>>>>
>>>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li <liruijin...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> My use case is to read from single kafka topic using a batch spark sql
>>>>> job (not structured streaming ideally). I want this batch job every time 
>>>>> it
>>>>> starts to get the last offset it stopped at, and start reading from there
>>>>> until it caught up to the latest offset, store the result and stop the 
>>>>> job.
>>>>> Given the dataframe has a partition and offset column, my first thought 
>>>>> for
>>>>> offset management is to groupBy partition and agg the max offset, then
>>>>> store it in HDFS. Next time the job runs, it will read and start from this
>>>>> max offset using startingOffsets
>>>>>
>>>>> However, I was wondering if this will work. If the kafka producer
>>>>> failed an offset and later decides to resend it, I will have skipped it
>>>>> since I’m starting from the max offset sent. How does spark structured
>>>>> streaming know to continue onwards - does it keep a state of all offsets
>>>>> seen? If so, how can I replicate this for batch without missing data? Any
>>>>> help would be appreciated.
>>>>>
>>>>>
>>>>> --
>>>>> Cheers,
>>>>> Ruijing Li
>>>>>
>>>> --
>> Cheers,
>> Ruijing Li
>>
>

Reply via email to