tableEnv.executeSql(DDLSourceSQLManager.cdcCreateTableSQL("order_info")); tableEnv .toRetractStream(tableEnv.from("order_info"), Row.class) .filter((FilterFunction<Tuple2<Boolean, Row>>) booleanRowTuple2 -> booleanRowTuple2.f0) .map((MapFunction<Tuple2<Boolean, Row>, Row>) booleanRowTuple2 -> booleanRowTuple2.f1) .assignTimestampsAndWatermarks( WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(10)) .withTimestampAssigner(((element, recordTimestamp) -> System.currentTimeMillis()))) .keyBy((KeySelector<Row, String>) row -> row.getField("consignee").toString()) .window(TumblingEventTimeWindows.of(Time.seconds(100))) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5))) .process(new ProcessWindowFunction<Row, Tuple2<TimeWindow, Long>, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<Row> elements, Collector<Tuple2<TimeWindow, Long>> out) throws Exception { Long count = 0L; for (Row element : elements) { count += 1; } out.collect(new Tuple2(context.window(), count)); } }) .print(); ; streamEnv.execute();
-- Sent from: http://apache-flink.147419.n8.nabble.com/