Thought to update this thread. I figured out my issue with foreachBatch and
structured streaming: I was calling count() before write(), so my streaming
query branched into two. I am now using Trigger.Once and letting structured
streaming handle checkpointing instead of doing it myself.
Thanks all
Looks like I’m wrong, since I tried that exact snippet and it worked
So to be clear: in the part where I do batchDF.write.parquet, that is not
the exact code I’m using.
I’m using a custom write function that does something similar to
write.parquet but has some added functionality. Somehow my custom write
Hi all,
I tried with forEachBatch but got an error. Is this expected?
Code is
df.writeStream
  .trigger(Trigger.Once())
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write.parquet(hdfsPath)
  }
  .option("checkpointLocation", anotherHdfsPath)
  .start()
Exception is: Queries with streaming sources must be
Hi Burak,
I am not very used to streaming, but was almost thinking along the same
lines :) It makes a lot of sense to me now.
Regards,
Gourav
On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz wrote:
> Do you really want to build all of that and open yourself to bugs when you
> can just use foreachBatch?
Do you really want to build all of that and open yourself to bugs when you
can just use foreachBatch? Here are your options:
1. Build it yourself
// Read offsets from some store
prevOffsets = readOffsets()
latestOffsets = getOffsets()
df = spark.read.format("kafka").option("startingOffsets",
Thanks Anil, I think that’s the approach I will take.
Hi Burak,
That was a possibility to think about, but my team has custom dataframe
writer functions we would like to use; unfortunately they were written with
static dataframes in mind. I do see there is a foreachBatch write mode but
my
Hi Ruijing,
Why do you not want to use structured streaming here? This is exactly why
structured streaming + Trigger.Once was built, just so that you don't build
that solution yourself.
You also get exactly-once semantics if you use the built-in sinks.
Best,
Burak
On Mon, Feb 3, 2020 at 3:15 PM
Hi Ruijing,
We did the following to read Kafka in batch from Spark:
1) Maintain the start offset (could be db, file etc)
2) Get the end offset dynamically when the job executes.
3) Pass the start and end offsets
4) Overwrite the start offset with the end offset. (Should be done post
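The steps above can be sketched roughly as follows. The helper names (readStoredOffsets, fetchLatestOffsets, offsetsToJson) are assumptions for illustration; the `startingOffsets`/`endingOffsets` options are real options of Spark's Kafka batch source, and they take per-partition offsets as a JSON string like `{"topic":{"0":42,"1":17}}`.

```scala
// Sketch of steps 1-4. Only offsetsToJson is concrete here; the
// surrounding Spark usage is shown in comments since it needs a cluster.
object OffsetJson {
  // Build the JSON the Kafka source expects, e.g. {"events":{"0":42,"1":17}}
  def offsetsToJson(topic: String, offsets: Map[Int, Long]): String = {
    val parts = offsets.toSeq.sortBy(_._1)
      .map { case (p, o) => s""""$p":$o""" }
      .mkString(",")
    s"""{"$topic":{$parts}}"""
  }
}

// Usage against a SparkSession (hypothetical helper names):
// val start = OffsetJson.offsetsToJson("events", readStoredOffsets())    // step 1
// val end   = OffsetJson.offsetsToJson("events", fetchLatestOffsets())   // step 2
// val df = spark.read.format("kafka")
//   .option("kafka.bootstrap.servers", brokers)
//   .option("subscribe", "events")
//   .option("startingOffsets", start)                                    // step 3
//   .option("endingOffsets", end)
//   .load()
// ... process and write df, then persist `end` as the next start (step 4)
```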
The most common delivery semantic for Kafka producers is at least once,
so your consumers have to handle dedupe.
Spark can do checkpointing, but you have to be explicit about it. It only
makes sense if your dataframe lineage gets too long (only if you're doing a
highly iterative algorithm) and you
Hi Chris,
Thanks for the answer. So if I understand correctly:
- there will be need to dedupe since I should be expecting at least once
delivery.
- storing the result of (group by partition and aggregate max offsets)
is enough, since a kafka message is immutable, so a message will get sent
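The "group by partition and aggregate max offsets" step can be sketched in plain Scala, independent of Spark (the record shape here is illustrative):

```scala
// Sketch: given (partition, offset) pairs consumed in a batch, compute the
// highest offset seen per partition -- this is what you would persist and
// use as the next run's starting point.
object OffsetTracking {
  def maxOffsets(records: Seq[(Int, Long)]): Map[Int, Long] =
    records.groupBy(_._1).map { case (p, rs) => p -> rs.map(_._2).max }
}
```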
Kafka can keep track of the offsets (in a separate topic based on your
consumer group) you've seen but it is usually best effort and you're
probably better off also keeping track of your offsets.
If the producer resends a message you would have to dedupe it as you've
most likely already seen it,
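A minimal sketch of consumer-side dedupe under at-least-once delivery, assuming you keep a map of last committed offset per partition (the record shape and names are illustrative, not from the thread):

```scala
// Sketch: drop any record whose offset is at or below the last committed
// offset for its partition; a resent (duplicate) record is filtered out
// because its offset has already been seen.
object Dedupe {
  case class Record(partition: Int, offset: Long, value: String)

  def filterNew(records: Seq[Record], committed: Map[Int, Long]): Seq[Record] =
    records.filter(r => r.offset > committed.getOrElse(r.partition, -1L))
}
```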
Hi all,
My use case is to read from a single Kafka topic using a batch Spark SQL
job (not structured streaming, ideally). I want this batch job, every time
it starts, to get the last offset it stopped at, read from there until it
catches up to the latest offset, store the result, and stop