Alright, maybe my example needs to be more concrete. How about this:
In this example, I don't want to create two windows just to re-combine
what was just aggregated in SQL.  Is there a way to transform the aggregate
results into one DataStream object so that I don't have to aggregate again?


// aggregate this stream for 15 minutes
final Table employeeDailyPurchasesTable = tableEnv.sqlQuery("SELECT\n" +
      "    t.organization_id, t.department_id, s.date, s.employee_id, " +
      "t.fullName, t.dob, SUM(s.purchase) AS purchases\n" +
      "FROM\n" +
      "    employee_purchases s\n" +
      "LEFT JOIN\n" +
      "    employees FOR SYSTEM_TIME AS OF s.procTime AS t ON " +
      "t.organization = s.organization AND t.department = s.department AND " +
      "t.employee_id = s.employee_id\n" +
      "GROUP BY\n" +
      "    TUMBLE(s.procTime, INTERVAL '15' MINUTE), " +
      "t.organization_id, t.department_id, s.date, s.employee_id, t.fullName, " +
      "t.dob");

// now I want everything that was just aggregated processed together,
// below gives me each row again in a stream
final DataStream<Row> employeeDailyPurchasesDataStream =
      tableEnv.toAppendStream(employeeDailyPurchasesTable, Row.class);

// so, do I need to wait another 15 minutes to aggregate this? It was
// just aggregated for 15 minutes above!
// how do I get the previous aggregated results into one object so
// that I don't have to wait and aggregate it again?
final DataStream<DailyEmployeePurchases> aggregatedAgainBecauseINeedHelp =
      employeeDailyPurchasesDataStream
      .keyBy(0, 1, 2)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(15)))
      .aggregate(new AggregateFunction<Row, DailyEmployeePurchases, DailyEmployeePurchases>() {

         @Override
         public DailyEmployeePurchases createAccumulator() {
            return new DailyEmployeePurchases();
         }

         @Override
         public DailyEmployeePurchases add(Row value, DailyEmployeePurchases accumulator) {
            return accumulator.add(value);
         }

         @Override
         public DailyEmployeePurchases getResult(DailyEmployeePurchases accumulator) {
            return accumulator;
         }

         @Override
         public DailyEmployeePurchases merge(DailyEmployeePurchases a, DailyEmployeePurchases b) {
            return a.merge(b);
         }
      });

// important business logic that needs to be applied to the group of employees
aggregatedAgainBecauseINeedHelp.keyBy("organizationId", "departmentId")
      .process(new KeyedProcessFunction<Tuple, DailyEmployeePurchases, DailyEmployeePurchases>() {

         @Override
         public void processElement(DailyEmployeePurchases value, Context ctx,
               Collector<DailyEmployeePurchases> out) throws Exception {
            // very important stuff here
         }
      });
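
To show the shape of the result I'm after, independent of Flink, here is a rough plain-Java sketch. It only illustrates "all rows of one finished window, collected per organization and department"; it is not a Flink solution. The names GroupingSketch, PurchaseRow and groupByDepartment are made up for this illustration and do not exist in the job above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupingSketch {

    // Hypothetical stand-in for one already-aggregated row of the SQL result.
    public static final class PurchaseRow {
        public final String organizationId;
        public final String departmentId;
        public final String employeeId;
        public final double purchases;

        public PurchaseRow(String organizationId, String departmentId,
                           String employeeId, double purchases) {
            this.organizationId = organizationId;
            this.departmentId = departmentId;
            this.employeeId = employeeId;
            this.purchases = purchases;
        }
    }

    // Collects the rows of ONE finished window into one list per
    // (organization, department), without summing anything again.
    public static Map<String, List<PurchaseRow>> groupByDepartment(List<PurchaseRow> windowRows) {
        Map<String, List<PurchaseRow>> groups = new LinkedHashMap<>();
        for (PurchaseRow row : windowRows) {
            String key = row.organizationId + "/" + row.departmentId;
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Made-up sample data standing in for one 15-minute window of SQL output.
        List<PurchaseRow> rows = Arrays.asList(
                new PurchaseRow("org1", "deptA", "e1", 10.0),
                new PurchaseRow("org1", "deptA", "e2", 5.5),
                new PurchaseRow("org1", "deptB", "e3", 7.0));

        // Each entry is the "one object per group" the business logic needs.
        groupByDepartment(rows).forEach((key, group) ->
                System.out.println(key + " has " + group.size() + " employee rows"));
    }
}
```

In other words, the business logic should see each department's rows together as soon as the SQL window closes, not after a second 15-minute window.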
