A good feature of spark structured streaming is that it can join the static
dataframe with the streaming dataframe. To cite an example as below. users
is a static dataframe read from database. transactionStream is from a
stream. By the joining operation, we can get the spending of each country
accumulated with the new arrival of batches.

val spendingByCountry = (transactionStream
    .join(users, users("id") === transactionStream("userid"))
    .groupBy($"country")
    .agg(sum($"cost")) as "spending")

spendingByContry.writeStream
    .outputMode("complete")
    .format("console")
    .start()

The sum of cost is aggregated with the arrival of new batches as shown
below.

-------------------------------
Batch: 0
-------------------------------
Country Spending
EN      90.0
FR      50.0

-------------------------------
Batch: 1
-------------------------------
Country Spending
EN      190.0
FR      150.0

If I want to introduce a notification and reset logic as the above example,
what should be the correct approach? The requirement is that if the
spending is larger than some threshold, the records of country and spending
should be stored into a table and the spending should be reset as 0 to
accumulate again.

Reply via email to