Re:

2024-03-21 Thread Mich Talebzadeh
You can try this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// Read the Kafka topic as a streaming DataFrame
val kafkaReadStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker)
  .option("subscribe", topicName)
  .option("startingOffsets", startingOffsetsMode)
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

// Process each micro-batch with sendToSink and checkpoint progress
kafkaReadStream
  .writeStream
  .foreachBatch((df: DataFrame, batchId: Long) => sendToSink(df, batchId))
  .trigger(Trigger.ProcessingTime(s"${triggerProcessingTime} seconds"))
  .option("checkpointLocation", checkpoint_path)
  .start()
  .awaitTermination()

Notice the function sendToSink.

The foreachBatch method ensures that the sendToSink function is called for
each micro-batch, regardless of whether the DataFrame contains data or not.

Let us look at that function:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def sendToSink(df: DataFrame, batchId: Long): Unit = {

  if (!df.isEmpty) {
    println(s"From sendToSink, batchId is $batchId, at ${java.time.LocalDateTime.now()}")
    // df.show(100, false)
    df.persist()
    // Write to BigQuery batch table
    // s.writeTableToBQ(df, "append",
    //   config.getString("MDVariables.targetDataset"),
    //   config.getString("MDVariables.targetTable"))
    df.unpersist()
    // println("wrote to DB")
  } else {
    println("DataFrame df is empty")
  }
}

If the DataFrame is empty, it simply prints a message saying so. You can of
course adapt it for your case.
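Since your question was about running some logic on every trigger even when the
micro-batch is empty, one option is to keep that logic outside the isEmpty check.
A minimal sketch along those lines; recordHeartbeat is a hypothetical placeholder
for whatever per-trigger work you need, not something from my pipeline:

import org.apache.spark.sql.DataFrame

// Hypothetical placeholder for work that must run on every trigger,
// e.g. emitting a heartbeat metric or updating a bookkeeping table.
def recordHeartbeat(batchId: Long): Unit =
  println(s"Trigger fired for batchId $batchId at ${java.time.LocalDateTime.now()}")

def sendToSink(df: DataFrame, batchId: Long): Unit = {
  // foreachBatch calls this once per micro-batch, so anything placed
  // before the isEmpty check runs whether or not the batch has rows.
  recordHeartbeat(batchId)

  if (!df.isEmpty) {
    df.persist()
    // ... write the non-empty batch to your sink here ...
    df.unpersist()
  }
}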

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice: "one test result is worth one-thousand expert
opinions" (Werner von Braun).


On Thu, 21 Mar 2024 at 23:14, Рамик И  wrote:

>
> Hi!
> I want to execute code inside foreachBatch that will trigger regardless of
> whether there is data in the batch or not.
>
>
> val kafkaReadStream = spark
> .readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", broker)
> .option("subscribe", topicName)
> .option("startingOffsets", startingOffsetsMode)
> .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
> .load()
>
>
> kafkaReadStream
> .writeStream
> .trigger(Trigger.ProcessingTime(s"$triggerProcessingTime seconds"))
> .foreachBatch {
>
> 
> }
> .start()
> .awaitTermination()
>


Bug in org.apache.spark.util.sketch.BloomFilter

2024-03-21 Thread Nathan Conroy
Hi All,

I believe that there is a bug that affects the Spark BloomFilter implementation
when creating a Bloom filter with a large expected number of items (n). Since this
implementation uses integer (32-bit) hash functions, it does not work properly once
the number of bits exceeds Integer.MAX_VALUE.
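
For anyone who wants to reproduce the observation, a rough sketch of the kind of
measurement the Stack Overflow post describes (observed vs. requested false positive
rate) might look like the following. The parameters are illustrative, not from the
original report; with n = 1 billion the filter needs a couple of GB of heap and the
loops take a while to run:

import org.apache.spark.util.sketch.BloomFilter

object BloomFilterFppCheck {
  def main(args: Array[String]): Unit = {
    // Illustrative parameters: at this n and fpp the optimal bit array is
    // wider than Integer.MAX_VALUE bits, the regime described above.
    val n   = 1000000000L   // expected number of items
    val fpp = 0.01          // requested false positive probability

    val filter = BloomFilter.create(n, fpp)
    println(s"bitSize = ${filter.bitSize()}, Int.MaxValue = ${Int.MaxValue}")

    // Insert n distinct keys ...
    var i = 0L
    while (i < n) { filter.putLong(i); i += 1 }

    // ... then probe n keys that were never inserted and count how many
    // the filter wrongly claims to contain.
    var falsePositives = 0L
    var j = n
    while (j < 2 * n) {
      if (filter.mightContainLong(j)) falsePositives += 1
      j += 1
    }
    println(s"observed fpp = ${falsePositives.toDouble / n} (requested $fpp)")
  }
}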

I asked a question about this on stackoverflow, but didn’t get a satisfactory 
answer. I believe I know what is causing the bug and have documented my 
reasoning there as well:

https://stackoverflow.com/questions/78162973/why-is-observed-false-positive-rate-in-spark-bloom-filter-higher-than-expected

I would just go ahead and create a Jira ticket on the Spark Jira board, but I'm
still waiting to hear back about getting my account set up.

Huge thanks if anyone can help!

-N


[no subject]

2024-03-21 Thread Рамик И
Hi!
I want to execute code inside foreachBatch that will trigger regardless of
whether there is data in the batch or not.


val kafkaReadStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker)
  .option("subscribe", topicName)
  .option("startingOffsets", startingOffsetsMode)
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()


kafkaReadStream
  .writeStream
  .trigger(Trigger.ProcessingTime(s"$triggerProcessingTime seconds"))
  .foreachBatch {

  }
  .start()
  .awaitTermination()