How do you check if new data is in the topic and what happens if not?

On Sat, 12 Mar 2022 at 00:40, karan alang <karan.al...@gmail.com> wrote:

> Hello All,
>
> I have a structured Streaming program, which reads data from Kafka topic,
> and does some processing, and finally puts data into target Kafka Topic.
>
> Note : the processing is donee topic in function -
> convertToDictForEachBatch(), which is called using -
> foreachBatch(convertToDictForEachBatcha is in th)
>
> As part of the processing, it reads another Kafka Topic (events_topic),
> and if there is New record(s) after the last read, it does some additional
> processing - reloads data from BigQuery table, and persists it.
>
> Here is the code :
>
> ```
>
> df_stream = spark.readStream.format('kafka') \
>         .option("kafka.security.protocol", "SSL") \
>         .option("kafka.ssl.truststore.location", ssl_truststore_location) \
>         .option("kafka.ssl.truststore.password", ssl_truststore_password) \
>         .option("kafka.ssl.keystore.location", ssl_keystore_location) \
>         .option("kafka.ssl.keystore.password", ssl_keystore_password) \
>         .option("kafka.bootstrap.servers",kafkaBrokers)\
>         .option("subscribe", topic) \
>         .option("kafka.group.id", consumerGroupId)\
>         .option("startingOffsets", "latest") \
>         .option("failOnDataLoss", "false") \
>         .option("maxOffsetsPerTrigger", 10000) \
>         .load()
>
>
>     print(" df_stream -> ", df_stream)
>     query = df_stream.selectExpr("CAST(value AS STRING)", 
> "timestamp").writeStream \
>         .outputMode("append") \
>         .trigger(processingTime='4 minutes') \
>         .option("numRows",10000)\
>         .option("truncate", "false") \
>         .option("checkpointLocation", checkpoint) \
>         .foreachBatch(convertToDictForEachBatch) \
>         .start()
>
>     query.awaitTermination()
>
> ```
>
> # called from - foreachbatch
> def convertToDictForEachBatch(df, batchId):
>
>     # checks for event in topic - events_topic and further processing takes 
> place if there is new data in the topic
>     events = spark.read.format('kafka') \
>         .option("kafka.bootstrap.servers", kafkaBrokers) \
>         .option("kafka.security.protocol", "SSL") \
>         .option("kafka.ssl.truststore.location", ssl_truststore_location) \
>         .option("kafka.ssl.truststore.password", ssl_truststore_password) \
>         .option("kafka.ssl.keystore.location", 
> ssl_keystore_location_reloadpred) \
>         .option("kafka.ssl.keystore.password", 
> ssl_keystore_password_reloadpred) \
>         .option("subscribe", topic_reloadpred) \
>         .option("kafka.group.id", consumerGroupId_reloadpred) \
>         .load()
>
>     # events is passed to a function, and processing is done if new events 
> are generated
>
> ```
>
> What is the best way to achieve this ? The current code is reading the
> entire data in the kafka topic, i need it to read only the new data.
>
> Additional Details in stackoverflow :
>
>
> https://stackoverflow.com/questions/71446023/structuredstreaming-processing-data-based-on-new-events-in-kafka-topic
>
>
> tia!
>
-- 



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Reply via email to