Hi, java.util.Date没有sql中的常规类型和它对应,因此使用的兜底的Raw类型(结构化类型)。实际上java.sql.Date 对应的是sql中的Date。 具体可以参考下这张表:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#data-type-extraction
-- Best! Xuyang 在 2024-03-05 09:23:38,"ha.fen...@aisino.com" <ha.fen...@aisino.com> 写道: >从流转换成Table >DataStream<Orders> streamSource = env.addSource(new OrdersSourceObject()); >Table table = >tEnv.fromDataStream(streamSource).select($("addtime"),$("cusname"),$("price"),$("status")); >tEnv.createTemporaryView("itemtable",table); > >Orders定义 >private Date addtime; >private String cusname; >private BigDecimal price; >private int status; > >输出到kafka >String creatDDL = "CREATE TABLE kafka_sink (\n" + > " `addtime` TIMESTAMP(0),\n" + > " `cusname` STRING,\n" + > " `price` DECIMAL(15, 2),\n" + > " `status` INT\n" + > ") WITH (\n" + > " 'connector' = 'kafka',\n" + > " 'format' = 'json'\n" + > ..... > ")"; > >String query = "INSERT INTO kafka_sink SELECT * FROM itemtable;"; >tEnv.executeSql(query); > >报错 > >Exception in thread "main" org.apache.flink.table.api.ValidationException: >Column types of query result and sink for >'default_catalog.default_database.kafka_sink' do not match. >Cause: Incompatible types for sink column 'addtime' at position 0. > >Query schema: [addtime: RAW('java.util.Date', '...'), cusname: STRING, price: >DECIMAL(38, 18), status: INT NOT NULL] >Sink schema: [addtime: TIMESTAMP(0), cusname: STRING, price: DECIMAL(15, 2), >status: INT]