sql??????
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);


select * from user_behavior;



pom.xml??????
flink.version=1.13.1
<dependency&gt;
    <groupId&gt;org.apache.flink</groupId&gt;
    <artifactId&gt;flink-runtime-web_2.11</artifactId&gt;
    <version&gt;${flink.version}</version&gt;
    <scope&gt;compile</scope&gt;
</dependency&gt;

<dependency&gt;
    <groupId&gt;org.apache.flink</groupId&gt;
    <artifactId&gt;flink-json</artifactId&gt;
    <version&gt;${flink.version}</version&gt;
</dependency&gt;

<dependency&gt;
    <groupId&gt;org.apache.flink</groupId&gt;
    <artifactId&gt;flink-csv</artifactId&gt;
    <version&gt;${flink.version}</version&gt;
</dependency&gt;


<dependency&gt;
    <groupId&gt;org.apache.flink</groupId&gt;
    <artifactId&gt;flink-core</artifactId&gt;
    <version&gt;${flink.version}</version&gt;
</dependency&gt;

<dependency&gt;
    <groupId&gt;org.apache.flink</groupId&gt;
    
<artifactId&gt;flink-table-api-java-bridge_${scala.binary.version}</artifactId&gt;
    <version&gt;${flink.version}</version&gt;
</dependency&gt;

<dependency&gt;
    <groupId&gt;org.apache.flink</groupId&gt;
    
<artifactId&gt;flink-table-api-scala-bridge_${scala.binary.version}</artifactId&gt;
    <version&gt;${flink.version}</version&gt;
</dependency&gt;

<dependency&gt;
    <groupId&gt;org.apache.flink</groupId&gt;
    <artifactId&gt;flink-table-planner_${scala.binary.version}</artifactId&gt;
    <version&gt;${flink.version}</version&gt;
</dependency&gt;

<dependency&gt;
    <groupId&gt;org.apache.flink</groupId&gt;
    
<artifactId&gt;flink-table-planner-blink_${scala.binary.version}</artifactId&gt;
    <version&gt;${flink.version}</version&gt;
</dependency&gt;

<dependency&gt;
    <groupId&gt;org.apache.flink</groupId&gt;
    <artifactId&gt;flink-table-common</artifactId&gt;
    <version&gt;${flink.version}</version&gt;
    <!--            <scope&gt;provided</scope&gt;--&gt;
</dependency&gt;


<dependency&gt;
    <groupId&gt;org.apache.flink</groupId&gt;
    <artifactId&gt;flink-streaming-scala_${scala.binary.version}</artifactId&gt;
    <version&gt;${flink.version}</version&gt;
</dependency&gt;

<dependency&gt;
    <groupId&gt;org.apache.flink</groupId&gt;
    <artifactId&gt;flink-clients_${scala.binary.version}</artifactId&gt;
    <version&gt;${flink.version}</version&gt;
</dependency&gt;


<dependency&gt;
    <groupId&gt;org.apache.flink</groupId&gt;
    <artifactId&gt;flink-connector-kafka_2.11</artifactId&gt;
    <version&gt;${flink.version}</version&gt;
    <!--            <scope&gt;provided</scope&gt;--&gt;
</dependency&gt;


<dependency&gt;
    <groupId&gt;com.alibaba</groupId&gt;
    <artifactId&gt;fastjson</artifactId&gt;
    <version&gt;1.2.56</version&gt;
</dependency&gt;

<dependency&gt;
    <groupId&gt;org.projectlombok</groupId&gt;
    <artifactId&gt;lombok</artifactId&gt;
    <version&gt;1.18.12</version&gt;
</dependency&gt;
????log??????
Caused by: java.io.IOException: Failed to deserialize JSON '"{\n&nbsp; &nbsp; 
\"app_time\": \"2021-06-14 10:00:00\",\n&nbsp; &nbsp; \"category_id\": 
1,\n&nbsp; &nbsp; \"item_id\": 1,\n&nbsp; &nbsp; \"user_id\": 1,\n&nbsp; &nbsp; 
\"behavior\": \"pv\"\n}"'.
        at 
org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:112)
        at 
org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:50)
        at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
        at 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.lang.ClassCastException: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode 
cannot be cast to 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
        at 
org.apache.flink.formats.json.JsonToRowDataConverters.lambda$createRowConverter$ef66fe9a$1(JsonToRowDataConverters.java:344)
        at 
org.apache.flink.formats.json.JsonToRowDataConverters.lambda$wrapIntoNullableConverter$de0b9253$1(JsonToRowDataConverters.java:376)
        at 
org.apache.flink.formats.json.JsonRowDataDeserializationSchema.convertToRowData(JsonRowDataDeserializationSchema.java:121)
        at 
org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:106)
        ... 9 more

回复