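Hi,
java.util.Date has no corresponding regular SQL type, so it falls back to the catch-all RAW type
(the fallback for classes that cannot be mapped to a SQL or structured type). java.sql.Date, by
contrast, maps to the SQL DATE type.
For details, see this table: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#data-type-extraction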
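
As a rough sketch of one possible workaround (not something tested against your job): since the sink
column is TIMESTAMP(0), the field could be carried as java.time.LocalDateTime, which the table linked
above maps to TIMESTAMP, instead of java.util.Date. The class and method names below are hypothetical,
and the choice of time zone is an assumption you would need to adapt to your data.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Date;

class DateConversionSketch {

    // Hypothetical helper: converts a java.util.Date to a LocalDateTime
    // as seen in the given time zone. Only valid for plain java.util.Date;
    // java.sql.Date does not support toInstant().
    static LocalDateTime toLocalDateTime(Date date, ZoneId zone) {
        return LocalDateTime.ofInstant(date.toInstant(), zone);
    }

    public static void main(String[] args) {
        // Assumption: UTC is used here purely for illustration.
        Date addtime = new Date();
        LocalDateTime converted = toLocalDateTime(addtime, ZoneOffset.UTC);
        System.out.println(converted);
    }
}
```

With something like this, the Orders field could be declared as LocalDateTime (or the stream mapped
before fromDataStream), so the extracted column type would be a TIMESTAMP rather than RAW. Whether
the precision then lines up with TIMESTAMP(0) in the sink without an explicit CAST is something I
have not verified.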




--

    Best!
    Xuyang





At 2024-03-05 09:23:38, "ha.fen...@aisino.com" <ha.fen...@aisino.com> wrote:
>Converting the stream to a Table
>DataStream<Orders> streamSource = env.addSource(new OrdersSourceObject());
>Table table = 
>tEnv.fromDataStream(streamSource).select($("addtime"),$("cusname"),$("price"),$("status"));
>tEnv.createTemporaryView("itemtable",table);
>
>Orders definition
>private Date addtime;
>private String cusname;
>private BigDecimal price;
>private int status;
>
>Output to Kafka
>String creatDDL = "CREATE TABLE kafka_sink (\n" +
>                "  `addtime` TIMESTAMP(0),\n" +
>                "  `cusname` STRING,\n" +
>                "  `price` DECIMAL(15, 2),\n" +
>                "  `status` INT\n" +
>                ") WITH (\n" +
>                "  'connector' = 'kafka',\n" +                
>                "  'format' = 'json'\n" +
>                    .....
>                ")";
>
>String query = "INSERT INTO kafka_sink SELECT * FROM itemtable;";
>tEnv.executeSql(query);
>
>Error
>
>Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>Column types of query result and sink for 
>'default_catalog.default_database.kafka_sink' do not match.
>Cause: Incompatible types for sink column 'addtime' at position 0.
>
>Query schema: [addtime: RAW('java.util.Date', '...'), cusname: STRING, price: 
>DECIMAL(38, 18), status: INT NOT NULL]
>Sink schema:  [addtime: TIMESTAMP(0), cusname: STRING, price: DECIMAL(15, 2), 
>status: INT]
