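Hi, 1. With Flink 1.13.1, selecting row(a, b) in one query and then selecting from the resulting table in a second query fails with an AssertionError (see the reproduction below) -- is this a bug? 2. When constructing a row, the names of the fields inside the row apparently cannot be specified (they come out as EXPR$0, EXPR$1) -- is there a way to give the row's fields explicit names? [Note: the original non-ASCII text of this message was lost to an encoding error; this paragraph is a reconstruction from the surviving tokens and the code below.]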
package test;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ResolvedSchema;

/**
 * Minimal reproduction around the SQL {@code ROW(...)} value constructor.
 *
 * <p>The program registers a {@code datagen} source table, projects two of its columns into a
 * single row-typed column {@code c}, prints the resolved schema of that result, and then uses the
 * result as the source of a second {@code select *} query.
 */
public class DataGenTest {

    /** DDL for the generated source table, including a computed time column and its watermark. */
    private static final String CREATE_DATAGEN_TABLE =
            "CREATE TABLE datagen (\n"
                    + " f_sequence INT,\n"
                    + " f_random INT,\n"
                    + " f_random_str STRING,\n"
                    + " ts AS localtimestamp,\n"
                    + " WATERMARK FOR ts AS ts\n"
                    + ") WITH (\n"
                    + " 'connector' = 'datagen',\n"
                    + " 'rows-per-second'='5',\n"
                    + " 'fields.f_sequence.kind'='sequence',\n"
                    + " 'fields.f_sequence.start'='1',\n"
                    + " 'fields.f_sequence.end'='1000',\n"
                    + " 'fields.f_random.min'='1',\n"
                    + " 'fields.f_random.max'='1000',\n"
                    + " 'fields.f_random_str.length'='10'\n"
                    + ")";

    /** Query that packs two INT columns into one row-typed column named {@code c}. */
    private static final String SELECT_ROW_QUERY =
            "select row(f_sequence, f_random) as c from datagen";

    /**
     * Runs the reproduction: create the table, build the row query, print its schema, re-query
     * the result, print that schema, and finally execute and print the first query.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(CREATE_DATAGEN_TABLE);

        Table rowTable = tableEnv.sqlQuery(SELECT_ROW_QUERY);
        ResolvedSchema rowSchema = rowTable.getResolvedSchema();
        System.out.println(rowSchema);
        /*
         * Output recorded by the original author:
         * (
         *   `c` ROW<`EXPR$0` INT, `EXPR$1` INT> NOT NULL
         * )
         *
         * NOTE(review): the original explanatory text was lost to an encoding error. It appears
         * to say that the fields inside the row cannot be given names, and that the desired
         * schema would name them, e.g. c1 and c2:
         * (
         *   `c` ROW<`c1` INT, `c2` INT> NOT NULL
         * )
         */

        // The Table object is concatenated into the SQL text on purpose; this is the second
        // query the report is about.
        Table reselected = tableEnv.sqlQuery("select * from " + rowTable);
        /*
         * Error recorded by the original author for the second query:
         *
         * Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
         * validated type:
         * RecordType(RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER EXPR$1) NOT NULL c) NOT NULL
         * converted type:
         * RecordType(RecordType(INTEGER EXPR$0, INTEGER EXPR$1) NOT NULL c) NOT NULL
         * rel:
         * LogicalProject(c=[ROW($0, $1)])
         *   LogicalWatermarkAssigner(rowtime=[ts], watermark=[$3])
         *     LogicalProject(f_sequence=[$0], f_random=[$1], f_random_str=[$2], ts=[LOCALTIMESTAMP])
         *       LogicalTableScan(table=[[default_catalog, default_database, datagen]])
         */
        ResolvedSchema reselectedSchema = reselected.getResolvedSchema();
        System.out.println(reselectedSchema);

        rowTable.execute().print();
    }
}