Hi,
1. When parsing JSON data, why is decimal used here instead of bigint?
[image: image.png]
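
On question 1, one plausible reason (an assumption, not confirmed in this thread) is that a JSON Schema `integer` has no upper bound, so a 64-bit bigint cannot hold every valid value, while a decimal type can. The JDK-only sketch below illustrates that size limit. The class `JsonIntegerWidthDemo` and the method `fitsInBigint` are hypothetical names chosen for illustration and are not part of Flink.

```java
import java.math.BigDecimal;

final class JsonIntegerWidthDemo {

    /** Returns true if the JSON integer literal fits into a signed 64-bit long (SQL BIGINT). */
    static boolean fitsInBigint(String jsonIntegerLiteral) {
        try {
            // longValueExact throws if the value is out of range or has a fractional part
            new BigDecimal(jsonIntegerLiteral).longValueExact();
            return true;
        } catch (ArithmeticException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The "ts" value from the sample message is well within the long range
        System.out.println(fitsInBigint("1581576074069"));
        // Long.MAX_VALUE + 1 is still a valid JSON integer, but it is no longer a BIGINT
        System.out.println(fitsInBigint("9223372036854775808"));
    }
}
```

Both literals are valid for `"type":"integer"`, yet only the first can be stored as a bigint, which is why a wider decimal type is the conservative default.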
2. When using connect, parsing the elements of a JSON array throws an exception. Is this caused by misuse on my part, or is it a bug?
json:{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":19999}
jsonSchema:{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}
connect:

streamTableEnv
        .connect(
                new Kafka()
                        .version("0.11")
                        .topic("mysql_binlog_test_str")
                        .startFromEarliest()
                        .property("zookeeper.connect", "localhost:2181")
                        .property("bootstrap.servers", "localhost:9092")
        )
        .withFormat(
                new Json()
                        .jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}")
        )
        .withSchema(
                new Schema()
                        .field("business", DataTypes.STRING())
                        .field("data", DataTypes.ROW(
                                DataTypes.FIELD("f0", DataTypes.ROW(
                                        DataTypes.FIELD("tracking_number", DataTypes.STRING()),
                                        DataTypes.FIELD("invoice_no", DataTypes.STRING())))))
                        .field("database", DataTypes.STRING())
                        .field("table", DataTypes.STRING())
                        .field("ts", DataTypes.DECIMAL(38, 18))
                        .field("type", DataTypes.STRING())
                        .field("putRowNum", DataTypes.DECIMAL(38, 18))
        )
        .createTemporaryTable("Test");

Exception: Caused by: java.io.IOException: Failed to deserialize JSON object.

    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:411)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
    ... 7 more
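
The `ClassCastException` above suggests that the `data` field is being converted as a row (a JSON object) while the message actually carries a JSON array there. The JDK-only sketch below reproduces that kind of mismatch with stand-ins: a `Map` plays the role of `ObjectNode` and a `List` plays the role of `ArrayNode`. The class `ArrayVersusRowDemo` and all of its methods are hypothetical and only mimic the pattern; they are not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ArrayVersusRowDemo {

    /** Row-style conversion: expects a JSON object (Map stand-in) and reads the named fields. */
    static List<Object> convertRow(Object node, String... fieldNames) {
        // Throws ClassCastException when node is a List, analogous to ArrayNode -> ObjectNode
        Map<?, ?> object = (Map<?, ?>) node;
        List<Object> row = new ArrayList<>();
        for (String name : fieldNames) {
            row.add(object.get(name));
        }
        return row;
    }

    /** Array-style conversion: expects a JSON array (List stand-in) and converts each element as a row. */
    static List<List<Object>> convertArrayOfRows(Object node, String... fieldNames) {
        List<List<Object>> rows = new ArrayList<>();
        for (Object element : (List<?>) node) {
            rows.add(convertRow(element, fieldNames));
        }
        return rows;
    }

    /** Builds the "data" value of the sample message: an array holding one object. */
    static Object sampleData() {
        Map<String, Object> order = new LinkedHashMap<>();
        order.put("tracking_number", "0180024020920");
        order.put("invoice_no", "2020021025");
        List<Object> data = new ArrayList<>();
        data.add(order);
        return data;
    }

    /** Returns true if converting the node as a row fails with a ClassCastException. */
    static boolean rowConversionFails(Object node) {
        try {
            convertRow(node, "f0");
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object data = sampleData();
        // Treating the array as a row fails, as in the stack trace
        System.out.println(rowConversionFails(data));
        // Treating it as an array of rows works
        System.out.println(convertArrayOfRows(data, "tracking_number", "invoice_no"));
    }
}
```

If the same reasoning carries over to Flink (an untested assumption), the change to try would be declaring `"items"` as a single object schema instead of a one-element list, and declaring the field as `DataTypes.ARRAY(DataTypes.ROW(...))` rather than a row with an `f0` field.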
