Hi Hemant,

Why not use simple connected streams: one containing the measurements, and the other being a control stream with the thresholds, which are updated from time to time? Both would be keyed by the device class, so that the measurements and the thresholds for a specific device class go to the same parallel instance.
You keep the "current" thresholds in state as they arrive from the control stream, and you keep the measurements in a MapState keyed by their timestamp. When an element arrives on the measurements side, your KeyedCoProcessFunction fetches the thresholds from the "control state", reads from the measurements state all the elements from the last N units of time, does the computation, and purges the measurements that are too old to be useful (so that your state does not grow indefinitely).

This solution does not use CEP, but it gives you the freedom to do any optimisations related to your use case.

I hope this helps,
Kostas

On Wed, Feb 12, 2020 at 10:40 AM hemant singh <hemant2...@gmail.com> wrote:
>
> Hello Flink Users,
>
> I have a requirement to generate alerts for metrics like for example - if cpu
> utilization spike i.e cpu_utilization > threshold (>90%) n number of time in
> x minutes then generate alerts. For this I am using the CEP module. However,
> one of the requirements is for different devices the threshold can be
> different as well as x and n in above statement. Moreover, for different
> device class this will be different, also this can change in future.
> I am thinking of using Broadcast State Pattern and enrich the metrics stream
> with this thresholds & rule and use it later in CEP pattern. One issue is how
> to make sure that if new threshold values come in how the broadcast stream
> will change. I have an understanding that if I can introduce a watermark in
> broadcast stream when values change the KeyedBroadcastProcessFunction will
> have latest values streamed.
> Is my understanding correct and if anyone has implemented something like this
> can weigh in if this is right way to do it.
>
> Thanks,
> Hemant
>
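
To illustrate the state handling described in the reply above, here is a minimal plain-Java sketch of the logic for a single device class. It is written without any Flink dependency so that it runs on its own. The class name DeviceClassAlertState, its methods, and the rule shape (a threshold, a minimum breach count n, and a window length x in milliseconds) are assumptions made for this example and are not Flink APIs. In a real job the two groups of fields would live in keyed state inside a KeyedCoProcessFunction, for instance a ValueState for the thresholds and a MapState for the measurements.

```java
import java.util.TreeMap;

/**
 * Hypothetical stand-in for the per-key state of one device class.
 * One instance corresponds to what Flink would scope to a single key.
 */
final class DeviceClassAlertState {
    // "Control state": the latest rule received from the control stream.
    private boolean hasRule = false;
    private double threshold;
    private int minBreaches;    // n in the original question
    private long windowMillis;  // x in the original question, in milliseconds

    // "Elements state": measurements keyed by timestamp.
    // Simplification: two measurements with the same timestamp overwrite each other.
    private final TreeMap<Long, Double> measurements = new TreeMap<>();

    /** Control side: replace the current rule with the newly arrived one. */
    void onControl(double threshold, int minBreaches, long windowMillis) {
        this.threshold = threshold;
        this.minBreaches = minBreaches;
        this.windowMillis = windowMillis;
        this.hasRule = true;
    }

    /**
     * Measurement side: store the element, purge what is older than the window,
     * and report whether the rule fires for the window ending at this timestamp.
     */
    boolean onMeasurement(long timestamp, double value) {
        measurements.put(timestamp, value);
        if (!hasRule) {
            // No rule yet, so the window length is unknown and nothing is purged.
            // A real job would need some bound for this case.
            return false;
        }
        long windowStart = timestamp - windowMillis;
        // Purge measurements strictly older than the window so state stays bounded.
        measurements.headMap(windowStart, false).clear();

        // Count breaches among the remaining elements up to this timestamp.
        int breaches = 0;
        for (double v : measurements.headMap(timestamp, true).values()) {
            if (v > threshold) {
                breaches++;
            }
        }
        return breaches >= minBreaches;
    }

    /** Number of measurements currently retained, exposed for inspection. */
    int storedMeasurements() {
        return measurements.size();
    }
}
```

Because the rule is read on every measurement, a new rule arriving on the control side takes effect from the next measurement for that device class, without any watermark on the control stream.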