Hello All,

I have a Structured Streaming program that reads data from a Kafka topic,
does some processing, and finally writes the data to a target Kafka topic.

Note: the processing is done in the function convertToDictForEachBatch(),
which is invoked via foreachBatch(convertToDictForEachBatch).

As part of the processing, it reads another Kafka topic (events_topic), and
if there are new records since the last read, it does some additional
processing: it reloads data from a BigQuery table and persists it.

Here is the code:

```

df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()

print(" df_stream -> ", df_stream)
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp").writeStream \
    .outputMode("append") \
    .trigger(processingTime='4 minutes') \
    .option("numRows", 10000) \
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()

query.awaitTermination()

```

```
# called from foreachBatch
def convertToDictForEachBatch(df, batchId):

    # checks for events in the topic events_topic; further processing
    # takes place only if there is new data in the topic
    events = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location", ssl_keystore_location_reloadpred) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
        .option("subscribe", topic_reloadpred) \
        .option("kafka.group.id", consumerGroupId_reloadpred) \
        .load()

    # events is passed to a function, and processing is done if new
    # events are generated
```

What is the best way to achieve this? The current code reads all of the
data in the Kafka topic; I need it to read only the new data.
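
One possible direction, given here only as a minimal sketch: the Kafka batch
source accepts a per-partition JSON value for the "startingOffsets" option
(with -2 meaning "earliest"), so the batch read inside
convertToDictForEachBatch() could start from the offsets seen on the previous
call. The helper names build_starting_offsets and advance_offsets, the
num_partitions argument, and the in-memory next_offsets mapping are all
assumptions made for illustration; they are not part of the code above, and
the mapping would need to be stored somewhere durable to survive restarts.

```python
import json


def build_starting_offsets(topic, num_partitions, next_offsets):
    """Build the JSON string for the Kafka source's "startingOffsets" option.

    next_offsets maps partition id -> next offset to read (hypothetical state).
    Every partition is listed; partitions not seen yet use -2 ("earliest").
    """
    per_partition = {
        str(p): next_offsets.get(p, -2) for p in range(num_partitions)
    }
    return json.dumps({topic: per_partition})


def advance_offsets(next_offsets, rows):
    """Return a new mapping updated from (partition, offset) pairs just read."""
    updated = dict(next_offsets)
    for partition, offset in rows:
        # The next read should begin one past the highest offset seen so far.
        updated[partition] = max(updated.get(partition, 0), offset + 1)
    return updated


# Intended use inside convertToDictForEachBatch (sketch, not executed here):
#   events = spark.read.format('kafka') \
#       ...same connection options as above... \
#       .option("startingOffsets",
#               build_starting_offsets(topic_reloadpred, n, next_offsets)) \
#       .load()
#   rows = [(r.partition, r.offset)
#           for r in events.select("partition", "offset").collect()]
#   next_offsets = advance_offsets(next_offsets, rows)
```

With this approach, an empty result from the read would mean that no new
events have arrived since the previous micro-batch, so the BigQuery reload
could be skipped.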

Additional details are on Stack Overflow:

https://stackoverflow.com/questions/71446023/structuredstreaming-processing-data-based-on-new-events-in-kafka-topic


tia!
