Re: Best way to read batch from Kafka and Offsets

2020-02-15 Thread Ruijing Li
Thought I'd update this thread. I figured out my issue with foreachBatch and structured streaming: I did a count() before write(), so my streaming query branched into two. I am now using Trigger.Once and structured streaming to handle checkpointing instead of doing it myself. Thanks all
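A minimal sketch of the resolved pattern described above: exactly one action per micro-batch inside foreachBatch, with Trigger.Once and a checkpoint location so Spark tracks offsets itself. The broker address, topic name, and paths are placeholder assumptions, not values from the thread:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("kafka-batch-job").getOrCreate()

// Streaming read from Kafka; Trigger.Once turns this into a run-to-latest batch.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "my-topic")                  // placeholder
  .load()

df.writeStream
  .trigger(Trigger.Once)                      // process available data, then stop
  .option("checkpointLocation", "/tmp/ckpt")  // Spark persists offsets here
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Single action only: a count() here would evaluate the batch a second
    // time; cache batchDF first if a count is genuinely needed.
    batchDF.write.mode("append").parquet("/tmp/out")
  }
  .start()
  .awaitTermination()
```

Because the checkpoint records the last committed offsets, re-running the job picks up where the previous run stopped, with no hand-rolled offset bookkeeping.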

Re: Best way to read batch from Kafka and Offsets

2020-02-05 Thread Ruijing Li
Looks like I'm wrong, since I tried that exact snippet and it worked. So to be clear: in the part where I do batchDF.write.parquet, that is not the exact code I'm using. I'm using a custom write function that does something similar to write.parquet but has some added functionality. Somehow my custom write

Re: Best way to read batch from Kafka and Offsets

2020-02-05 Thread Ruijing Li
Hi all, I tried with foreachBatch but got an error. Is this expected? Code is: df.writeStream.trigger(Trigger.Once).foreachBatch { (batchDF, batchId) => batchDF.write.parquet(hdfsPath) }.option("checkpointLocation", anotherHdfsPath).start() Exception is: Queries with streaming sources must be

Re: Best way to read batch from Kafka and Offsets

2020-02-05 Thread Gourav Sengupta
Hi Burak, I am not quite used to streaming, but I was almost thinking along the same lines :) It makes a lot of sense to me now. Regards, Gourav On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz wrote: > Do you really want to build all of that and open yourself to bugs when you > can just use foreachBatch?

Re: Best way to read batch from Kafka and Offsets

2020-02-04 Thread Burak Yavuz
Do you really want to build all of that and open yourself to bugs when you can just use foreachBatch? Here are your options: 1. Build it yourself // Read offsets from some store prevOffsets = readOffsets() latestOffsets = getOffsets() df = spark.read.format("kafka").option("startingOffsets",
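Filled out, the "build it yourself" option above might look like the following sketch. readOffsets, getOffsets, and saveOffsets are hypothetical helpers backed by whatever store you choose; the Kafka batch source takes the offset boundaries as JSON strings:

```scala
// Assumed helpers: load the last committed offsets, fetch the current latest
// offsets (e.g. via Kafka's AdminClient), and persist offsets after success.
val prevOffsets: String   = readOffsets()   // e.g. """{"my-topic":{"0":1234}}"""
val latestOffsets: String = getOffsets()

// Batch (non-streaming) read of just the window between the two offset sets.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "my-topic")                  // placeholder
  .option("startingOffsets", prevOffsets)
  .option("endingOffsets", latestOffsets)
  .load()

df.write.mode("append").parquet("/path/out")

// Persist the new start point only after the write succeeds; if the job dies
// between write and save, the next run re-reads the window (hence the dedupe
// discussion elsewhere in this thread).
saveOffsets(latestOffsets)
```

This is exactly the bookkeeping (and the crash-window corner case) that foreachBatch with a checkpoint location handles for you.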

Re: Best way to read batch from Kafka and Offsets

2020-02-04 Thread Ruijing Li
Thanks Anil, I think that's the approach I will take. Hi Burak, that was a possibility to think about, but my team has custom dataframe writer functions we would like to use; unfortunately, they were written with static dataframes in mind. I do see there is a foreachBatch write mode, but my

Re: Best way to read batch from Kafka and Offsets

2020-02-04 Thread Burak Yavuz
Hi Ruijing, Why do you not want to use structured streaming here? This is exactly why structured streaming + Trigger.Once was built, so that you don't have to build that solution yourself. You also get exactly-once semantics if you use the built-in sinks. Best, Burak On Mon, Feb 3, 2020 at 3:15 PM

Re: Best way to read batch from Kafka and Offsets

2020-02-03 Thread Anil Kulkarni
Hi Ruijing, We did the below things to read Kafka in batch from Spark:
1) Maintain the start offset (could be a db, file, etc.)
2) Get the end offset dynamically when the job executes.
3) Pass the start and end offsets.
4) Overwrite the start offset with the end offset. (Should be done post
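Steps 1 and 4 above could be sketched with a file-backed offset store like this. The one-pair-per-line format and the names are assumptions for illustration; a real job might use a database or the Kafka source's JSON form instead:

```scala
import java.nio.file.{Files, Path, Paths}
import java.nio.charset.StandardCharsets

// Minimal file-backed store mapping partition -> last processed offset.
object OffsetStore {
  // Overwrite the stored offsets (step 4), one "partition:offset" per line.
  def save(path: Path, offsets: Map[Int, Long]): Unit = {
    val body = offsets.toSeq.sortBy(_._1)
      .map { case (p, o) => s"$p:$o" }
      .mkString("\n")
    Files.write(path, body.getBytes(StandardCharsets.UTF_8))
  }

  // Load the start offsets (step 1); empty map means "first run".
  def load(path: Path): Map[Int, Long] =
    if (!Files.exists(path)) Map.empty
    else new String(Files.readAllBytes(path), StandardCharsets.UTF_8)
      .split("\n")
      .filter(_.nonEmpty)
      .map { line =>
        val Array(p, o) = line.split(":")
        p.toInt -> o.toLong
      }
      .toMap
}
```

The save-after-success ordering matters: overwriting the start offset before the write completes could silently skip a window of records on failure.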

Re: Best way to read batch from Kafka and Offsets

2020-02-03 Thread Chris Teoh
The most common delivery semantic for a Kafka producer is at-least-once, so your consumers have to handle dedupe. Spark can checkpoint, but you have to be explicit about it. It only makes sense if your dataframe lineage gets too long (only if you're doing a highly iterative algorithm), and you

Re: Best way to read batch from Kafka and Offsets

2020-02-03 Thread Ruijing Li
Hi Chris, Thanks for the answer. So if I understand correctly:
- there will be a need to dedupe, since I should be expecting at-least-once delivery.
- storing the result of (group by partition and aggregate max offsets) is enough, since a Kafka message is immutable, so a message will get sent
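The "group by partition and aggregate max offsets" step above can be sketched as a small pure function: given the (partition, offset) pairs of the records just processed, compute the highest offset seen per partition, to be stored for the next run. Note this stores the last offset read; whether the next run starts at that value or at value + 1 depends on how the stored offsets are interpreted:

```scala
// Highest offset seen per partition, e.g. for persisting between runs.
def maxOffsets(records: Seq[(Int, Long)]): Map[Int, Long] =
  records
    .groupBy { case (partition, _) => partition }
    .map { case (partition, recs) => partition -> recs.map(_._2).max }

// In Spark SQL terms this is the equivalent of:
//   df.groupBy("partition").agg(max("offset"))
```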

Re: Best way to read batch from Kafka and Offsets

2020-02-03 Thread Chris Teoh
Kafka can keep track of the offsets you've seen (in a separate topic, based on your consumer group), but it is usually best effort, and you're probably better off also keeping track of your offsets yourself. If the producer resends a message, you would have to dedupe it, as you've most likely already seen it,

Best way to read batch from Kafka and Offsets

2020-02-03 Thread Ruijing Li
Hi all, My use case is to read from a single Kafka topic using a batch Spark SQL job (not structured streaming, ideally). I want this batch job, every time it starts, to get the last offset it stopped at, read from there until it has caught up to the latest offset, store the result, and stop
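For the stored-offsets part of this use case, Spark's batch Kafka source expects startingOffsets / endingOffsets as JSON of the form {"topic":{"partition":offset}}. A small helper (names assumed for illustration) to build that string from a stored partition-to-offset map:

```scala
// Render a partition -> offset map as the Kafka source's offsets JSON,
// e.g. offsetsJson("my-topic", Map(0 -> 10L)) gives {"my-topic":{"0":10}}.
def offsetsJson(topic: String, offsets: Map[Int, Long]): String = {
  val parts = offsets.toSeq.sortBy(_._1)
    .map { case (p, o) => s""""$p":$o""" }
    .mkString(",")
  s"""{"$topic":{$parts}}"""
}
```

The result would then be passed as .option("startingOffsets", offsetsJson(...)) on a spark.read.format("kafka") batch read.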