Hello all, I have a Structured Streaming program running on GCP Dataproc which reads data from Kafka every 10 minutes and then does processing. This is a multi-tenant system, i.e. the program reads data from multiple customers.
In my current code, I'm looping over the customers and passing the data to three classes, P1, P2 and P3, where the bulk of the processing happens before the data is pushed back to Kafka:

```
def convertToDictForEachBatch(df, batchId):
    # code to change syslog to the required format - not included here
    # since it is not relevant to the issue
    for cust in hm.values():
        # tdict_ap - has data specific to P1, filter code is not shown
        p1 = P1(tdict_ap, spark, False, cust)
        # tdict_ap - has data specific to P2, filter code is not shown
        p2 = P2(tdict_ap, spark, False, cust)
        # tdict_ap - has data specific to P3, filter code is not shown
        p3 = P3(tdict_ap, spark, False, cust)

# df_stream = data read from Kafka; this calls convertToDictForEachBatch
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic") \
    .writeStream \
    .outputMode("append") \
    .trigger(processingTime='10 minutes') \
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()
```

In the above code the processing is sequential. I would like to make it concurrent/asynchronous to the extent possible. A couple of options I'm considering:

1. Use asyncio. From what I understand, this might improve performance since, for each customer, it could allow the three classes to run asynchronously.
2. Use windowing to partition the DataFrame by customer, plus a groupBy. This should allow concurrency across the multiple customers if there are sufficient executors. Here is the sample code for this; I think it would need to be done in each of the three classes P1, P2, P3:

```
window = Window.partitionBy('cust')
all_DF = all_DF.repartition('applianceName', 'cktName').cache()
results = (
    all_DF
    .groupBy('cust')
    .apply(<function>)
)
```

Here is the Stack Overflow post with the details: https://stackoverflow.com/questions/72636814/spark-structured-streamingbatch-mode-running-dependent-jobs-concurrently

Please advise on the best way to achieve this. TIA!
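On option 1: since the work inside P1/P2/P3 is blocking Spark driver calls rather than awaitable coroutines, plain asyncio would not overlap them without extra plumbing; a `concurrent.futures.ThreadPoolExecutor` inside the `foreachBatch` function is a simpler fit, because the driver threads mostly wait on the JVM. Below is a minimal, self-contained sketch of that pattern. The `Processor` class and the customer names are hypothetical stand-ins for the question's P1/P2/P3 and `hm.values()`, with a `sleep` standing in for a blocking Spark job:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# Hypothetical stand-in for the question's P1/P2/P3 classes: each "run"
# simulates a blocking Spark action and reports what it processed.
class Processor:
    def __init__(self, name):
        self.name = name

    def run(self, cust):
        time.sleep(0.05)           # stands in for a blocking Spark job
        return (self.name, cust)

PROCESSORS = [Processor(n) for n in ("P1", "P2", "P3")]

def process_customer(cust):
    # The three processors for one customer still run sequentially here
    # (they may depend on each other); customers run concurrently.
    return [p.run(cust) for p in PROCESSORS]

def process_batch(customers, max_workers=4):
    # This is the shape convertToDictForEachBatch could take: submit one
    # task per customer instead of looping sequentially.
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(process_customer, c): c for c in customers}
        for fut in as_completed(futures):
            results[futures[fut]] = fut.result()  # re-raises any failure
    return results

out = process_batch(["custA", "custB", "custC"])
print(sorted(out))  # → ['custA', 'custB', 'custC']
```

With real Spark jobs you would also want to `df.persist()` before submitting (the batch DataFrame is reused by every customer) and `df.unpersist()` after all futures complete, and to size `max_workers` against your cluster's scheduler capacity.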
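On option 2: in current PySpark the per-group pattern is `groupBy(...).applyInPandas(fn, schema)` (the older `.apply` with a pandas GROUPED_MAP UDF is deprecated), and the `Window` definition in the snippet is not needed for it. The sketch below shows the shape of the per-group function and checks the same logic locally with plain pandas, since `applyInPandas` hands each group to the function as a pandas DataFrame. The column names `cust` and `value` and the aggregation are assumptions for illustration:

```python
import pandas as pd

# Per-group function in the shape Spark's applyInPandas expects: it
# receives one customer's rows as a pandas DataFrame and returns a
# DataFrame. Columns 'cust'/'value' are assumed for illustration.
def per_customer(pdf: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({
        "cust":  [pdf["cust"].iloc[0]],
        "total": [pdf["value"].sum()],
    })

# On Spark this would be:
#   results = all_DF.groupBy("cust").applyInPandas(
#       per_customer, "cust string, total long")
# Local check of the same logic with plain pandas:
df = pd.DataFrame({"cust": ["a", "a", "b"], "value": [1, 2, 10]})
out = (df.groupby("cust", group_keys=False)
         .apply(per_customer)
         .reset_index(drop=True))
print(out)
```

Note that `applyInPandas` parallelizes across groups (customers) on the executors, so it addresses per-customer concurrency; it does not by itself make P1, P2 and P3 run concurrently for one customer.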