Hello All,

I have a Structured Streaming program running on GCP Dataproc that reads
data from Kafka every 10 minutes and then processes it.
This is a multi-tenant system, i.e. the program reads data for multiple
customers.

In my current code, I'm looping over the customers and passing the data to
3 classes - P1, P2, P3 - where the bulk of the processing happens before the
results are pushed back to Kafka.

```

def convertToDictForEachBatch(df, batchId):

    # code to change syslog to the required format - not included here
    # since it is not relevant to the issue

    # loop over the customers
    for cust in hm.values():

        # tdict_ap - has data specific to P1, filter code is not shown
        p1 = P1(tdict_ap, spark, False, cust)

        # tdict_ap - has data specific to P2, filter code is not shown
        p2 = P2(tdict_ap, spark, False, cust)

        # tdict_ap - has data specific to P3, filter code is not shown
        p3 = P3(tdict_ap, spark, False, cust)


# df_stream = data read from Kafka; foreachBatch calls convertToDictForEachBatch
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic") \
    .writeStream \
    .outputMode("append") \
    .trigger(processingTime='10 minutes') \
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()

```
In the above code the processing is sequential. I would like to make it
concurrent/asynchronous to the extent possible.

A couple of options I'm considering:

1. Using asyncio
- from what I understand, this might improve performance since, for each
customer, it would allow the 3 classes to run in an asynchronous fashion
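
A rough sketch of what I have in mind, assuming Python 3.9+ (for
asyncio.to_thread; on older versions loop.run_in_executor does the same)
and assuming P1/P2/P3 do their work in the constructor, as above:

```

import asyncio

async def process_customer(tdict_ap, spark, cust):
    # P1/P2/P3 block on Spark calls, so run each in a worker thread;
    # Spark can schedule jobs submitted from multiple threads
    await asyncio.gather(
        asyncio.to_thread(P1, tdict_ap, spark, False, cust),
        asyncio.to_thread(P2, tdict_ap, spark, False, cust),
        asyncio.to_thread(P3, tdict_ap, spark, False, cust),
    )

def convertToDictForEachBatch(df, batchId):
    # tdict_ap - per-customer filtered data, filter code not shown
    async def run_all():
        await asyncio.gather(*(process_customer(tdict_ap, spark, cust)
                               for cust in hm.values()))
    asyncio.run(run_all())

```

Since P1/P2/P3 are blocking calls, the concurrency here comes from
threads, so a plain concurrent.futures.ThreadPoolExecutor would give the
same effect with less machinery.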

2. Using windowing to partition the dataframe by 'customer' + groupBy
- this should allow concurrency across the multiple customers if there are
sufficient executors

Here is the sample code for this;
I think this might need to be done in each of the 3 classes P1, P2, P3

```

window = Window.partitionBy('cust')

all_DF = all_DF.repartition('applianceName', 'cktName').cache()
results = (
    all_DF
    .groupBy('cust')
    # Spark 3.x grouped-map API; on older versions this would be a
    # GROUPED_MAP pandas_udf passed to .apply()
    .applyInPandas(<function>, schema=<result schema>)
)

```
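
To make that concrete, a minimal sketch of the grouped-map function,
assuming Spark 3.x (process_one_customer is a hypothetical stand-in for
the real per-customer logic):

```

import pandas as pd

def process_one_customer(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf contains all rows for a single 'cust' value; the per-customer
    # logic (what P1/P2/P3 do today) would go here
    return pdf  # placeholder: return the rows unchanged

results = (
    all_DF
    .groupBy('cust')
    .applyInPandas(process_one_customer, schema=all_DF.schema)
)

```

One caveat: applyInPandas runs a single function per group, so the P1, P2
and P3 logic would each need its own pass (or be merged into one function).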

Here is the Stack Overflow link with the details:

https://stackoverflow.com/questions/72636814/spark-structured-streamingbatch-mode-running-dependent-jobs-concurrently

Please advise on the best way to achieve this.

tia!
