Hi! 关于问题一(如何配置 row 的字段名),可以通过 cast 语句: select cast(row(f_sequence, f_random) as row<a int, b int>) as c from datagen
关于问题二,看起来确实是一个 bug,可以去 https://issues.apache.org/jira/projects/FLINK/issues 上开一个 ticket Asahi Lee <978466...@qq.com.invalid> 于2021年7月22日周四 下午8:44写道: > hi! > 1. 我在使用flink 1.13.1 对通过row(a,b)生成的列再次查询时,发生错误,是否是一个bug? > 2. 通过 row函数生成row类型的列时,无法指定row中字段的name,是否考虑支持name的配置? > 我的示例程序如下: > > > package test; > > > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.table.catalog.ResolvedSchema; > > > public class DataGenTest { > > > public static void main(String[] args) { > StreamExecutionEnvironment > streamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tableEnvironment = > StreamTableEnvironment.create(streamExecutionEnvironment); > > > tableEnvironment.executeSql("CREATE TABLE > datagen (\n" + > " f_sequence > INT,\n" + > " f_random INT,\n" > + > " f_random_str > STRING,\n" + > " ts AS > localtimestamp,\n" + > " WATERMARK FOR ts > AS ts\n" + > ") WITH (\n" + > " 'connector' = > 'datagen',\n" + > " > 'rows-per-second'='5',\n" + > " > 'fields.f_sequence.kind'='sequence',\n" + > " > 'fields.f_sequence.start'='1',\n" + > " > 'fields.f_sequence.end'='1000',\n" + > " > 'fields.f_random.min'='1',\n" + > " > 'fields.f_random.max'='1000',\n" + > " > 'fields.f_random_str.length'='10'\n" + > ")"); > > > Table table = > tableEnvironment.sqlQuery("select row(f_sequence, f_random) as c from > datagen"); > ResolvedSchema resolvedSchema = > table.getResolvedSchema(); > System.out.println(resolvedSchema); > /** > * 打印如下: > * ( > * `c` ROW<`EXPR$0` INT, > `EXPR$1` INT> NOT NULL > * ) > * > 问题一,通过使用row函数,我将两个列放入row类型中,那我如何配置row中字段的名称呢?,如下中的c1, c2: > * ( > * `c` ROW<`c1` INT, `c2` > INT> NOT NULL > * ) > */ > > > Table table1 = > tableEnvironment.sqlQuery("select * from " + table); > /** > * 问题二,查询sql报错: > * Exception in thread "main" > java.lang.AssertionError: Conversion to relational algebra failed to > preserve datatypes: > * validated type: > * > RecordType(RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER EXPR$1) NOT > NULL c) NOT NULL > * converted type: > * RecordType(RecordType(INTEGER EXPR$0, > INTEGER EXPR$1) NOT NULL c) NOT NULL > * rel: > * LogicalProject(c=[ROW($0, $1)]) > * > LogicalWatermarkAssigner(rowtime=[ts], watermark=[$3]) > * > LogicalProject(f_sequence=[$0], f_random=[$1], f_random_str=[$2], > ts=[LOCALTIMESTAMP]) > * > LogicalTableScan(table=[[default_catalog, default_database, datagen]]) > */ > ResolvedSchema resolvedSchema1 = > table1.getResolvedSchema(); > System.out.println(resolvedSchema1); > > > table.execute().print(); > > > > > } > > > }