Looks like I was wrong: I tried that exact snippet and it worked.

So to be clear, the batchDF.write.parquet call is not the exact code I’m
using.

I’m using a custom write function that does something similar to
write.parquet but has some added functionality. Somehow my custom write
function isn’t working correctly.

Is batchDF a static dataframe though?
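
A minimal sketch of how a custom writer could be plugged into foreachBatch, assuming Spark 2.4+ with the Kafka source on the classpath. `customWrite`, the broker address, the topic name, and the paths are hypothetical placeholders, not the actual code discussed in this thread. Inside foreachBatch the batchDF argument is a non-streaming DataFrame (`isStreaming` is false), so a writer built for static dataframes should work as long as it only uses that argument and never touches the outer streaming `df`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object ForeachBatchSketch {
  // Hypothetical stand-in for a team-specific writer that expects a static DataFrame.
  def customWrite(batch: DataFrame, path: String): Unit = {
    require(!batch.isStreaming, "expected a static (non-streaming) DataFrame")
    batch.write.mode("append").parquet(path)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("foreachBatch-sketch").getOrCreate()

    // Streaming source; broker and topic are placeholders.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "my-topic")
      .load()

    // Typed explicitly so the Scala overload of foreachBatch is selected.
    // Only batchDF is used here, never the outer streaming df.
    val writeBatch: (DataFrame, Long) => Unit =
      (batchDF, batchId) => customWrite(batchDF, "/tmp/out")

    df.writeStream
      .trigger(Trigger.Once())
      .option("checkpointLocation", "/tmp/checkpoint")
      .foreachBatch(writeBatch)
      .start()
      .awaitTermination()
  }
}
```

If the custom function calls `.write` on the streaming `df` instead of `batchDF`, Spark raises the "Queries with streaming sources must be executed with writeStream.start()" exception, so that is one thing worth checking.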

Thanks

On Wed, Feb 5, 2020 at 6:13 PM Ruijing Li <liruijin...@gmail.com> wrote:

> Hi all,
>
> I tried with foreachBatch but got an error. Is this expected?
>
> Code is
>
> df.writeStream.trigger(Trigger.Once()).foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>   batchDF.write.parquet(hdfsPath)
> }
> .option("checkpointLocation", anotherHdfsPath)
> .start()
>
> Exception is: Queries with streaming sources must be executed with
> writeStream.start()
>
> But I thought foreachBatch would treat the batchDF as a static dataframe?
>
> Thanks,
> RJ
>
> On Wed, Feb 5, 2020 at 12:48 AM Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>
>> Hi Burak,
>>
>> I am not quite used to streaming, but was almost thinking on the same
>> lines :) makes a lot of sense to me now.
>>
>> Regards,
>> Gourav
>>
>> On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> Do you really want to build all of that and open yourself to bugs when
>>> you can just use foreachBatch? Here are your options:
>>>
>>> 1. Build it yourself
>>>
>>> // Read offsets from some store
>>> prevOffsets = readOffsets()
>>> latestOffsets = getOffsets()
>>>
>>> df = spark.read.format("kafka").option("startingOffsets",
>>> prevOffsets).option("endingOffsets", latestOffsets).load()
>>> batchLogic(df)
>>>
>>> saveOffsets(latestOffsets)
>>>
>>> 2. Structured Streaming + Trigger.Once + foreachBatch
>>>
>>> spark.readStream.format("kafka").load().writeStream.foreachBatch((df,
>>> batchId) => batchLogic(df)).trigger(Trigger.Once()).start()
>>>
>>> With Option (1), you're going to have to (re)solve:
>>>  a) Tracking and consistency of offsets
>>>  b) Potential topic partition mismatches
>>>  c) Offsets that may have aged out due to retention
>>>  d) Re-execution of jobs and data consistency. What if your job fails as
>>> you're committing the offsets in the end, but the data was already stored?
>>> Will your getOffsets method return the same offsets?
>>>
>>> I'd rather not solve problems that other people have solved for me, but
>>> ultimately the decision is yours to make.
>>>
>>> Best,
>>> Burak
>>>
>>>
>>>
>>>
>>> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li <liruijin...@gmail.com> wrote:
>>>
>>>> Thanks Anil, I think that’s the approach I will take.
>>>>
>>>> Hi Burak,
>>>>
>>>> That was a possibility to think about, but my team has custom dataframe
>>>> writer functions we would like to use; unfortunately, they were written
>>>> with static dataframes in mind. I do see there is a foreachBatch write
>>>> mode, but my thinking was that at that point it would be easier to read
>>>> from Kafka in batch mode.
>>>>
>>>> Thanks,
>>>> RJ
>>>>
>>>> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz <brk...@gmail.com> wrote:
>>>>
>>>>> Hi Ruijing,
>>>>>
>>>>> Why do you not want to use structured streaming here? This is exactly
>>>>> why structured streaming + Trigger.Once was built, just so that you don't
>>>>> build that solution yourself.
>>>>> You also get exactly-once semantics if you use the built-in sinks.
>>>>>
>>>>> Best,
>>>>> Burak
>>>>>
>>>>> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni <anil...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Ruijing,
>>>>>>
>>>>>> We did the following to read Kafka in batch from Spark:
>>>>>>
>>>>>> 1) Maintain the start offset (could be db, file etc)
>>>>>> 2) Get the end offset dynamically when the job executes.
>>>>>> 3) Pass the start and end offsets
>>>>>> 4) Overwrite the start offset with the end offset. (Should be done
>>>>>> post processing the data)
>>>>>>
>>>>>> Currently to make it work in batch mode, you need to maintain the
>>>>>> state information of the offsets externally.
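
A minimal sketch of steps 2 to 4 above, assuming the Spark Kafka batch source. The broker address and topic name are placeholders, and how the stored start offsets are loaded and saved (db, file, etc.) is left out on purpose. `startingOffsets` and `endingOffsets` are the batch options of the Kafka source; `readRange` and `maxOffsets` are hypothetical helper names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.max

object KafkaBatchSketch {
  // Batch read from the stored start offsets up to whatever is latest at run time.
  // startingOffsetsJson looks like {"my-topic":{"0":43,"1":18}}.
  def readRange(spark: SparkSession, startingOffsetsJson: String): DataFrame =
    spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // placeholder
      .option("subscribe", "my-topic")                   // placeholder
      .option("startingOffsets", startingOffsetsJson)
      .option("endingOffsets", "latest")
      .load()

  // Highest offset actually read per partition; persist this only after
  // the batch has been processed and stored successfully.
  def maxOffsets(batch: DataFrame): Map[Int, Long] =
    batch.groupBy("partition")
      .agg(max("offset").as("maxOffset"))
      .collect()
      .map(row => row.getInt(0) -> row.getLong(1))
      .toMap
}
```

Saving the offsets after the data is written means a failure between the two steps re-reads the same range on the next run, so the downstream write would need to tolerate duplicates.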
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Anil
>>>>>>
>>>>>> -Sent from my mobile
>>>>>> http://anilkulkarni.com/
>>>>>>
>>>>>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li <liruijin...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> My use case is to read from a single Kafka topic using a batch Spark
>>>>>>> SQL job (ideally not structured streaming). Every time this batch job
>>>>>>> starts, I want it to get the last offset it stopped at, read from
>>>>>>> there until it has caught up to the latest offset, store the result,
>>>>>>> and stop. Given the dataframe has a partition and offset column, my
>>>>>>> first thought for offset management is to groupBy partition and agg
>>>>>>> the max offset, then store it in HDFS. Next time the job runs, it will
>>>>>>> read and start from this max offset using startingOffsets.
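
A small sketch of turning the per-partition max offsets into a `startingOffsets` value for the next run. It assumes the JSON shape the Kafka source accepts (`{"topic":{"partition":offset}}`) and that the stored value is the last offset already processed, so the next run starts at max + 1. `OffsetJson` and `nextStartingOffsets` are hypothetical names.

```scala
object OffsetJson {
  // maxSeen: partition -> highest offset already processed.
  // Returns the JSON string to pass as the startingOffsets option next run.
  def nextStartingOffsets(topic: String, maxSeen: Map[Int, Long]): String = {
    val entries = maxSeen.toSeq
      .sortBy(_._1)
      .map { case (partition, offset) => "\"" + partition + "\":" + (offset + 1) }
    val body = entries.mkString(",")
    "{\"" + topic + "\":{" + body + "}}"
  }
}

// OffsetJson.nextStartingOffsets("t", Map(1 -> 17L, 0 -> 42L))
// returns {"t":{"0":43,"1":18}}
```

Partitions that returned no rows in a run will be missing from the map, so in practice the previous offsets would need to be merged in before saving.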
>>>>>>>
>>>>>>> However, I was wondering if this will work. If the Kafka producer
>>>>>>> failed on an offset and later decides to resend it, I will have
>>>>>>> skipped it, since I’m starting from the max offset sent. How does Spark
>>>>>>> structured streaming know to continue onwards - does it keep a state of
>>>>>>> all offsets seen? If so, how can I replicate this for batch without
>>>>>>> missing data? Any help would be appreciated.
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Cheers,
>>>>>>> Ruijing Li
>>>>>>>
-- 
Cheers,
Ruijing Li
