Hi All: I have define kafka connector Descriptor, and registe Table tEnv.connect(new Kafka() .version("universal") .topic(tableName) .startFromEarliest() .property("zookeeper.connect", “xxx") .property("bootstrap.servers", “xxx") .property("group.id", “xxx")) .withFormat(new Json().deriveSchema()) .withSchema(new Schema() .field("rowtime", Types.SQL_TIMESTAMP) .rowtime(new Rowtime() .timestampsFromField("createTime") .watermarksPeriodicBounded(300_000)) .field("data", Types.ROW(dataFieldTypes))) .inAppendMode().registerTableSource(tableName);
kafka input is: { "data": [ 18140781, ], "createTime": 1572577137596 } Exception as follows: Caused by: java.io.IOException: Failed to deserialize JSON object. at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129) at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72) at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203) Caused by: java.time.format.DateTimeParseException: Text '1553080631582' could not be parsed at index 0 at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777) at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334) at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232) at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403) at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382) at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232) at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127) ... 7 more polaris...@gmail.com