Hello All, I have a Structured Streaming program that reads data from a Kafka topic, does some processing, and finally writes the data to a target Kafka topic.
Note: the processing is done in the function convertToDictForEachBatch(), which is wired in via foreachBatch(convertToDictForEachBatch). As part of that processing, it reads another Kafka topic (events_topic), and if there are new records since the last read, it does some additional processing: it reloads data from a BigQuery table and persists it.

Here is the code:

```
df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()

print(" df_stream -> ", df_stream)

query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp").writeStream \
    .outputMode("append") \
    .trigger(processingTime='4 minutes') \
    .option("numRows", 10000) \
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()

query.awaitTermination()
```

```
# called from foreachBatch
def convertToDictForEachBatch(df, batchId):
    # check for events in events_topic; further processing takes place
    # only if there is new data in the topic
    events = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location", ssl_keystore_location_reloadpred) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
        .option("subscribe", topic_reloadpred) \
        .option("kafka.group.id", consumerGroupId_reloadpred) \
        .load()

    # events is passed to a function, and processing is done only if
    # new events were generated
```

What is the best way to achieve this? The read of events_topic inside convertToDictForEachBatch() is a batch read (spark.read), and a batch Kafka read defaults to startingOffsets = "earliest" and endingOffsets = "latest", so the current code re-reads the entire topic on every micro-batch. As far as I understand, the kafka.group.id option doesn't help here either, since Spark does not use Kafka's committed consumer-group offsets to decide where to start reading. I need it to read only the data that arrived since the last read.

Additional details on Stack Overflow: https://stackoverflow.com/questions/71446023/structuredstreaming-processing-data-based-on-new-events-in-kafka-topic

TIA!
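One idea I'm considering (just an untested sketch; the helper names read_new_events, save_next_offsets, and OFFSETS_PATH are made up for illustration, and in my setup the offsets file would live on GCS rather than local disk): track the last-read offsets myself and bound the batch read with the startingOffsets / endingOffsets JSON options that the Kafka source supports for batch queries:

```
import json
import os

from pyspark.sql import functions as F

# Hypothetical location for the saved offsets (local path just for the sketch).
OFFSETS_PATH = "/tmp/events_topic_offsets.json"

def load_last_offsets():
    # Returns the saved offsets JSON string, or None on the first run.
    if os.path.exists(OFFSETS_PATH):
        with open(OFFSETS_PATH) as f:
            return f.read()
    return None

def save_next_offsets(events, topic):
    # Next start offset per partition = highest consumed offset + 1.
    # The Kafka source exposes 'partition' and 'offset' columns.
    rows = events.groupBy("partition") \
        .agg(F.max("offset").alias("max_offset")) \
        .collect()
    offsets = {topic: {str(r["partition"]): r["max_offset"] + 1 for r in rows}}
    with open(OFFSETS_PATH, "w") as f:
        json.dump(offsets, f)

def read_new_events(topic):
    # Batch read bounded to [last saved offsets, latest]; falls back to
    # "earliest" on the very first run. SSL options omitted for brevity.
    start = load_last_offsets() or "earliest"
    events = spark.read.format("kafka") \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("subscribe", topic) \
        .option("startingOffsets", start) \
        .option("endingOffsets", "latest") \
        .load()
    if not events.rdd.isEmpty():
        save_next_offsets(events, topic)
    return events
```

In convertToDictForEachBatch() I would then just call events = read_new_events(topic_reloadpred). Since foreachBatch runs on the driver, writing the offsets file from there should be safe; what I'm unsure about is handling partitions added to events_topic after offsets have been saved. If there is a more idiomatic approach (e.g. a second streaming query on events_topic with its own checkpoint), I'd prefer that.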