A nice feature of Spark Structured Streaming is that it can join a static DataFrame with a streaming DataFrame. Take the example below. users is a static DataFrame read from a database, and transactionStream comes from a stream. By joining them, we get each country's spending, accumulated as new batches arrive.
    val spendingByCountry = (transactionStream
      .join(users, users("id") === transactionStream("userid"))
      .groupBy($"country")
      .agg(sum($"cost") as "spending"))

    spendingByCountry.writeStream
      .outputMode("complete")
      .format("console")
      .start()

The sum of cost keeps accumulating as new batches arrive:

    -------------------------------
    Batch: 0
    -------------------------------
    Country  Spending
    EN       90.0
    FR       50.0

    -------------------------------
    Batch: 1
    -------------------------------
    Country  Spending
    EN       190.0
    FR       150.0

If I want to add notification and reset logic to the example above, what is the correct approach? The requirement is this: when a country's spending exceeds some threshold, the country and its spending should be stored in a table. That country's spending should then be reset to 0 so it starts accumulating again.
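The built-in groupBy().agg(sum(...)) keeps its running totals in Spark-managed state that user code cannot reset. One possible approach replaces that aggregation with arbitrary stateful processing via `flatMapGroupsWithState`, whose per-key `GroupState` can be updated or reset explicitly. The sketch below is not a definitive implementation; it rests on these assumptions:

- The case classes `CountryCost` and `CountrySpending`, the `Threshold` value, the helper `step` and the table name `spending_alerts` are all hypothetical.
- The joined stream exposes a `country` (String) column and a numeric `cost` column.
- Spark is 2.4+, which provides `foreachBatch`.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, StreamingQuery}

// Hypothetical input row: one joined transaction with its country.
case class CountryCost(country: String, cost: Double)
// Hypothetical output row: total seen in this trigger and whether it crossed the threshold.
case class CountrySpending(country: String, spending: Double, exceeded: Boolean)

object SpendingWithReset {
  // Hypothetical threshold value.
  val Threshold: Double = 100.0

  // Pure accumulation step: returns (new state, reported total, exceeded?).
  // When the total crosses the threshold, the new state is reset to 0.0.
  def step(previous: Double, costs: Seq[Double], threshold: Double): (Double, Double, Boolean) = {
    val total = previous + costs.sum
    if (total > threshold) (0.0, total, true)
    else (total, total, false)
  }

  // Called once per country that has new rows in the current micro-batch.
  def updateSpending(
      country: String,
      rows: Iterator[CountryCost],
      state: GroupState[Double]): Iterator[CountrySpending] = {
    val previous = state.getOption.getOrElse(0.0)
    val (next, total, exceeded) = step(previous, rows.map(_.cost).toList, Threshold)
    state.update(next) // keep the running total, or 0.0 after a reset
    Iterator(CountrySpending(country, total, exceeded))
  }

  // Replaces groupBy("country").agg(sum("cost")) with resettable per-country state.
  def spendingWithReset(spark: SparkSession, joined: Dataset[CountryCost]): Dataset[CountrySpending] = {
    import spark.implicits._
    joined
      .groupByKey(_.country)
      .flatMapGroupsWithState[Double, CountrySpending](
        OutputMode.Update(), GroupStateTimeout.NoTimeout())(updateSpending _)
  }

  // Append only the rows that crossed the threshold to a (hypothetical) table.
  def writeAlerts(batch: Dataset[CountrySpending], batchId: Long): Unit =
    batch.filter(batch("exceeded")).write.mode("append").saveAsTable("spending_alerts")

  // Wiring, assuming the joined DataFrame has `country` and numeric `cost` columns.
  def start(spark: SparkSession, joinedDf: DataFrame): StreamingQuery = {
    import spark.implicits._
    spendingWithReset(spark, joinedDf.select($"country", $"cost").as[CountryCost])
      .writeStream
      .outputMode("update")
      .foreachBatch(writeAlerts _)
      .start()
  }
}
```

In this sketch the reset happens at micro-batch granularity. If one trigger takes EN from 90.0 to 190.0 with a threshold of 100.0, the whole 190.0 is reported and the state returns to 0.0. The 90.0 above the threshold is not carried over. Because `GroupStateTimeout.NoTimeout()` is used, state is kept for every country indefinitely. That seems acceptable for a small, bounded set of countries.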