Hello Spark Team,
Greetings!
I am writing to ask for suggestions on the observation below.
*Use Case:* Use Spark Structured Streaming to extract data from Azure Event
Hubs, process it, and write it to a Snowflake table using foreachBatch, with
the epoch_id/batch_id passed to the foreach batch function.
*Code Snippet:*
ehConf = {}
ehConf['eventhubs.connectionString'] = \
    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(EVENT_HUB_CONNECTION_STRING)
ehConf['eventhubs.consumerGroup'] = consumergroup

# Read stream data from Event Hub
spark_df = spark \
    .readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# (transformations deriving processed_df from spark_df omitted for brevity)

# Write each micro-batch to Snowflake.
# SNOWFLAKE_SOURCE_NAME is "net.snowflake.spark.snowflake"; sfOptions holds
# the Snowflake connection options (both defined elsewhere).
def foreach_batch_function(df, epoch_id):
    df.write \
        .format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions) \
        .option("dbtable", snowflake_table) \
        .mode('append') \
        .save()

processed_df.writeStream \
    .outputMode('append') \
    .trigger(processingTime='10 seconds') \
    .option("checkpointLocation", "checkpoint/P1") \
    .foreachBatch(foreach_batch_function) \
    .start()
*Observation:* After recovery from a node failure, I find duplicate rows in
my Snowflake table. The Structured Streaming programming guide says that
foreachBatch, used together with the epoch_id/batch_id, can be used to avoid
duplicates during recovery. Link for reference:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
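In case it clarifies my question: is the write function expected to be
idempotent on its own, or is the epoch_id meant to be used explicitly, for
example along the lines below? This is only a rough sketch of what I had in
mind, not tested code (idempotent_foreach_batch and BATCH_ID are hypothetical
names): it assumes the Snowflake table has an extra BATCH_ID column, and it
uses the connector's runQuery utility
(sc._jvm.net.snowflake.spark.snowflake.Utils, as shown in the Snowflake
connector documentation) to clear out rows from any previous attempt of the
same micro-batch before re-inserting them.

from pyspark.sql.functions import lit

sfUtils = sc._jvm.net.snowflake.spark.snowflake.Utils

def idempotent_foreach_batch(df, epoch_id):
    # Delete rows left over from a previous, partially committed attempt
    # of this micro-batch (assumes snowflake_table has a BATCH_ID column).
    sfUtils.runQuery(
        sfOptions,
        "DELETE FROM {} WHERE BATCH_ID = {}".format(snowflake_table, epoch_id))
    # Tag every row with its batch id so a replayed batch overwrites itself
    # instead of appending duplicates.
    df.withColumn("BATCH_ID", lit(epoch_id)) \
        .write \
        .format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions) \
        .option("dbtable", snowflake_table) \
        .mode('append') \
        .save()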
In my use case I want the data to land in Snowflake without any duplicates.
Is there something I am missing in my code, or is this simply not achievable?
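For completeness: I also looked at de-duplicating inside the stream itself,
along the lines of the sketch below (eventId stands in for whatever unique
key our payload carries; enqueuedTime comes from the Event Hubs source
schema). My understanding is that this only removes duplicate events within
the watermark, and would not stop a replayed micro-batch from being appended
to Snowflake a second time, since those duplicates are introduced at the
sink.

# Hypothetical in-stream de-duplication before the sink
deduped_df = processed_df \
    .withWatermark("enqueuedTime", "10 minutes") \
    .dropDuplicates(["eventId", "enqueuedTime"])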
Any suggestion or solution to achieve this would be very helpful.
Looking forward to hearing from you.
Thank you,
Regards,
Vedant.