If you input data already contains both the SensorID and FactoryID, why
would the following not be sufficient?
DataStream<SensorEvent> sensorEvents = ...; sensorEvents
.filter(sensorEvent -> sensorEvent.Status.equals("alerte"))
.map(sensorEvent -> sensorEvent.FactoryID) .addSink(<output>)
If the problem is that you only want one factory alert to be raised if
say, all sensors of a factory go haywire at once, then you're looking at
a time window; e.g., to only fire at most one alert every hour:
DataStream<SensorEvent> sensorEvents = ...; sensorEvents
.filter(sensorEvent -> sensorEvent.Status.equals("alerte"))
.keyBy(sensorEvent -> sensorEvent.FactoryID) .timeWindow(Time.hours(1))
.apply((WindowFunction<SensorEvent, Alert, String, TimeWindow>)
(factoryId, window, input, out) -> out.collect(new Alert(factoryId)));
.addSink(<output>);
Ultimately it would be good to understand what exactly you are
struggling with, and what you have tried so far.
On 04/06/2020 15:45, Aissa Elaffani wrote:
Hello guys,
I have a use case, where I am receiving data from sensors about their
status (Normal or Alerte), {SensorID:"1", FactoryID:"1",
Status:"Normal" ..}, a factory can contain a lot of sensors, so what I
want to do is, if the status of one sensor in a factory, is Alerte I
want to raise an alerte for all the factory (the factory status must
be alerte) ... I did a
stream.keyBy("FactoryID").window(). can you please suggest me a
window function that can fulfill my use case (if one sensor of a
factory raises "alerte" I want the factory status to be "alerte") ...
I hope someone can understand my use case !! Sorry for disturbing you,
and thak you for your time !
Best,
Aissa