
2024-03-21 Thread Mich Talebzadeh
You can try this

val kafkaReadStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker)
  .option("subscribe", topicName)
  .option("startingOffsets", startingOffsetsMode)
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

kafkaReadStream
  .writeStream
  .foreachBatch((df: DataFrame, batchId: Long) => sendToSink(df, batchId))
  .trigger(Trigger.ProcessingTime(s"${triggerProcessingTime} seconds"))
  .option("checkpointLocation", checkpoint_path)
  .start()
  .awaitTermination()

Notice the function sendToSink

The foreachBatch method ensures that the sendToSink function is called for
each micro-batch, regardless of whether the DataFrame contains data or not.

Let us look at that function

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def sendToSink(df: DataFrame, batchId: Long): Unit = {
  if (!df.isEmpty) {
    println(s"From sendToSink, batchId is $batchId")
    // ..., false)
    // Write to BigQuery batch table
    // s.writeTableToBQ(df, "append", ...)
    // println("wrote to DB")
  } else {
    println("DataFrame df is empty")
  }
}

If the DataFrame is empty, the function only prints a message saying so.
You can of course adapt it for your case.
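
As a rough sketch of how the function could be adapted so that some code runs
on every call, empty or not: the unconditional work goes before the emptiness
check. The object name EveryBatchSketch and the helper describe are made up for
illustration, and static DataFrames stand in for micro-batches so no Kafka
broker is needed. Whether Spark schedules a micro-batch at all when the source
has no new offsets is decided by the engine and is not shown here.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object EveryBatchSketch {
  // Hypothetical helper: summarises a batch so the branch taken is easy to check.
  def describe(df: DataFrame, batchId: Long): String =
    if (df.isEmpty) s"batch $batchId: empty"
    else s"batch $batchId: ${df.count()} rows"

  // Same signature as sendToSink above, so it can be passed to foreachBatch.
  def sendToSink(df: DataFrame, batchId: Long): Unit = {
    // Runs on every call, before the emptiness check.
    println(s"batch $batchId: housekeeping")
    // A real sink write would replace this println in the non-empty case.
    println(describe(df, batchId))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("every-batch-sketch")
      .getOrCreate()
    import spark.implicits._

    val full  = Seq("a", "b", "c").toDF("value") // stands in for a batch with data
    val empty = full.limit(0)                    // stands in for an empty batch

    sendToSink(full, 0L)
    sendToSink(empty, 1L)
    spark.stop()
  }
}
```

In the streaming job it would be wired in the same way as before, for example
.foreachBatch((df: DataFrame, batchId: Long) => EveryBatchSketch.sendToSink(df, batchId)).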


Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
United Kingdom

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun).

On Thu, 21 Mar 2024 at 23:14, Рамик И wrote:

> Hi!
> I want to execute code inside foreachBatch that will trigger regardless of
> whether there is data in the batch or not.
> val kafkaReadStream = spark
> .readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", broker)
> .option("subscribe", topicName)
> .option("startingOffsets", startingOffsetsMode)
> .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
> .load()
> kafkaReadStream
> .writeStream
> .trigger(Trigger.ProcessingTime(s"$triggerProcessingTime seconds"))
> .foreachBatch {
> }
> .start()
> .awaitTermination()

Bug in org.apache.spark.util.sketch.BloomFilter

2024-03-21 Thread Nathan Conroy
Hi All,

I believe there is a bug in the Spark BloomFilter implementation that shows up
when creating a Bloom filter with large n. Since this implementation uses
integer (32-bit) hash functions, it doesn't work properly when the number of
bits exceeds MAX_INT (Integer.MAX_VALUE, 2^31 - 1).
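
To illustrate the kind of failure described above, here is a minimal sketch in
plain Scala. It is not Spark's code: MurmurHash3 from the Scala standard
library stands in for the hash, and the names IntHashIndexSketch and bitIndex
are made up. It assumes the bit index is derived from a combined 32-bit hash
reduced modulo the bit count, which is one way an integer hash can limit the
usable range.

```scala
import scala.util.hashing.MurmurHash3

object IntHashIndexSketch {
  // Hypothetical index computation: two 32-bit hashes are combined in Int
  // arithmetic and only then reduced modulo the (Long) number of bits.
  def bitIndex(item: Long, i: Int, bitSize: Long): Long = {
    val h1 = MurmurHash3.stringHash(item.toString, 0)
    val h2 = MurmurHash3.stringHash(item.toString, h1)
    var combined = h1 + i * h2               // Int arithmetic, wraps on overflow
    if (combined < 0) combined = ~combined   // fold negatives into [0, Int.MaxValue]
    combined % bitSize                       // widened to Long too late to help
  }

  def main(args: Array[String]): Unit = {
    val bitSize = 1L << 33                   // 2^33 bits, well beyond Int.MaxValue
    val maxIndex = (0L until 100000L)
      .flatMap(item => (1 to 5).map(i => bitIndex(item, i, bitSize)))
      .max
    // The combined hash is an Int, so no index can exceed Int.MaxValue
    // however large bitSize is.
    println(maxIndex <= Int.MaxValue)        // prints true
  }
}
```

With 2^33 bits, only the first quarter of the bit array is reachable in this
sketch, so the false-positive rate would be much higher than the size suggests.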

I asked a question about this on Stack Overflow, but didn't get a satisfactory
answer. I believe I know what is causing the bug and have documented my
reasoning there as well:

I would just go ahead and create a Jira ticket on the Spark Jira board, but I'm
still waiting to hear back regarding getting my account set up.

Huge thanks if anyone can help!


[no subject]

2024-03-21 Thread Рамик И
I want to execute code inside foreachBatch that will trigger regardless of
whether there is data in the batch or not.

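val kafkaReadStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker)
  .option("subscribe", topicName)
  .option("startingOffsets", startingOffsetsMode)
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

kafkaReadStream
  .writeStream
  .trigger(Trigger.ProcessingTime(s"$triggerProcessingTime seconds"))
  .foreachBatch {
    // code that should run for every batch, with or without data
  }
  .start()
  .awaitTermination()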
