Hello Has anyone work this way? I am asking because I have to get the aggregation ( Sum and Count) for multiple windows size (10 mins, 20 mins, 30 mins) please let me know if this works properly or is there other good solution.
DataStream<String> data = ... // append a Long 1 to each record to count it. DataStream<Tuple2<String, Long>> withOnes = data.map(new AppendOne); DataStream<Tuple2<String, Long>> 1minCnts = withOnes // key by String field .keyBy(0) // define time window .timeWindow(Time.of(1, MINUTES)) // sum ones of the Long field // in practice you want to use an incrementally aggregating ReduceFunction and // a WindowFunction to extract the start/end timestamp of the window .sum(1); // emit 1-min counts to wherever you need it 1minCnts.addSink(new YourSink()); // compute 5-min counts based on 1-min counts DataStream<Tuple2<String, Long>> 5minCnts = 1minCnts // key by String field .keyBy(0) // define time window of 5 minutes .timeWindow(Time.of(5, MINUTES)) // sum the 1-minute counts in the Long field .sum(1); // emit 5-min counts to wherever you need it 5minCnts.addSink(new YourSink()); // continue with 1 day window and 1 week window Thank you Regards