hi??
1. ????????flink 1.13.1 
??????row(a,b)????????????????????????????????????????bug??
2. ???? 
row????????row????????????????????row????????name??????????????name????????
??????????????????


package test;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ResolvedSchema;


public class DataGenTest {


    public static void main(String[] args) {
        StreamExecutionEnvironment 
streamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(streamExecutionEnvironment);


        tableEnvironment.executeSql("CREATE TABLE datagen 
(\n" +
                " f_sequence INT,\n" +
                " f_random INT,\n" +
                " f_random_str 
STRING,\n" +
                " ts AS 
localtimestamp,\n" +
                " WATERMARK FOR ts AS 
ts\n" +
                ") WITH (\n" +
                " 'connector' = 
'datagen',\n" +
                " 
'rows-per-second'='5',\n" +
                " 
'fields.f_sequence.kind'='sequence',\n" +
                " 
'fields.f_sequence.start'='1',\n" +
                " 
'fields.f_sequence.end'='1000',\n" +
                " 
'fields.f_random.min'='1',\n" +
                " 
'fields.f_random.max'='1000',\n" +
                " 
'fields.f_random_str.length'='10'\n" +
                ")");


        Table table = tableEnvironment.sqlQuery("select 
row(f_sequence, f_random) as c from datagen");
        ResolvedSchema resolvedSchema = 
table.getResolvedSchema();
        System.out.println(resolvedSchema);
        /**
         * ??????????
         * (
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*&nbsp; &nbsp;`c` ROW<`EXPR$0` INT, `EXPR$1` 
INT&gt; NOT NULL
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* )
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* 
????????????????row????????????????????row????????????????????row??????????????????????????c1,
 c2??
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* (
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*&nbsp; &nbsp;`c` ROW<`c1` INT, `c2` INT&gt; 
NOT NULL
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* )
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*/


&nbsp; &nbsp; &nbsp; &nbsp; Table table1 = tableEnvironment.sqlQuery("select * 
from " + table);
&nbsp; &nbsp; &nbsp; &nbsp; /**
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* ????????????sql??????
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* Exception in thread "main" 
java.lang.AssertionError: Conversion to relational algebra failed to preserve 
datatypes:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* validated type:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* 
RecordType(RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER EXPR$1) NOT NULL 
c) NOT NULL
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* converted type:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* RecordType(RecordType(INTEGER EXPR$0, 
INTEGER EXPR$1) NOT NULL c) NOT NULL
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* rel:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* LogicalProject(c=[ROW($0, $1)])
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*&nbsp; 
&nbsp;LogicalWatermarkAssigner(rowtime=[ts], watermark=[$3])
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*&nbsp; &nbsp; 
&nbsp;LogicalProject(f_sequence=[$0], f_random=[$1], f_random_str=[$2], 
ts=[LOCALTIMESTAMP])
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*&nbsp; &nbsp; &nbsp; 
&nbsp;LogicalTableScan(table=[[default_catalog, default_database, datagen]])
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*/
&nbsp; &nbsp; &nbsp; &nbsp; ResolvedSchema resolvedSchema1 = 
table1.getResolvedSchema();
&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(resolvedSchema1);


&nbsp; &nbsp; &nbsp; &nbsp; table.execute().print();




&nbsp; &nbsp; }


}

回复