If you input data already contains both the SensorID and FactoryID, why would the following not be sufficient?

DataStream<SensorEvent> sensorEvents = ...; sensorEvents .filter(sensorEvent -> sensorEvent.Status.equals("alerte")) .map(sensorEvent -> sensorEvent.FactoryID) .addSink(<output>)

If the problem is that you only want one factory alert to be raised if say, all sensors of a factory go haywire at once, then you're looking at a time window; e.g., to only fire at most one alert every hour:

DataStream<SensorEvent> sensorEvents = ...; sensorEvents .filter(sensorEvent -> sensorEvent.Status.equals("alerte")) .keyBy(sensorEvent -> sensorEvent.FactoryID) .timeWindow(Time.hours(1)) .apply((WindowFunction<SensorEvent, Alert, String, TimeWindow>) (factoryId, window, input, out) -> out.collect(new Alert(factoryId)));
.addSink(<output>);

Ultimately it would be good to understand what exactly you are struggling with, and what you have tried so far.

On 04/06/2020 15:45, Aissa Elaffani wrote:
Hello guys,
I have a use case, where I am receiving data from sensors about their status (Normal or Alerte), {SensorID:"1", FactoryID:"1", Status:"Normal" ..}, a factory can contain a lot of sensors, so what I want to do is, if the status of one sensor in a factory, is Alerte I want to raise an alerte for all the factory (the factory status must be alerte) ... I did a stream.keyBy("FactoryID").window(). can you please suggest me a window function that can fulfill my use case (if one sensor of a factory raises "alerte" I want the factory status to be "alerte") ... I hope someone can understand my use case !! Sorry for disturbing you, and thak you for your time !
Best,
Aissa


Reply via email to