感谢回答!
我定义了 schema 指定了时间字段,正如下面的代码,唯一的区别是使用
createTemporaryTable 替代原先的 registerTableSource 方法。
Rowtime rowtime = new Rowtime()
.timestampsFromField("searchTime")
.watermarksPeriodicBounded(5 * 1000);
Schema schema = new Schema()
.field("code", DataTypes.INT())
.field("results", DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.STRING()),
DataTypes.FIELD("items",
DataTypes.ARRAY(
DataTypes.ROW(
DataTypes.FIELD("id",
DataTypes.STRING()),
DataTypes.FIELD("name",
DataTypes.STRING())
)
)
)
))
.field("rowTime", DataTypes.TIMESTAMP()).rowtime(rowtime);
tableEnv.connect(kafka)
.withSchema(schema)
.withFormat(new Json().failOnMissingField(false))
.inAppendMode()
// .registerTableSource("tb_json");
.createTemporaryTable("tb_json");
SQL :SELECT rowTime as searchTime, code, ... FROM tb_json order by
searchTime ASC
Jingsong Li wrote on 2020/5/8 10:02 AM:
Hi,
就像异常所说,streaming sql不支持非时间字段的order by。
你是怎么来指定时间字段的呢?
Best,
Jingsong Lee
On Fri, May 8, 2020 at 9:52 AM Hito Zhu<qrshi....@gmail.com> wrote:
hi all,
flink 1.10 建议使用 createTemporaryTable 方法代替 registerTableSource
方法,替换后报错,错误信息和SQL如下:
Exception in thread "main" org.apache.flink.table.api.TableException:
Sort on a non-time-attribute field is not supported.
SQL:select code, ...,searchTime from table order by searchTime asc
不使用 order by 语句没问题。
--
Sent from Postbox <https://www.postbox-inc.com>