Hello All,
I have a Structured Streaming program running on GCP Dataproc which reads
data from Kafka every 10 minutes and then processes it.
This is a multi-tenant system, i.e. the program reads data for multiple
customers.
In my current code, I'm looping over the customers and passing each one to
three classes - P1, P2, P3 - where the bulk of the processing happens, after
which the data is pushed back to Kafka.
```
def convertToDictForEachBatch(df, batchId):
    # code to convert syslog to the required format - not included here
    # since it is not relevant to the issue
    for cust in hm.values():
        # tdict_ap - has data specific to P1; filter code is not shown
        p1 = P1(tdict_ap, spark, False, cust)
        # tdict_ap - has data specific to P2; filter code is not shown
        p2 = P2(tdict_ap, spark, False, cust)
        # tdict_ap - has data specific to P3; filter code is not shown
        p3 = P3(tdict_ap, spark, False, cust)

# df_stream = data read from Kafka; foreachBatch calls convertToDictForEachBatch
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic") \
    .writeStream \
    .outputMode("append") \
    .trigger(processingTime='10 minutes') \
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()
```
In the above code, the processing is sequential; I would like to make it
concurrent/asynchronous to the extent possible.
A couple of options I'm considering:
1. Using asyncio
- from what I understand, this might improve performance, since for each
customer it could allow the three classes to be processed asynchronously
(see the sketch after this list)
2. Using windowing to partition the dataframe by 'customer' + groupBy
- this should allow concurrency across the multiple customers if there are
sufficient executors
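For option 1, here is a minimal sketch of what the batch function could look like, assuming Python 3.9+ (for asyncio.to_thread) and assuming, as in the current code, that instantiating P1/P2/P3 is what kicks off the processing. Note that asyncio by itself won't parallelize blocking Spark calls; the concurrency here comes from offloading each call to a thread, which works because Spark accepts job submissions from multiple driver threads:
```
import asyncio

async def process_customer(cust):
    # Offload each blocking class to its own thread so the three
    # Spark jobs can overlap instead of running back to back.
    await asyncio.gather(
        asyncio.to_thread(P1, tdict_ap, spark, False, cust),
        asyncio.to_thread(P2, tdict_ap, spark, False, cust),
        asyncio.to_thread(P3, tdict_ap, spark, False, cust),
    )

def convertToDictForEachBatch(df, batchId):
    # ... syslog conversion and per-customer filtering as before ...
    async def run_all():
        # Fan out across customers as well as across the three classes
        await asyncio.gather(*(process_customer(c) for c in hm.values()))
    asyncio.run(run_all())
```
On older Pythons, a plain concurrent.futures.ThreadPoolExecutor achieves the same effect without the asyncio layer.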
For option 2, here is sample code; I think this would need to be done in
each of the three classes P1, P2, P3:
```
window = Window.partitionBy('cust')  # note: defined but not used below
all_DF = all_DF.repartition('applianceName', 'cktName').cache()
results = (
    all_DF
    .groupBy('cust')
    .apply(<function>)  # grouped-map Pandas UDF; function not shown
)
```
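For reference, here is a minimal sketch of the same grouped-map pattern with the newer applyInPandas API (PySpark 3.x); the output schema and per-group function are hypothetical placeholders. One caveat: the function runs on the executors, so it cannot use the spark session, which means the per-customer logic from P1/P2/P3 would need to be expressible in plain pandas:
```
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical output schema - replace with the columns your
# processing actually produces.
out_schema = StructType([
    StructField("cust", StringType()),
    StructField("record_count", LongType()),
])

def process_customer_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # Called once per customer group, in parallel across executors.
    return pd.DataFrame({"cust": [pdf["cust"].iloc[0]],
                         "record_count": [len(pdf)]})

results = all_DF.groupBy("cust").applyInPandas(process_customer_group, schema=out_schema)
```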
Here is the stackoverflow link with the details :
https://stackoverflow.com/questions/72636814/spark-structured-streamingbatch-mode-running-dependent-jobs-concurrently
Please advise on the best way to achieve this.
tia!