Hi all,

I tried foreachBatch but got an error. Is this expected?

The code is:

df.writeStream.trigger(Trigger.Once).foreachBatch { (batchDF, batchId) =>
  batchDF.write.parquet(hdfsPath)
}
.option("checkpointLocation", anotherHdfsPath)
.start()

Exception is: Queries with streaming sources must be executed with
writeStream.start()

But I thought foreachBatch would treat the batchDF as a static dataframe?
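
For reference, here is a minimal sketch of the Trigger.Once + foreachBatch pattern as I understand it, not a confirmed fix. The names batchPath, writeStatic and the four command-line arguments are hypothetical placeholders, and the per-batch output directory is my own assumption about how to make a re-run of the same batch idempotent. One possible cause of the exception above (it may or may not apply here) is that something inside the function touches the outer streaming dataframe instead of batchDF, so the sketch only ever uses batchDF inside the function.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object ForeachBatchOnce {
  // Hypothetical helper: one output directory per micro-batch, so a re-run of
  // the same batchId overwrites its own output instead of appending duplicates.
  def batchPath(base: String, batchId: Long): String = s"$base/batch_id=$batchId"

  // Hypothetical stand-in for a writer function written for static DataFrames.
  def writeStatic(batch: DataFrame, path: String): Unit =
    batch.write.mode("overwrite").parquet(path)

  def main(args: Array[String]): Unit = {
    // Assumed arguments: Kafka brokers, topic, output path, checkpoint path.
    val Array(brokers, topic, outPath, checkpointPath) = args
    val spark = SparkSession.builder.appName("foreachBatch-once").getOrCreate()

    val streamingDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .load()

    // An explicitly typed function value sidesteps the Scala/Java overload
    // ambiguity that foreachBatch can hit under Scala 2.12.
    val writeBatch: (DataFrame, Long) => Unit =
      (batchDF, batchId) => writeStatic(batchDF, batchPath(outPath, batchId))

    streamingDF.writeStream
      .trigger(Trigger.Once())
      .option("checkpointLocation", checkpointPath)
      .foreachBatch(writeBatch)
      .start()
      .awaitTermination()
  }
}
```

With Trigger.Once each run processes a single micro-batch, and the batchId advances across runs through the checkpoint, so each run would land in its own batch_id=N directory.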

Thanks,
RJ

On Wed, Feb 5, 2020 at 12:48 AM Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi Burak,
>
> I am not quite used to streaming, but was almost thinking on the same
> lines :) makes a lot of sense to me now.
>
> Regards,
> Gourav
>
> On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz <brk...@gmail.com> wrote:
>
>> Do you really want to build all of that and open yourself to bugs when
>> you can just use foreachBatch? Here are your options:
>>
>> 1. Build it yourself
>>
>> // Read offsets from some store
>> prevOffsets = readOffsets()
>> latestOffsets = getOffsets()
>>
>> df = spark.read.format("kafka").option("startingOffsets",
>> prevOffsets).option("endingOffsets", latestOffsets).load()
>> batchLogic(df)
>>
>> saveOffsets(latestOffsets)
>>
>> 2. Structured Streaming + Trigger.Once + foreachBatch
>>
>> spark.readStream.format("kafka").load().writeStream.foreachBatch((df,
>> batchId) => batchLogic(df)).trigger(Trigger.Once()).start()
>>
>> With Option (1), you're going to have to (re)solve:
>>  a) Tracking and consistency of offsets
>>  b) Potential topic partition mismatches
>>  c) Offsets that may have aged out due to retention
>>  d) Re-execution of jobs and data consistency. What if your job fails as
>> you're committing the offsets in the end, but the data was already stored?
>> Will your getOffsets method return the same offsets?
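
To make points (b) and (c) above concrete, here is a small sketch of the kind of reconciliation a hand-rolled solution would need before each run. It is plain Scala with no Spark or Kafka calls; OffsetReconciler, Plan and reconcile are hypothetical names, and how the stored and earliest offsets are obtained is left out on purpose.

```scala
object OffsetReconciler {
  // Result for one partition: where to start reading, and how many records
  // (if any) can no longer be read because they aged out before the job ran.
  final case class Plan(start: Long, lost: Long)

  // stored:   last saved "next offset to read" per partition (may be stale or missing)
  // earliest: earliest offset still available per partition, as reported by Kafka
  // Partitions are taken from `earliest`, i.e. what the topic looks like right now.
  def reconcile(stored: Map[Int, Long], earliest: Map[Int, Long]): Map[Int, Plan] =
    earliest.map { case (partition, firstAvailable) =>
      // A partition with no stored offset is new: start at its beginning.
      val wanted = stored.getOrElse(partition, firstAvailable)
      // If the stored offset has aged out, the best we can do is the earliest one.
      val start = math.max(wanted, firstAvailable)
      partition -> Plan(start, lost = start - wanted)
    }
}
```

For example, with stored offsets Map(0 -> 100L, 1 -> 5L) and earliest offsets Map(0 -> 50L, 1 -> 20L, 2 -> 0L), partition 1 would start at 20 with 15 records lost to retention, and the new partition 2 would start at 0.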
>>
>> I'd rather not solve problems that other people have solved for me, but
>> ultimately the decision is yours to make.
>>
>> Best,
>> Burak
>>
>>
>>
>>
>> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li <liruijin...@gmail.com> wrote:
>>
>>> Thanks Anil, I think that’s the approach I will take.
>>>
>>> Hi Burak,
>>>
>>> That was a possibility to think about, but my team has custom dataframe
>>> writer functions we would like to use. Unfortunately, they were written
>>> with static dataframes in mind. I do see there is a foreachBatch write
>>> mode, but my thinking was that at that point it would be easier to read
>>> from Kafka in batch mode.
>>>
>>> Thanks,
>>> RJ
>>>
>>> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz <brk...@gmail.com> wrote:
>>>
>>>> Hi Ruijing,
>>>>
>>>> Why do you not want to use structured streaming here? This is exactly
>>>> why structured streaming + Trigger.Once was built, so that you don't
>>>> have to build that solution yourself.
>>>> You also get exactly-once semantics if you use the built-in sinks.
>>>>
>>>> Best,
>>>> Burak
>>>>
>>>> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni <anil...@gmail.com> wrote:
>>>>
>>>>> Hi Ruijing,
>>>>>
>>>>> We did the following to read Kafka in batch from Spark:
>>>>>
>>>>> 1) Maintain the start offset (could be in a DB, a file, etc.)
>>>>> 2) Get the end offset dynamically when the job executes.
>>>>> 3) Pass the start and end offsets to the Kafka reader.
>>>>> 4) Overwrite the start offset with the end offset. (Should be done
>>>>> after processing the data.)
>>>>>
>>>>> Currently to make it work in batch mode, you need to maintain the
>>>>> state information of the offsets externally.
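
As a rough illustration of step 3, the Kafka source's startingOffsets and endingOffsets options accept per-partition offsets as a JSON string. The helper below is a sketch of building that string; KafkaOffsetsJson and render are hypothetical names, and the topic name in the usage example is made up.

```scala
object KafkaOffsetsJson {
  // Renders offsets as {"topic":{"0":23,"1":42}}, the JSON shape the Kafka
  // source accepts for its startingOffsets / endingOffsets options.
  // Partitions are sorted only to make the output deterministic.
  // No escaping is done: Kafka topic names are limited to letters, digits,
  // '.', '_' and '-', so none is needed for valid names.
  def render(topic: String, offsets: Map[Int, Long]): String = {
    val body = offsets.toSeq
      .sortBy { case (partition, _) => partition }
      .map { case (partition, offset) => "\"" + partition + "\":" + offset }
      .mkString(",")
    "{\"" + topic + "\":{" + body + "}}"
  }
}
```

The resulting strings would then be passed as .option("startingOffsets", ...) and .option("endingOffsets", ...) on spark.read.format("kafka"), and the stored start offsets would be overwritten (step 4) only after the output has been written successfully.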
>>>>>
>>>>>
>>>>> Thanks
>>>>> Anil
>>>>>
>>>>> -Sent from my mobile
>>>>> http://anilkulkarni.com/
>>>>>
>>>>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li <liruijin...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> My use case is to read from a single Kafka topic using a batch Spark
>>>>>> SQL job (ideally not structured streaming). Every time this batch job
>>>>>> starts, I want it to get the last offset it stopped at, read from
>>>>>> there until it has caught up to the latest offset, store the result,
>>>>>> and stop. Given that the dataframe has a partition and an offset column,
>>>>>> my first thought for offset management is to groupBy partition and agg
>>>>>> the max offset, then store it in HDFS. The next time the job runs, it
>>>>>> will read that value and start from this max offset using startingOffsets.
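
The per-partition max-offset idea can be sketched with plain Scala collections standing in for the dataframe (in Spark this would roughly be a groupBy on the partition column with a max aggregation on the offset column). NextOffsets and nextStartingOffsets are hypothetical names. Note the + 1: as far as I know, startingOffsets is inclusive, so restarting at the max offset itself would re-read the last record.

```scala
object NextOffsets {
  // Each record is (partition, offset), mirroring the two columns of the
  // Kafka source dataframe that the aggregation would use.
  // Returns, per partition, the offset the next run should start from.
  def nextStartingOffsets(records: Seq[(Int, Long)]): Map[Int, Long] =
    records
      .groupBy { case (partition, _) => partition }
      .map { case (partition, rs) =>
        // Max offset seen in this run, plus one, is the first unread offset.
        partition -> (rs.map { case (_, offset) => offset }.max + 1L)
      }
}
```

A partition that received no records in a given run is absent from the result, so the previously stored value for it would need to be carried forward rather than dropped.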
>>>>>>
>>>>>> However, I was wondering if this will work. If the Kafka producer
>>>>>> fails to send a record and later resends it, I will have skipped it,
>>>>>> since I’m starting from the max offset sent. How does Spark structured
>>>>>> streaming know to continue onwards - does it keep a state of all offsets
>>>>>> seen? If so, how can I replicate this for batch without missing data? Any
>>>>>> help would be appreciated.
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> Ruijing Li
>>>>>>
