Hi Team,

I am working on a basic streaming aggregation with one file stream
source and two write sinks (Hudi tables). The only difference between
the two is the aggregation performed, so I am using the same Spark
session for both operations.

(File Source)
    --> Agg1 -> DF1
    --> Agg2 -> DF2

After building both, I write to the Hudi tables in parallel through
foreachBatch:

    Seq(DF1, DF2).par.foreach(writeToHudi)

    def writeToHudi(df: DataFrame): Unit = {
      df.writeStream
        .queryName("unique_name_per_stream")
        .foreachBatch { (batchDf: Dataset[Row], batchId: Long) =>
          batchDf.write
            .options(hudiOptions) // per-table Hudi options, elided here
            .mode("append")
            .save("path")
        }
        .trigger(Trigger.ProcessingTime("trigger_time"))
        .start()
    }
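For context on the launch pattern, here is a Spark-free sketch (names are hypothetical, and plain threads stand in for the streaming queries): start() returns immediately, so the parallel foreach only parallelizes the cheap start calls, and the driver then has to block until the queries terminate, as spark.streams.awaitAnyTermination() would.

```scala
import java.util.concurrent.atomic.AtomicInteger

val started = new AtomicInteger(0)

// Stand-in for writeToHudi: launches a background "query" and returns
// at once, like StreamingQuery.start().
def startQuery(name: String): Thread = {
  val t = new Thread(() => {
    Thread.sleep(50) // stands in for the long-running streaming query
  })
  t.setDaemon(true)
  t.start()
  started.incrementAndGet()
  t
}

val queries = Seq("agg1_to_hudi", "agg2_to_hudi").map(startQuery)

// Analogous to spark.streams.awaitAnyTermination()/awaitTermination():
// without this, the driver can fall through while queries still run.
queries.foreach(_.join())

println(started.get) // 2: both writers were started
```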

I haven't included the Hudi options here for brevity; the two sinks use
different table names and options.

While doing so, one of the streaming queries goes to the FINISHED state
right after reading the file source. I'm not sure what's happening;
locally, when I run the test cases with the same method, it works fine.

Spark version: 3.3.2
EMR: 6.13.0
Hudi version: 0.13.1

Any suggestion would be really helpful. Let me know if you need additional
details.

Thanks,
Subash
