You can try this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val kafkaReadStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker)
  .option("subscribe", topicName)
  .option("startingOffsets", startingOffsetsMode)
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

kafkaReadStream
  .writeStream
  .foreachBatch((df: DataFrame, batchId: Long) => sendToSink(df, batchId))
  .trigger(Trigger.ProcessingTime(s"${triggerProcessingTime} seconds"))
  .option("checkpointLocation", checkpoint_path)
  .start()
  .awaitTermination()

Notice the function sendToSink.

The foreachBatch method ensures that sendToSink is called for each
micro-batch, regardless of whether the DataFrame contains data or not.

Let us look at that function:

import org.apache.spark.sql.functions._

def sendToSink(df: DataFrame, batchId: Long): Unit = {

  if (!df.isEmpty) {
    println(s"From sendToSink, batchId is $batchId, at ${java.time.LocalDateTime.now()}")
    // df.show(100, false)
    // Cache the batch so the write below does not recompute it from Kafka
    df.persist()
    // Write to BigQuery batch table
    // s.writeTableToBQ(df, "append", config.getString("MDVariables.targetDataset"), config.getString("MDVariables.targetTable"))
    df.unpersist()
    // println("wrote to DB")
  } else {
    println("DataFrame df is empty")
  }
}
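
The BigQuery write is commented out above and is specific to my project.
As a generic illustration only, any batch writer can sit in that slot
between persist() and unpersist(); the format and path below are
placeholders, not from my setup:

    // Hypothetical stand-in for the commented-out BigQuery call.
    // "parquet" and the output path are illustrative placeholders.
    df.write
      .mode("append")
      .format("parquet")
      .save("/tmp/illustrative_sink_path")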

If the DataFrame is empty, the function simply prints a message saying
so. You can of course adapt that branch for your own case.
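
If you want some work to run on every trigger whether or not the batch
carries rows, one option is to hoist it above the isEmpty check. A
minimal sketch; the heartbeat println is illustrative, not from my
original code:

def sendToSink(df: DataFrame, batchId: Long): Unit = {
  // This part runs on every micro-batch invocation, empty or not
  println(s"heartbeat: batchId $batchId at ${java.time.LocalDateTime.now()}")

  if (!df.isEmpty) {
    // data-dependent work goes here
    df.persist()
    // ... write to your sink ...
    df.unpersist()
  }
}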

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom




On Thu, 21 Mar 2024 at 23:14, Рамик И <ramik...@gmail.com> wrote:

>
> Hi!
> I want to execute code inside foreachBatch that will trigger regardless
> of whether there is data in the batch or not.
>
>
> val kafkaReadStream = spark
> .readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", broker)
> .option("subscribe", topicName)
> .option("startingOffsets", startingOffsetsMode)
> .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
> .load()
>
>
> kafkaReadStream
> .writeStream
> .trigger(Trigger.ProcessingTime(s"$triggerProcessingTime seconds"))
> .foreachBatch {
>
>     ....
> }
> .start()
> .awaitTermination()
>
