In SSS writeStream. \ outputMode('append'). \ option("truncate", "false"). \ * foreachBatch(SendToBigQuery). \* option('checkpointLocation', checkpoint_path). \
so this writeStream will call foreachBatch(<function_name>) """ "foreachBatch" performs custom write logic on each micro-batch through SendToBigQuery function foreachBatch(SendToBigQuery) expects 2 parameters, first:* micro-batch as DataFrame or Dataset and second: unique id for each batch* Using foreachBatch, we write each micro batch to storage defined in our custom logic. In this case, we store the output of our streaming application to Google BigQuery table that does this def SendToBigQuery(df, batchId): if(len(df.take(1))) > 0: print(batchId) # do your logic else: print("DataFrame is empty") You should also have it in option('checkpointLocation', checkpoint_path). See this article on mine Processing Change Data Capture with Spark Structured Streaming <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=QuTPDwvXSqWKWsAi7z611Q%3D%3D> HTH Mich Talebzadeh, Solutions Architect/Engineering Lead Palantir Technologies Limited London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Mon, 26 Jun 2023 at 06:01, Anil Dasari <adas...@guidewire.com> wrote: > Hi, > I am using spark 3.3.1 distribution and spark stream in my application. Is > there a way to add a microbatch id to all logs generated by spark and spark > applications ? > > Thanks. >