Alright, maybe my example needs to be more concrete. How about this: in the example below, I don't want to create two windows just to recombine what was just aggregated in SQL. Is there a way to transform the aggregate results into one object in the DataStream so that I don't have to aggregate again?
// aggregate this stream for 15 minutes
final Table employeeDailyPurchasesTable = tableEnv.sqlQuery("SELECT\n" +
        " t.organization_id, t.department_id, s.date, s.employee_id, t.fullName, t.dob, SUM(s.purchase) AS purchases\n" +
        "FROM\n" +
        " employee_purchases s\n" +
        "LEFT JOIN\n" +
        " employees FOR SYSTEM_TIME AS OF s.procTime AS t ON t.organization = s.organization AND t.department = s.department AND t.employee_id = s.employee_id\n" +
        "GROUP BY\n" +
        " TUMBLE(s.procTime, INTERVAL '15' MINUTE), t.organization_id, t.department_id, s.date, s.employee_id, t.fullName, t.dob");

// now I want everything that was just aggregated processed together,
// below gives me each row again in a stream
final DataStream<Row> employeeDailyPurchasesDataStream =
        tableEnv.toAppendStream(employeeDailyPurchasesTable, Row.class);

// so, do I need to wait another 15 minutes to aggregate this? It was just aggregated for 15 minutes above!
// how do I get the previous aggregated results into one object so that I don't have to wait and aggregate it again
final DataStream<DailyEmployeePurchases> aggregatedAgainBecauseINeedHelp = employeeDailyPurchasesDataStream
        .keyBy(0, 1, 2)
        .window(TumblingProcessingTimeWindows.of(Time.minutes(15)))
        .aggregate(new AggregateFunction<Row, DailyEmployeePurchases, DailyEmployeePurchases>() {
            @Override
            public DailyEmployeePurchases createAccumulator() {
                return new DailyEmployeePurchases();
            }

            @Override
            public DailyEmployeePurchases add(Row value, DailyEmployeePurchases accumulator) {
                return accumulator.add(value);
            }

            @Override
            public DailyEmployeePurchases getResult(DailyEmployeePurchases accumulator) {
                return accumulator;
            }

            @Override
            public DailyEmployeePurchases merge(DailyEmployeePurchases a, DailyEmployeePurchases b) {
                return a.merge(b);
            }
        });

// important business logic that needs to be applied to the group of employees
aggregatedAgainBecauseINeedHelp.keyBy("organizationId", "departmentId")
        .process(new KeyedProcessFunction<Tuple, DailyEmployeePurchases, DailyEmployeePurchases>() {
            @Override
            public void processElement(DailyEmployeePurchases value, Context ctx,
                                       Collector<DailyEmployeePurchases> out) throws Exception {
                // very important stuff here
            }
        });