GitHub user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4626
  
    I think it would be nice to have a utility here to make this easier to use:
    ```java
    DataStream result =
        Utils.withPreaggregation(
            stream.timeWindow(Time.minutes(5)), 
            myAggregateFunction
        )
        .apply(windowFunction);
    ```
    
    The utility would basically take the aggregate function, insert the stream transformation for the pre-aggregation on the *predecessor* of the keyed stream, and then set up the `WindowedStream` again.
    
    Pseudo code:
    ```java
    public static <T, K, W extends Window, A> WindowedStream<T, K, W> withPreaggregation(
            WindowedStream<T, K, W> windowedStream,
            AggregateFunction<T, A, T> preAggregator) {
    
        // sanity check that the windowedStream has no custom trigger or evictor
    
        PreAggregationOperator preAggOp = new PreAggregationOperator(
            preAggregator, /* properties from windowed stream */);
    
        DataStream<T> originalStream = /* get predecessor before keyBy from windowed stream */;
        DataStream<T> preAggregated = originalStream.transform(preAggOp, ...);
    
        WindowedStream<T, K, W> windowedAgain = preAggregated
            .keyBy(/* key extractor from original windowed stream */)
            .window(/* assigner from original windowed stream */);
    
        return windowedAgain;
    }
    ```
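
To illustrate why inserting the pre-aggregation before the `keyBy` should leave the windowed result unchanged, here is a minimal plain-Java sketch. It deliberately uses no Flink APIs: `PreAggregationSketch`, `Event`, `preAggregate`, and `windowedSum` are all hypothetical names, the aggregate is assumed to be a simple sum, and the windows are assumed to be tumbling windows keyed by event timestamp. It is a model of the idea, not the operator from this pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the idea; none of these names are Flink APIs.
public class PreAggregationSketch {

    /** Hypothetical record: a key, a value, and an event timestamp in milliseconds. */
    public static final class Event {
        public final String key;
        public final long value;
        public final long timestampMs;

        public Event(String key, long value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** Start of the tumbling window that contains the given timestamp. */
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Pre-aggregation on one upstream partition: at most one partial sum per (key, window). */
    public static List<Event> preAggregate(List<Event> partition, long windowSizeMs) {
        Map<String, Event> partials = new LinkedHashMap<>();
        for (Event e : partition) {
            long start = windowStart(e.timestampMs, windowSizeMs);
            String slot = e.key + "@" + start;
            Event prev = partials.get(slot);
            long sum = (prev == null ? 0L : prev.value) + e.value;
            // Stamp the partial result with the window start so that it falls
            // into the same window when the stream is windowed again downstream.
            partials.put(slot, new Event(e.key, sum, start));
        }
        return new ArrayList<>(partials.values());
    }

    /** Final keyed window aggregation: one sum per "key@windowStart". */
    public static Map<String, Long> windowedSum(List<Event> events, long windowSizeMs) {
        Map<String, Long> sums = new TreeMap<>();
        for (Event e : events) {
            String slot = e.key + "@" + windowStart(e.timestampMs, windowSizeMs);
            sums.merge(slot, e.value, Long::sum);
        }
        return sums;
    }

    /** Two upstream partitions of made-up sample data. */
    public static List<List<Event>> sampleInput() {
        List<Event> p1 = Arrays.asList(
            new Event("a", 1, 1_000), new Event("a", 2, 2_000),
            new Event("b", 5, 3_000), new Event("a", 4, 301_000));
        List<Event> p2 = Arrays.asList(
            new Event("a", 10, 4_000), new Event("b", 1, 299_999),
            new Event("b", 7, 600_000));
        return Arrays.asList(p1, p2);
    }

    public static void main(String[] args) {
        long fiveMinutes = 300_000L;
        List<Event> all = new ArrayList<>();
        List<Event> shuffled = new ArrayList<>();
        for (List<Event> partition : sampleInput()) {
            all.addAll(partition);
            shuffled.addAll(preAggregate(partition, fiveMinutes));
        }
        System.out.println("without pre-aggregation: " + windowedSum(all, fiveMinutes));
        System.out.println("with pre-aggregation:    " + windowedSum(shuffled, fiveMinutes));
        System.out.println("records shuffled: " + all.size() + " vs " + shuffled.size());
    }
}
```

Both paths produce the same per-key, per-window sums, while the pre-aggregated path sends fewer records across the `keyBy` boundary. This only holds because the partial result has the same type as the input and the aggregate is associative, which is presumably why the signature above uses `AggregateFunction<T, A, T>`; a custom trigger or evictor could observe individual elements and break the equivalence.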

