Hello

Has anyone work this way? I am asking because I have to get the aggregation
( Sum and Count) for multiple windows size  (10 mins, 20 mins, 30 mins)
please let me know if this works properly or is there other good solution.


DataStream<String> data = ...
// append a Long 1 to each record to count it.
DataStream<Tuple2<String, Long>> withOnes = data.map(new AppendOne);

DataStream<Tuple2<String, Long>> 1minCnts = withOnes
  // key by String field
  .keyBy(0)
  // define time window
  .timeWindow(Time.of(1, MINUTES))
  // sum ones of the Long field
  // in practice you want to use an incrementally aggregating
ReduceFunction and
  // a WindowFunction to extract the start/end timestamp of the window
  .sum(1);

// emit 1-min counts to wherever you need it
1minCnts.addSink(new YourSink());

// compute 5-min counts based on 1-min counts
DataStream<Tuple2<String, Long>> 5minCnts = 1minCnts
  // key by String field
  .keyBy(0)
  // define time window of 5 minutes
  .timeWindow(Time.of(5, MINUTES))
  // sum the 1-minute counts in the Long field
  .sum(1);

// emit 5-min counts to wherever you need it
5minCnts.addSink(new YourSink());

// continue with 1 day window and 1 week window

Thank you Regards

Reply via email to