Hi,

We have the following use case for stream processing. Our incoming
events have the form (itemID, orderID, orderStatus, timestamp).
Each orderID has several itemIDs, each with its own timestamp.
The orderStatus is either "created" or "sent". The first incoming itemID
of an order represents "order created" and the last one "order sent". We
need to raise an alarm if these two events are too far apart in time, i.e.
if the difference between their timestamps exceeds a threshold.
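
To make the event format and the alarm rule concrete, here is a minimal plain-Python sketch (not our actual Spark code). The names Event, THRESHOLD_SECONDS and exceeds_threshold, the sample values, and the choice of seconds as the timestamp unit are assumptions made only for illustration.

```python
from collections import namedtuple

# Hypothetical record mirroring (itemID, orderID, orderStatus, timestamp).
Event = namedtuple("Event", ["item_id", "order_id", "order_status", "timestamp"])

# Assumed threshold in seconds; the real value is not given in this mail.
THRESHOLD_SECONDS = 3600


def exceeds_threshold(created, sent, threshold=THRESHOLD_SECONDS):
    """Return True if "sent" is more than `threshold` seconds after "created"."""
    return (sent.timestamp - created.timestamp) > threshold


# Made-up sample events for a single order.
created = Event("item-1", "order-42", "created", 1000)
sent = Event("item-3", "order-42", "sent", 5000)
alarm = exceeds_threshold(created, sent)
```

With these sample values the difference is 4000 seconds, so the alarm condition holds.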

I have implemented this as follows.

source
  |
mapToPair
  |
  |-------------------------------|
  |(1)                            |(2)
updateStateByKey                  |
  |                               |
coGroup---------------------------|
  |
  |(3)
  |
map
  |
sink


In *mapToPair* I put the orderID in the pair's first position and the
remaining fields as a tuple in the second.
In *updateStateByKey* the state holds a flag that is set once the itemIDs
for both "created" and "sent" have been received and their time difference
has been computed. If the difference is bigger than the threshold, the flag
is false.
I use *coGroup* and a *map* to filter the initial events and forward them
to the sink. The grouping is done by orderID.
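
The three steps can be sketched in plain Python for a single batch as shown below. This is only an approximation under stated assumptions, not the Spark job itself: update_state and cogroup are hypothetical stand-ins for the update function and the coGroup step, the state layout is invented, keeping the earliest "created" and the latest "sent" timestamp is one possible reading, and the final filter assumes that the events of orders whose flag is false are the ones forwarded to the sink.

```python
from collections import defaultdict

THRESHOLD = 3600  # assumed threshold in seconds


def update_state(new_events, state):
    """Update function in the style of updateStateByKey, for one orderID.

    new_events: list of (item_id, status, timestamp) tuples of the batch.
    state: dict with "created", "sent" and "ok" entries, or None for a new key.
    """
    state = dict(state) if state else {"created": None, "sent": None, "ok": None}
    for _item_id, status, ts in new_events:
        if status == "created":
            # Assumption: keep the earliest "created" timestamp.
            state["created"] = ts if state["created"] is None else min(state["created"], ts)
        elif status == "sent":
            # Assumption: keep the latest "sent" timestamp.
            state["sent"] = ts if state["sent"] is None else max(state["sent"], ts)
    if state["created"] is not None and state["sent"] is not None:
        # The flag is False when the time difference exceeds the threshold.
        state["ok"] = (state["sent"] - state["created"]) <= THRESHOLD
    return state


def cogroup(left, right):
    """Group two lists of (key, value) pairs by key into (left_values, right_values)."""
    grouped = defaultdict(lambda: ([], []))
    for key, value in left:
        grouped[key][0].append(value)
    for key, value in right:
        grouped[key][1].append(value)
    return dict(grouped)


# Output of mapToPair: (orderID, (itemID, orderStatus, timestamp)); made-up data.
events = [
    ("order-1", ("item-a", "created", 0)),
    ("order-1", ("item-b", "sent", 7200)),
    ("order-2", ("item-c", "created", 0)),
    ("order-2", ("item-d", "sent", 60)),
]

# (1) compute the per-order state from the events of this batch
by_order = defaultdict(list)
for order_id, value in events:
    by_order[order_id].append(value)
states = [(order_id, update_state(values, None)) for order_id, values in by_order.items()]

# (2) + coGroup: bring the original events and the state together by orderID
joined = cogroup(events, states)

# (3) map: forward the events of orders whose flag is False
alarms = [
    (order_id, item)
    for order_id, (items, order_states) in joined.items()
    for item in items
    if any(s["ok"] is False for s in order_states)
]
```

In this sample only order-1 exceeds the threshold, so alarms contains both of its events and none of order-2.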

The problem is that the coGroup's output doesn't contain all initial
itemIDs for each orderID key. Thus the result is incorrect.
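
One way to pin down which itemIDs get lost might be to compare, per orderID, the itemIDs that entered the pipeline with the ones that come out of the coGroup. A small plain-Python helper for such a check could look like this; missing_item_ids and its input shapes are hypothetical and would have to be fed with data collected from the job.

```python
def missing_item_ids(input_events, cogroup_output):
    """Return {order_id: set of item_ids} present in the input but not in the output.

    input_events: iterable of (order_id, item_id) pairs.
    cogroup_output: dict mapping order_id to an iterable of item_ids.
    """
    expected = {}
    for order_id, item_id in input_events:
        expected.setdefault(order_id, set()).add(item_id)

    missing = {}
    for order_id, item_ids in expected.items():
        lost = item_ids - set(cogroup_output.get(order_id, ()))
        if lost:
            missing[order_id] = lost
    return missing


# Made-up example: item "b" of order "o1" did not make it through.
report = missing_item_ids(
    [("o1", "a"), ("o1", "b"), ("o2", "c")],
    {"o1": ["a"], "o2": ["c"]},
)
```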

Do you guys have any idea what could cause this issue? Am I
missing/overlooking something?

Best,
Mihail
