感谢回答!
我定义了 schema 指定了时间字段,正如下面的代码,唯一的区别是使用 createTemporaryTable 替代原先的 registerTableSource 方法。
Rowtime rowtime = new Rowtime()
    .timestampsFromField("searchTime")
    .watermarksPeriodicBounded(5 * 1000);

Schema schema = new Schema()
    .field("code", DataTypes.INT())
    .field("results", DataTypes.ROW(
            DataTypes.FIELD("id", DataTypes.STRING()),
            DataTypes.FIELD("items",
                    DataTypes.ARRAY(
                            DataTypes.ROW(
                                    DataTypes.FIELD("id", DataTypes.STRING()),                                     DataTypes.FIELD("name", DataTypes.STRING())
                            )
                    )
            )
    ))
    .field("rowTime", DataTypes.TIMESTAMP()).rowtime(rowtime);

tableEnv.connect(kafka)
    .withSchema(schema)
    .withFormat(new Json().failOnMissingField(false))
    .inAppendMode()
    // .registerTableSource("tb_json");
    .createTemporaryTable("tb_json");

SQL :SELECT rowTime as searchTime,  code, ...  FROM tb_json order by searchTime ASC

Jingsong Li wrote on 2020/5/8 10:02 AM:
Hi,

就像异常所说,streaming sql不支持非时间字段的order by。
你是怎么来指定时间字段的呢?

Best,
Jingsong Lee

On Fri, May 8, 2020 at 9:52 AM Hito Zhu<qrshi....@gmail.com>  wrote:

hi all,
flink 1.10 建议使用 createTemporaryTable 方法代替 registerTableSource
方法,替换后报错,错误信息和SQL如下:

Exception in thread "main" org.apache.flink.table.api.TableException:
Sort on a non-time-attribute field is not supported.
SQL:select code, ...,searchTime from table order by searchTime asc

不使用 order by 语句没问题。






--
Sent from Postbox <https://www.postbox-inc.com>

回复