Could someone more experienced take a look at this for me? Thanks.
I have been trying for a long time but still cannot parse JSON with a nested structure.

Error:

Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 4, column 9 to line 4, column 31: Column 'data.transaction_type' not found in any table
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
    at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
    at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
    at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)

The format and schema defined for the nested JSON are as follows:

    .withFormat(new Json()
      .jsonSchema(
        """{type: 'object',
          |  properties: {
          |    database: {
          |      type: 'string'
          |    },
          |    table: {
          |      type: 'string'
          |    },
          |    maxwell_ts: {
          |      type: 'integer'
          |    },
          |    data: {
          |      type: 'object',
          |      properties :{
          |        reference_id :{
          |          type: 'string'
          |        },
          |        transaction_type :{
          |          type: 'integer'
          |        },
          |        merchant_id :{
          |          type: 'integer'
          |        },
          |        create_time :{
          |          type: 'integer'
          |        },
          |        status :{
          |          type: 'integer'
          |        }
          |      }
          |    }
          |  }
          |}
        """.stripMargin.replaceAll("\n", " ")
      )
    )
    .withSchema(new Schema()
      .field("table", STRING())
      .field("database", STRING())
      .field("data", ROW(FIELD("reference_id", STRING()),
                         FIELD("transaction_type", INT()),
                         FIELD("merchant_id", INT()),
                         FIELD("status", INT())))
      //.field("event_time", BIGINT())
      //  .from("maxwell_ts")
      //.rowtime(new Rowtime()
      //  //.timestampsFromField("ts" * 1000)
      //  .timestampsFromField("ts")
      //  .watermarksPeriodicBounded(60000)
      //)
    )

The query that fails validation:

    bsTableEnv.sqlUpdate("""INSERT INTO yyyyy
                           | SELECT `table`, `database`
                           | `data.reference_id`,
                           | `data.transaction_type`,
                           | `data.merchant_id`,
                           | `data.create_time`,
                           | `data.status`
                           | FROM xxxx""".stripMargin)
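
A possible direction, offered as a guess rather than a confirmed diagnosis: in the query above each nested reference is wrapped in a single pair of backticks, so `data.transaction_type` is read as one column name containing a dot, which matches the "Column 'data.transaction_type' not found" message. Quoting each path segment separately (`data`.`transaction_type`) should address the field inside the `data` row instead. Two other things stand out in the post: there is no comma after `database` in the SELECT list, and `create_time` is selected but not declared in the ROW(...) passed to `.field("data", ...)`. The sketch below only builds the SQL string with plain Scala; `NestedFieldSql`, `quotePath` and `selectSql` are hypothetical helper names, not Flink APIs.

```scala
// Hypothetical helper, not part of Flink: builds an INSERT ... SELECT statement
// in which every segment of a nested path is back-quoted on its own.
object NestedFieldSql {

  // "data.transaction_type" -> `data`.`transaction_type`
  // Quoting per segment also keeps reserved words such as `table` usable as names.
  def quotePath(path: String): String =
    path.split('.').map(seg => s"`$seg`").mkString(".")

  // Joins the quoted paths with commas so no separator is forgotten.
  def selectSql(sink: String, source: String, paths: Seq[String]): String = {
    val cols = paths.map(quotePath).mkString(",\n       ")
    s"INSERT INTO $sink\nSELECT $cols\nFROM $source"
  }

  def main(args: Array[String]): Unit = {
    // Table names and field paths are taken from the post; data.create_time is
    // included on the assumption that it is also added to the ROW(...) schema.
    val sql = selectSql(
      "yyyyy",
      "xxxx",
      Seq("table", "database", "data.reference_id", "data.transaction_type",
          "data.merchant_id", "data.create_time", "data.status"))
    println(sql)
  }
}
```

The resulting string could then be passed to `bsTableEnv.sqlUpdate(...)` in place of the hand-written query. Whether the sink table `yyyyy` accepts these columns in this order is not something the post shows, so that part remains an assumption.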