Re: Flink 1.10 JSON Parsing

2020-03-26 by Zhenghua Gao
Hi 宇张,

This looks like a bug in TypeMappingUtils, where a field's physical type is validated against its logical type.
I've opened an issue for it: https://issues.apache.org/jira/browse/FLINK-16800

*Best Regards,*
*Zhenghua Gao*


On Fri, Mar 20, 2020 at 5:28 PM 宇张  wrote:

> [...]

Re: Flink 1.10 JSON Parsing

2020-03-20 by 宇张
hi,
Got it. Let me lay it out again:
streamTableEnv
    .connect(
        new Kafka()
            .version("0.11")
            .topic("mysql_binlog_test")
            .startFromEarliest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092"))
    .withFormat(
        new Json())
    .withSchema(
        new Schema()
            .field("business", DataTypes.STRING())
            .field("data", DataTypes.ARRAY(DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.BIGINT()),
                DataTypes.FIELD("vendor_id", DataTypes.DOUBLE()),
                DataTypes.FIELD("status", DataTypes.BIGINT()),
                DataTypes.FIELD("create_time", DataTypes.BIGINT()),
                DataTypes.FIELD("tracking_number", DataTypes.STRING()),
                DataTypes.FIELD("invoice_no", DataTypes.STRING()),
                DataTypes.FIELD("parent_id", DataTypes.BIGINT()))))
            .field("database", DataTypes.STRING())
            .field("old", DataTypes.ARRAY(DataTypes.ROW(
                DataTypes.FIELD("logistics_status", DataTypes.DECIMAL(38, 18)))))
            .field("table", DataTypes.STRING())
            .field("ts", DataTypes.BIGINT())
            .field("type", DataTypes.STRING())
            .field("putRowNum", DataTypes.BIGINT()))
    .createTemporaryTable("Test");
Here, when the subfield inside the composite field 'old' is typed as DECIMAL, the exception below is thrown; other types work fine.
Exception:
Exception in thread "main" org.apache.flink.table.api.ValidationException:
Type ARRAY<ROW<`logistics_status` DECIMAL(38, 18)>> of table field 'old'
does not match with the physical type ARRAY<ROW<`logistics_status`
LEGACY('DECIMAL', 'DECIMAL')>> of the 'old' field of the TableSource return
type.
at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157)
at org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110)
at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214)
at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192)
at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
...
Re: Flink 1.10 JSON Parsing

2020-03-20 by Jark Wu
Hi,

All the images you sent are broken. Please paste the text directly, or upload the images to a hosting service first and paste the links here.

1. What error does DECIMAL throw?
2. If you keep the jsonSchema, you have to make sure the table schema and the json schema are consistent, i.e. not only must the table schema be correct, the json schema must be correct too.
That adds quite a bit of extra cost, so the general advice is not to configure a jsonSchema. In theory the table schema can express any complex format.
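
To illustrate what "consistent" means here, a hedged sketch of a matching pair for the sample message in this thread (a descriptor fragment only, untested; one assumption worth flagging: the tuple-style "items": [{...}] in the original jsonSchema is likely what made the format derive the physical type ROW<`f0` ROW<...>>, so this sketch declares "items" as a single object schema instead):

.withFormat(
    new Json()
        // "items" is a single object schema here, not a one-element array
        .jsonSchema("{\"type\":\"object\",\"properties\":{"
            + "\"data\":{\"type\":\"array\",\"items\":{\"type\":\"object\",\"properties\":{"
            + "\"tracking_number\":{\"type\":\"string\"},"
            + "\"invoice_no\":{\"type\":\"string\"}}}}}}"))
.withSchema(
    new Schema()
        // the table field mirrors the json schema: an array of rows
        .field("data", DataTypes.ARRAY(DataTypes.ROW(
            DataTypes.FIELD("tracking_number", DataTypes.STRING()),
            DataTypes.FIELD("invoice_no", DataTypes.STRING())))))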

Best,
Jark


On Fri, 20 Mar 2020 at 14:48, 宇张  wrote:

> [...]

Re: Flink 1.10 JSON Parsing

2020-03-20 by 宇张
hi,
Well, testing shows Decimal simply cannot be used here. Even DECIMAL(38, 18) fails, and switching to another type makes it work. Not sure whether this is a bug.
[image: image.png]

On Fri, Mar 20, 2020 at 2:17 PM 宇张  wrote:

> [...]

Re: Flink 1.10 JSON Parsing

2020-03-20 by 宇张
hi, I tried again on my side. When the JSON data contains numeric fields, the program still fails to run even after changing the schema of data to ARRAY(ROW(...)) and removing .jsonSchema(...); when there are no numeric fields it works. Judging from the error output, the two structures do match, yet the validation seems to fail anyway.
[image: image.png]


On Fri, Mar 20, 2020 at 12:08 PM 宇张  wrote:

> [...]

Re: Flink 1.10 JSON Parsing

2020-03-19 by 宇张
hi,
OK, I gave it a try. After changing the schema of data to
ARRAY(ROW(FIELD("tracking_numbrer", STRING), FIELD("invoice_no", STRING)))
and removing .jsonSchema(...), the data now parses fine. But if I keep
.jsonSchema(...), the following exception is thrown: Exception in thread "main"
org.apache.flink.table.api.ValidationException: Type
ARRAY<ROW<`tracking_numbrer` STRING, `invoice_no` STRING>> of table field
'data' does not match with the physical type ROW<`f0` ROW<`tracking_number`
STRING, `invoice_no` STRING>> of the 'data' field of the TableSource return
type.
The reason I kept the jsonSchema is that I want to store the metadata of this kind of complex JSON source in Hive, and then use it to infer the layout of the SQL statements defined on top of it, because I don't know how to map the field information when defining SQL over such complex JSON. Is there an example of an SQL mapping for complex JSON like this? Thanks.
[image: image.png]
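
The direction I am currently guessing at for the SQL side (only a sketch I put together from the field names above, untested, so the query itself is an assumption) is to flatten the array with UNNEST once the table is registered:

// one output row per element of the "data" array
Table result = streamTableEnv.sqlQuery(
    "SELECT business, d.tracking_number, d.invoice_no, ts " +
    "FROM Test CROSS JOIN UNNEST(`data`) AS d (tracking_number, invoice_no)");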

On Fri, Mar 20, 2020 at 11:42 AM Jark Wu  wrote:

> [...]

Re: Flink 1.10 JSON Parsing

2020-03-19 by Jark Wu
Hi,

Looking at your data, "data" is an array type, so the schema definition of data needs to be changed to
ARRAY(ROW(FIELD("tracking_numbrer", STRING), FIELD("invoice_no", STRING)))
Also, I'd suggest removing .jsonSchema(...): since 1.10, flink-json can infer the json schema from the table schema automatically.
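
Concretely, something along these lines, as a sketch only (it reuses the environment and topic from your snippet below; I have not run it, and ts/putRowNum are assumed to be plain longs, hence BIGINT):

streamTableEnv
    .connect(
        new Kafka()
            .version("0.11")
            .topic("mysql_binlog_test_str")
            .startFromEarliest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092"))
    .withFormat(
        new Json())  // no explicit jsonSchema: it is derived from the table schema
    .withSchema(
        new Schema()
            .field("business", DataTypes.STRING())
            .field("data", DataTypes.ARRAY(DataTypes.ROW(
                DataTypes.FIELD("tracking_number", DataTypes.STRING()),
                DataTypes.FIELD("invoice_no", DataTypes.STRING()))))
            .field("database", DataTypes.STRING())
            .field("table", DataTypes.STRING())
            .field("ts", DataTypes.BIGINT())
            .field("type", DataTypes.STRING())
            .field("putRowNum", DataTypes.BIGINT()))
    .createTemporaryTable("Test");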

Best,
Jark

On Fri, 20 Mar 2020 at 11:34, 宇张  wrote:

> [...]

Flink 1.10 JSON Parsing

2020-03-19 by 宇张
hi:
1. When parsing the JSON data, why is decimal used here rather than bigint?
[image: image.png]
2. When I use connect, parsing the JSON array elements throws an exception. Is this a usage mistake or a bug?
json:{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":1}
jsonSchema:{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}
connect:

streamTableEnv
    .connect(
        new Kafka()
            .version("0.11")
            .topic("mysql_binlog_test_str")
            .startFromEarliest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092"))
    .withFormat(
        new Json()
            .jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}"))
    .withSchema(
        new Schema()
            .field("business", DataTypes.STRING())
            .field("data", DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.ROW(
                DataTypes.FIELD("tracking_number", DataTypes.STRING()),
                DataTypes.FIELD("invoice_no", DataTypes.STRING())))))
            .field("database", DataTypes.STRING())
            .field("table", DataTypes.STRING())
            .field("ts", DataTypes.DECIMAL(38, 18))
            .field("type", DataTypes.STRING())
            .field("putRowNum", DataTypes.DECIMAL(38, 18)))
    .createTemporaryTable("Test");

Exception: Caused by: java.io.IOException: Failed to deserialize JSON object.

at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.lang.ClassCastException:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode
cannot be cast to
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:411)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
... 7 more
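
For reference, the cast failure itself is easy to reproduce with plain Jackson outside of Flink. Below is my own minimal illustration (using vanilla Jackson rather than Flink's shaded copy), built on the reading from the replies above that "data" arrives as a JSON array while the ROW type declared for it makes the format expect a JSON object:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class ArrayNodeCastRepro {
    public static void main(String[] args) throws Exception {
        String json = "{\"data\":[{\"tracking_number\":\"0180024020920\","
                + "\"invoice_no\":\"2020021025\"}]}";
        JsonNode root = new ObjectMapper().readTree(json);
        // "data" parses as an ArrayNode; treating it as an object node fails with
        // the same ClassCastException as in the stack trace above
        ObjectNode obj = (ObjectNode) root.get("data");
        System.out.println(obj);
    }
}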