You can try this

val kafkaReadStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker)
  .option("subscribe", topicName)
  .option("startingOffsets", startingOffsetsMode)
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()
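Both the read side above and the write side below assume the usual imports and a SparkSession. A minimal sketch, with purely illustrative values for the variables (broker, topicName and so on are placeholders, not anything prescriptive):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("KafkaForeachBatchExample")  // illustrative name
  .getOrCreate()

// Illustrative values; substitute your own
val broker = "localhost:9092"
val topicName = "mytopic"
val startingOffsetsMode = "latest"
val maxOffsetsPerTrigger = "100000"
val triggerProcessingTime = 30
val checkpoint_path = "/tmp/checkpoints/mytopic"  // any durable path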
kafkaReadStream
  .writeStream
  .foreachBatch((df: DataFrame, batchId: Long) => sendToSink(df, batchId))
  .trigger(Trigger.ProcessingTime(s"${triggerProcessingTime} seconds"))
  .option("checkpointLocation", checkpoint_path)
  .start()
  .awaitTermination()

Notice the function sendToSink. The foreachBatch method ensures that sendToSink is called for each micro-batch, regardless of whether the DataFrame contains data or not. Let us look at that function:

import org.apache.spark.sql.functions._

def sendToSink(df: DataFrame, batchId: Long): Unit = {
  if (!df.isEmpty) {
    println(s"From sendToSink, batchId is $batchId, at ${java.time.LocalDateTime.now()}")
    // df.show(100, false)
    df.persist()
    // Write to BigQuery batch table
    // s.writeTableToBQ(df, "append", config.getString("MDVariables.targetDataset"), config.getString("MDVariables.targetTable"))
    df.unpersist()
    // println("wrote to DB")
  } else {
    println("DataFrame df is empty")
  }
}

If the DataFrame is empty, the function simply logs that fact and does no sink write; the micro-batch itself still runs. You can of course adapt it for your case (see the sketch after the quoted question below for one way to run logic on every trigger).

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).

On Thu, 21 Mar 2024 at 23:14, Рамик И <ramik...@gmail.com> wrote:

>
> Hi!
> I want to execute code inside forEachBatch that will trigger regardless of
> whether there is data in the batch or not.
>
>
> val kafkaReadStream = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", broker)
>   .option("subscribe", topicName)
>   .option("startingOffsets", startingOffsetsMode)
>   .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
>   .load()
>
>
> kafkaReadStream
>   .writeStream
>   .trigger(Trigger.ProcessingTime(s"$triggerProcessingTime seconds"))
>   .foreachBatch {
>     ....
>   }
>   .start()
>   .awaitTermination()
>
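Coming back to the question quoted above: if some logic must run on every trigger even when the micro-batch is empty, put that work before (or outside) the isEmpty check inside sendToSink. A minimal sketch; runHousekeeping is a hypothetical placeholder for whatever your always-run logic is, not a Spark API:

import org.apache.spark.sql.DataFrame

def sendToSink(df: DataFrame, batchId: Long): Unit = {
  // Runs on every invocation of foreachBatch, with or without data
  runHousekeeping(batchId)  // hypothetical: your always-run logic goes here

  if (!df.isEmpty) {
    df.persist()
    // write the non-empty batch to your sink here
    df.unpersist()
  } else {
    println(s"Batch $batchId is empty")
  }
}

def runHousekeeping(batchId: Long): Unit =
  println(s"Trigger fired for batch $batchId at ${java.time.LocalDateTime.now()}")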