I have a Structured Streaming program running on GCP Dataproc which reads
data from Kafka every 10 minutes and then processes it.
This is a multi-tenant system, i.e. the program reads data from multiple
customers.

In my current code, I'm looping over the customers and passing each one to
the 3 programs: P1, P2, P3.
P1, P2, P3 are classes where the bulk of the processing happens, and the
data is pushed back to Kafka.


def convertToDictForEachBatch(df, batchId):

    # code to change syslog to required format - this is not included here
    # since it is not relevant to the issue

    for cust in hm.values():

        # tdict_ap - has data specific to P1, filter code is not shown
        p1 = P1(tdict_ap, spark, False, cust)

        # tdict_ap - has data specific to P2, filter code is not shown
        p2 = P2(tdict_ap, spark, False, cust)

        # tdict_ap - has data specific to P3, filter code is not shown
        p3 = P3(tdict_ap, spark, False, cust)

# df_stream = data read from Kafka; this calls the function above for
# each micro-batch

query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic") \
         .writeStream \
         .outputMode("append") \
         .trigger(processingTime='10 minutes') \
         .option("truncate", "false") \
         .option("checkpointLocation", checkpoint) \
         .foreachBatch(convertToDictForEachBatch) \
         .start()

In the above code, the processing is sequential. I would like to make the
processing concurrent/asynchronous to the extent possible.

A couple of options I'm considering:

1. Using asyncio
- From what I understand, this might improve performance, since for each
customer it might allow the processing in the 3 classes to run
asynchronously.

2. Use windowing to partition the dataframe by 'customer' + groupBy
- This should allow concurrency across the multiple customers if there are
sufficient executors.
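
For option 1, a minimal sketch of what the asyncio approach might look like.
asyncio on its own does not make blocking calls run concurrently, so each
blocking call is handed to a worker thread via asyncio.to_thread (Python
3.9+). StubProcessor, its run() method, and process_batch are hypothetical
stand-ins, not the real P1/P2/P3: the sketch assumes each class exposes a
blocking method that does its work, and that the three classes do not depend
on each other's output.

```python
import asyncio
import time


class StubProcessor:
    """Hypothetical stand-in for P1/P2/P3; run() is an assumed method name."""

    def __init__(self, name, cust):
        self.name = name
        self.cust = cust

    def run(self):
        time.sleep(0.1)  # placeholder for the blocking Spark/Kafka work
        return f"{self.name}:{self.cust}"


async def process_customer(cust):
    # Each blocking run() call is moved to a worker thread, so the three
    # processors for this customer can overlap instead of running in turn.
    processors = [StubProcessor(name, cust) for name in ("P1", "P2", "P3")]
    return await asyncio.gather(*(asyncio.to_thread(p.run) for p in processors))


async def process_all(customers):
    # Customers are fanned out concurrently as well; gather keeps input order.
    return await asyncio.gather(*(process_customer(c) for c in customers))


def process_batch(customers):
    # Synchronous entry point, e.g. something a foreachBatch function could call.
    return asyncio.run(process_all(customers))
```

In the real program, process_batch(hm.values()) would take the place of the
for loop inside convertToDictForEachBatch. Whether this helps in practice
depends on how much of the time in P1/P2/P3 is spent waiting on Spark jobs
or Kafka rather than in driver-side Python code.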

Here is the sample code for the windowing option (option 2).
I think this might need to be done in each of the 3 classes P1, P2, P3.


window = Window.partitionBy('cust')

all_DF = all_DF.repartition('applianceName', 'cktName').cache()
results = (


