Hi all, could someone help me figure out why a type cast exception (ClassCastException) is thrown there?





------------------ Original message ------------------
From: kcz <573693...@qq.com>
Sent: July 1, 2021, 22:49
To: user-zh <user-zh@flink.apache.org>
Subject: Re: flink-1.13.1 DDL consuming JSON data from Kafka: (ObjectNode) jsonNode error



Version: 1.13.1. The error message is as follows:
Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
    at org.apache.flink.formats.json.JsonToRowDataConverters.lambda$createRowConverter$ef66fe9a$1(JsonToRowDataConverters.java:344)
    at org.apache.flink.formats.json.JsonToRowDataConverters.lambda$wrapIntoNullableConverter$de0b9253$1(JsonToRowDataConverters.java:376)
    at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.convertToRowData(JsonRowDataDeserializationSchema.java:121)
    at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:106)
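One possible reading of this trace, offered as a guess rather than a confirmed diagnosis: the cast fails inside the row converter, which suggests that at least one record in the topic has a JSON string as its root value (for example an object that was serialized twice) instead of a JSON object. The sketch below is a way to check raw payloads for that. It uses only the JDK; the class JsonRootCheck, the enum RootKind, and the sample payloads are hypothetical names made up for illustration and are not part of Flink or Jackson.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper (not part of Flink): classifies the root value of a
 * record payload by looking at its first non-whitespace character.
 */
public class JsonRootCheck {

    enum RootKind { OBJECT, STRING, ARRAY, OTHER, EMPTY }

    static RootKind rootKind(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (Character.isWhitespace(c)) {
                continue; // skip leading whitespace
            }
            switch (c) {
                case '{': return RootKind.OBJECT; // what a row type needs at the root
                case '"': return RootKind.STRING; // a JSON parser sees a text value here
                case '[': return RootKind.ARRAY;
                default:  return RootKind.OTHER;  // number, true/false/null, or invalid input
            }
        }
        return RootKind.EMPTY;
    }

    public static void main(String[] args) {
        // Assumed sample payloads: a plain object, and the same object encoded as a JSON string.
        byte[] asObject = "{\"user_id\":\"u1\"}".getBytes(StandardCharsets.UTF_8);
        byte[] asString = "\"{\\\"user_id\\\":\\\"u1\\\"}\"".getBytes(StandardCharsets.UTF_8);

        System.out.println(rootKind(asObject)); // prints OBJECT
        System.out.println(rootKind(asString)); // prints STRING
    }
}
```

If the payloads in the topic turn out to be of the STRING kind, the producer side would be the first place to look, since the table schema describes a row with a user_id field and therefore expects an object at the root.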



The DDL is defined as follows:
CREATE TABLE user_behavior (
    user_id string
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);
select * from user_behavior;

The main code is as follows:
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.createLocalEnvironment(new Configuration());
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
tableEnv.executeSql(sql).print();

The pom file is as follows:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
