Re: StructuredStreaming - processing data based on new events in Kafka topic

2022-03-13 Thread karan alang
> On Sat, 12 Mar 2022 at 09:33, Mich Talebzadeh wrote:
>
>> How do you check if new data is in the topic and what happens if not?
>>


Re: StructuredStreaming - processing data based on new events in Kafka topic

2022-03-12 Thread Mich Talebzadeh


Re: StructuredStreaming - processing data based on new events in Kafka topic

2022-03-12 Thread Mich Talebzadeh
How do you check if new data is in the topic and what happens if not?



StructuredStreaming - processing data based on new events in Kafka topic

2022-03-11 Thread karan alang
Hello All,

I have a Structured Streaming program that reads data from a Kafka topic, does some processing, and finally writes the data to a target Kafka topic.

Note: the processing is done in the function convertToDictForEachBatch(), which is invoked via foreachBatch(convertToDictForEachBatch).

As part of the processing, it also reads another Kafka topic (events_topic); if there are new records since the last read, it does some additional processing: it reloads data from a BigQuery table and persists it.

Here is the code:

```

df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", 1) \
    .load()

print(" df_stream -> ", df_stream)

query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp").writeStream \
    .outputMode("append") \
    .trigger(processingTime='4 minutes') \
    .option("numRows", 1) \
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()

query.awaitTermination()

```

```
# called from foreachBatch
def convertToDictForEachBatch(df, batchId):

    # check the events topic (topic_reloadpred); further processing takes
    # place only if there is new data in that topic
    events = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location", ssl_keystore_location_reloadpred) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
        .option("subscribe", topic_reloadpred) \
        .option("kafka.group.id", consumerGroupId_reloadpred) \
        .load()

    # events is passed to a function, and processing is done if new events
    # are generated

```
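
The reload step itself is elided above ("events is passed to a function ..."). Purely for illustration, here is a minimal sketch of how that check-and-reload could be wired up, assuming the spark-bigquery connector is available; check_and_reload and bq_table_name are hypothetical names, not part of the original code:

```
def check_and_reload(events, spark, bq_table_name):
    """Sketch only: reload the BigQuery table when the events read returned rows.

    Assumes the spark-bigquery connector is on the classpath and that `events`
    is the DataFrame produced by the batch Kafka read above.
    """
    # limit(1).count() avoids scanning the whole DataFrame just to test emptiness
    if events.limit(1).count() == 0:
        return None     # no new events -> keep using the previously cached data

    refreshed = spark.read.format("bigquery") \
        .option("table", bq_table_name) \
        .load()
    return refreshed.persist()
```

Note that the emptiness check is only meaningful once the batch read is restricted to offsets that have not been read yet, which is exactly the issue raised below.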

What is the best way to achieve this? The current code reads the entire Kafka topic on every batch; I need it to read only the new data.
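
One possible direction, shown only as a sketch: for batch reads, the Kafka source's startingOffsets and endingOffsets options accept a per-partition JSON offset specification, so the highest offset seen in the previous micro-batch can be carried forward and the next read limited to anything newer. The read_new_events helper and the in-memory next_offsets dict below are hypothetical (they reuse kafkaBrokers and topic_reloadpred from the code above and omit the SSL options for brevity); a durable store for the offsets would be needed across restarts:

```
import json

from pyspark.sql import functions as F

# Sketch only: next offset to read, per partition, e.g. {0: 42, 1: 17}.
# Partitions that have never had data will not appear here.
next_offsets = {}

def read_new_events(spark):
    # For batch reads, startingOffsets accepts a JSON spec such as
    # '{"events_topic": {"0": 42, "1": 17}}'. First call starts from
    # "earliest" so every partition with data gets an entry.
    if next_offsets:
        starting = json.dumps(
            {topic_reloadpred: {str(p): o for p, o in next_offsets.items()}})
    else:
        starting = "earliest"

    events = spark.read.format("kafka") \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("subscribe", topic_reloadpred) \
        .option("startingOffsets", starting) \
        .option("endingOffsets", "latest") \
        .load()  # SSL options omitted for brevity

    # Remember the highest offset seen per partition; the next call starts just after it.
    for row in events.groupBy("partition").agg(F.max("offset").alias("max_off")).collect():
        next_offsets[row["partition"]] = row["max_off"] + 1

    return events
```

The streaming checkpoint only tracks offsets for the streaming source itself, so a batch read inside foreachBatch has to manage its own offsets along these lines.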

Additional details on Stack Overflow:

https://stackoverflow.com/questions/71446023/structuredstreaming-processing-data-based-on-new-events-in-kafka-topic


tia!