// Registers the "order_info" table from the DDL produced by
// DDLSourceSQLManager.cdcCreateTableSQL, converts it to a retract stream,
// and prints a per-consignee element count for each 100-second tumbling window.
tableEnv.executeSql(DDLSourceSQLManager.cdcCreateTableSQL("order_info"));
tableEnv
        .toRetractStream(tableEnv.from("order_info"), Row.class)
        // Keep only tuples whose flag f0 is true (add messages in Flink's
        // retract encoding); tuples with f0 == false are dropped.
        .filter((FilterFunction<Tuple2<Boolean, Row>>) change -> change.f0)
        // Unwrap the Row payload from the (flag, row) tuple.
        .map((MapFunction<Tuple2<Boolean, Row>, Row>) change -> change.f1)
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        // NOTE(review): the timestamp is the wall-clock time at
                        // which the record passes this operator, not a field of
                        // the row -- confirm this is intended for event-time windows.
                        .withTimestampAssigner(
                                (element, recordTimestamp) -> System.currentTimeMillis()))
        // NOTE(review): getField("consignee") is dereferenced without a null
        // check; presumably the column is never null -- verify against the DDL.
        .keyBy((KeySelector<Row, String>) row -> row.getField("consignee").toString())
        .window(TumblingEventTimeWindows.of(Time.seconds(100)))
        // NOTE(review): an explicit trigger is set on an event-time window here;
        // check the Flink trigger documentation for how this interacts with
        // end-of-window firing and state cleanup.
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
        .process(new ProcessWindowFunction<Row, Tuple2<TimeWindow, Long>, String, TimeWindow>() {
            /**
             * Emits one (window, count) pair per invocation, where count is the
             * number of rows passed in {@code elements}.
             *
             * @param s        the key of the window being evaluated (unused)
             * @param context  window context; only {@code context.window()} is read
             * @param elements the rows handed to this invocation
             * @param out      collector receiving the (window, count) pair
             */
            @Override
            public void process(
                    String s,
                    Context context,
                    Iterable<Row> elements,
                    Collector<Tuple2<TimeWindow, Long>> out) throws Exception {
                // Primitive counter: avoids boxing a new Long on every increment.
                long count = 0L;
                for (Row ignored : elements) {
                    count++;
                }
                out.collect(new Tuple2<>(context.window(), count));
            }
        })
        .print();
streamEnv.execute();



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复