sql?????? CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json' );
select * from user_behavior; pom.xml?????? flink.version=1.13.1 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_2.11</artifactId> <version>${flink.version}</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.56</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> ????log?????? Caused by: java.io.IOException: Failed to deserialize JSON '"{\n \"app_time\": \"2021-06-14 10:00:00\",\n \"category_id\": 1,\n \"item_id\": 1,\n \"user_id\": 1,\n \"behavior\": \"pv\"\n}"'. at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:112) at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:50) at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode at org.apache.flink.formats.json.JsonToRowDataConverters.lambda$createRowConverter$ef66fe9a$1(JsonToRowDataConverters.java:344) at org.apache.flink.formats.json.JsonToRowDataConverters.lambda$wrapIntoNullableConverter$de0b9253$1(JsonToRowDataConverters.java:376) at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.convertToRowData(JsonRowDataDeserializationSchema.java:121) at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:106) ... 9 more