Hi,

we have the following use case for stream processing. We have incoming events of the following form: (itemID, orderID, orderStatus, timestamp). There are several itemIDs for each orderID, each with its own timestamp. The orderStatus can be "created" or "sent". The first incoming itemID represents "order created" and the last "order sent". We need to issue an alarm if these two events are too far apart in time, i.e. if the difference between their timestamps exceeds a threshold.
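To make the rule concrete, here is a minimal, framework-free sketch in Python. It is not our actual streaming job; the function name, the threshold value and the sample events are hypothetical, and it assumes that timestamps are plain numbers in the same unit as the threshold and that only events with status "created" or "sent" matter for the check.

```python
THRESHOLD = 60  # hypothetical threshold, same unit as the timestamps


def orders_exceeding_threshold(events, threshold=THRESHOLD):
    """Return the sorted orderIDs whose "sent" - "created" gap exceeds threshold.

    events: iterable of (item_id, order_id, order_status, timestamp) tuples.
    """
    created = {}  # order_id -> timestamp of the first "created" event
    sent = {}     # order_id -> timestamp of the last "sent" event
    for item_id, order_id, status, ts in events:
        if status == "created":
            created.setdefault(order_id, ts)  # keep the first one seen
        elif status == "sent":
            sent[order_id] = ts               # keep the last one seen
    # An order can only be judged once both events have been seen.
    return sorted(
        order_id
        for order_id in created
        if order_id in sent and sent[order_id] - created[order_id] > threshold
    )


if __name__ == "__main__":
    sample = [
        ("i1", "o1", "created", 100),
        ("i2", "o1", "sent", 130),   # gap 30, below the threshold
        ("i3", "o2", "created", 100),
        ("i4", "o2", "sent", 200),   # gap 100, above the threshold
    ]
    print(orders_exceeding_threshold(sample))  # ['o2']
```

Orders for which only one of the two events has arrived are not reported, since the difference cannot be computed yet.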
I have implemented this as follows:

    source
      |
    mapToPair
      |
      |
      |----------------------------------
      |(1)                              |
    updateStateByKey                    |(2)
      |                                 |
    coGroup-----------------------------|
      |
      |(3)
      |
    map
      |
    sink

In *mapToPair* I put the orderID in the pair's first position and the remaining fields as a Tuple in the second.

In the state (*updateStateByKey*) I set a flag once the itemIDs for both "created" and "sent" have been received and their time difference has been computed. If the difference is bigger than the threshold, the flag is false.

I use *coGroup* and a *map* in order to be able to filter the initial events and forward them to the sink. The grouping is done by orderID.

The problem is that the coGroup's output doesn't contain all initial itemIDs for each orderID key, so the result is incorrect.

Do you guys have any idea what could cause this issue? Am I missing or overlooking something?

Best,
Mihail