Hi! 据我所知目前暂时没有增加 count window 的打算,以后可能会在最新的 Window TVF 里添加 count window tvf。
不建议在 SQL 中自行实现 count window,因为 SQL 添加 window 较为复杂。但可以考虑先将 SQL 转为 datastream,用 datastream 的 count window 之后再将 datastream 转回 SQL。 EnvironmentSettings settings = EnvironmentSettings.newInstance(). inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create( StreamExecutionEnvironment.getExecutionEnvironment(), settings); tEnv.executeSql( "CREATE TABLE T ( a INT, b INT, key AS abs(a) % 3, val AS abs(b) % 3 ) WITH ( 'connector' = 'datagen' )"); Table table = tEnv.sqlQuery("SELECT key, val FROM T"); DataStream<Row> dataStream = tEnv.toDataStream(table); DataStream<Tuple2<Integer, Integer>> summedStream = dataStream .keyBy(row -> (int) row.getField(0)) .countWindow(100) .apply( (WindowFunction< Row, Tuple2<Integer, Integer>, Integer, GlobalWindow>) (key, window, input, out) -> { int sum = 0; for (Row row : input) { Integer field = (Integer) row.getField(1); if (field != null) { sum += field; } } out.collect(Tuple2.of(key, sum)); }) .returns( new TupleTypeInfo<>( BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)); Table summedTable = tEnv.fromDataStream(summedStream); tEnv.registerTable("S", summedTable); tEnv.executeSql("SELECT f0, f1 FROM S").print(); casel.chen <casel_c...@126.com> 于2021年9月17日周五 下午6:05写道: > 今天遇到一个业务场景用到count window,查了下Flink官网目前Flink SQL只支持time > window,问一下官方是否打算sql支持count window呢? > 如果没有计划的话,自己要如何实现?是否可以像flink 1.13引入的cumulate window写一个自定义窗口函数呢?谢谢!