Hello Spark Team,

Greetings!


I am writing to ask for suggestions on the observation below.



*Use Case:* Spark Structured Streaming to extract data from Azure Event
Hub, process it, and write it to a Snowflake database table using
foreachBatch, with the epoch_id/batch_id passed to the foreach-batch function.


*Code Snippet:*

ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(EVENT_HUB_CONNECTION_STRING)
ehConf['eventhubs.consumerGroup'] = consumergroup

# Read stream data from Event Hub
spark_df = spark \
    .readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# (transformations deriving processed_df from spark_df omitted here)

# Write each micro-batch to Snowflake
def foreach_batch_function(df, epoch_id):
    # Note: epoch_id is received but not used in the write itself
    df.write \
        .format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions) \
        .option("dbtable", snowflake_table) \
        .mode('append') \
        .save()

processed_df.writeStream \
    .outputMode('append') \
    .trigger(processingTime='10 seconds') \
    .option("checkpointLocation", "checkpoint/P1") \
    .foreachBatch(foreach_batch_function) \
    .start()



*Observation:* Duplicate rows appear after a node failure. The Structured
Streaming programming guide mentions that the epoch_id/batch_id passed to
foreachBatch can be used to make the output idempotent, so that recovery
from a node failure should not produce duplicates, yet I do find duplicates
getting populated in my Snowflake table. Link for reference:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch


In my use case I need the data in Snowflake to be free of duplicates. Is
there something I am missing in my code, or is this not achievable with
this approach?
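
For context, the direction I was considering is to deduplicate inside
foreachBatch and tag every row with the epoch_id, so that a micro-batch
replayed after recovery can be identified and filtered on the Snowflake
side. A rough sketch of what I mean (event_id here is a hypothetical
unique key from my payload; I have not verified this is the recommended
pattern):

from pyspark.sql import functions as F

def foreach_batch_function(df, epoch_id):
    # Remove duplicates within the micro-batch on a unique key
    # (event_id is a hypothetical column from my payload)
    deduped = df.dropDuplicates(["event_id"])

    # Tag every row with the batch id so a batch replayed after
    # failure recovery can be detected/removed on the Snowflake side
    tagged = deduped.withColumn("batch_id", F.lit(epoch_id))

    tagged.write \
        .format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions) \
        .option("dbtable", snowflake_table) \
        .mode('append') \
        .save()

Would tagging with batch_id like this and then merging/deleting on the
Snowflake side be the right direction, or is there a simpler way to get
exactly-once behaviour here?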


Any suggestion or solution to achieve the above would be helpful.



Eagerly waiting to hear from you.



Thank You,


Regards,

Vedant.
