1:当DataStream是由 一个table 经过 group by rowtime 转换过来的就无法触发窗口
例如: tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test group by msg,rowtime")); // 获得 DataStream,并定义wtm生成 SingleOutputStreamOperator r = tableEnv.toRetractStream(tableEnv.from("test3"), Row.class) .filter(x -> x.f0) // map ........ .returns(Types.TUPLE(Types.STRING, Types.LONG)) .assignTimestampsAndWatermarks( WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)) .withTimestampAssigner(((element, recordTimestamp) -> element.f1)) ); 参考 官方文档: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html // stream - 转 Table,指定Rowtime tableEnv.createTemporaryView("test5", r, $("msg"), $("rowtime").rowtime()); String sql5 = "select " + "msg," + "count(1) cnt" + " from test5 " + " group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " + ""; tableEnv.executeSql("insert into printlnRetractSink " + sql5); 结果: 无法触发窗口操作。 查调试源码: org.apache.flink.table.runtime.operators.window.WindowOperator // 返回的wtm永远都是 -9223372036854775808 public long getCurrentWatermark() { return internalTimerService.currentWatermark(); } // 查看任务,watermark是正常在生成的。InternalTimerServiceImpl.advanceWatermark是正常为currentWatermark赋值。但是 internalTimerService.currentWatermark() 却拿的是-9223372036854775808 // 当 tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test group by msg,rowtime")); 语句改为 tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test")); 结果就是正确的。 所以这是一个bug吗?? -- Sent from: http://apache-flink.147419.n8.nabble.com/