[ https://issues.apache.org/jira/browse/FLINK-14550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
hehuiyuan updated FLINK-14550: ------------------------------ Description: *_The data schame :_* final String schemaString = " {\"type\":\"record\",\"name\":\"GenericUser\",\"namespace\":\"org.apache.flink.formats.avro.generated\"," + "\"fields\": [ {\"name\":\"name\",\"type\":\"string\"} , {\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]} ," + " {\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]} , {\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]} " + ",\{\"name\":\"type_double_test\",\"type\":\"double\"}, {\"name\":\"type_null_test\",\"type\":[\"null\",\"string\"]} ," + " {\"name\":\"type_bool_test\",\"type\":[\"boolean\"]} ,{\"name\":\"type_array_string\",\"type\":" + "\{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\"," + "\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\", {\"type\":\"array\"," + "\"items\":\"string\"} ],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\"," + "\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\"," + "\"values\":\"long\"}},{\"name\":\"type_fixed\",\"type\":[\"null\", {\"type\":\"fixed\",\"name\":\"Fixed16\"," + "\"size\":16} ],\"size\":16}, {\"name\":\"type_union\",\"type\":[\"null\",\"boolean\",\"long\",\"double\"]} ," + *"{\"name\":\"type_nested\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Address\",\"fields\":[**{\"name\":\"num\"," +"\"type\":\"int\"}**,\{\"name\":\"street\",\"type\":\"string\"},\{\"name\":\"city\",\"type\":\"string\"}," +* *"\{\"name\":\"state\",\"type\":\"string\"},\{\"name\":\"zip\",\"type\":\"string\"}]}]},* {\"name\":\"type_bytes\"," + "\"type\":\"bytes\"} ,\{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}}," + "\{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\"," + "\"type\":\{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," + "\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\"," + "\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\"," + "\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\"," + "\"name\":\"Fixed2\",\"size\":2,\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}"; *_The code :_* tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street,userActionTime.proctime"); _*The error is as follows:*_ Exception in thread "main" org.apache.flink.table.api.TableException: The proctime attribute can only be appended to the table schema and not replace an existing field. Please move 'userActionTime' to the end of the schema.Exception in thread "main" org.apache.flink.table.api.TableException: The proctime attribute can only be appended to the table schema and not replace an existing field. Please move 'userActionTime' to the end of the schema. at org.apache.flink.table.api.StreamTableEnvironment.org$apache$flink$table$api$StreamTableEnvironment$$extractProctime$1(StreamTableEnvironment.scala:649) at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:676) at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:668) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at org.apache.flink.table.api.StreamTableEnvironment.validateAndExtractTimeAttributes(StreamTableEnvironment.scala:668) at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:549) at org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:136) at com.jd.flink.sql.demo.validate.schema.avro.AvroQuickStartMain.main(AvroQuickStartMain.java:145) The code is ok. tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street"); The code is ok. tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested,userActionTime.proctime"); was: *_The data schame :_* final String schemaString = " {\"type\":\"record\",\"name\":\"GenericUser\",\"namespace\":\"org.apache.flink.formats.avro.generated\"," + "\"fields\": [\\{\"name\":\"name\",\"type\":\"string\"} ,{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]}," + "{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]}" + ",\{\"name\":\"type_double_test\",\"type\":\"double\"},{\"name\":\"type_null_test\",\"type\":[\"null\",\"string\"]}," + "{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":" + "\{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\"," + "\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\", {\"type\":\"array\"," + "\"items\":\"string\"} ],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\"," + "\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\"," + "\"values\":\"long\"}},{\"name\":\"type_fixed\",\"type\":[\"null\", {\"type\":\"fixed\",\"name\":\"Fixed16\"," + "\"size\":16} ],\"size\":16},{\"name\":\"type_union\",\"type\":[\"null\",\"boolean\",\"long\",\"double\"]}," + *"{\"name\":\"type_nested\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Address\",\"fields\":[* *{\"name\":\"num\"," +"\"type\":\"int\"}* *,\{\"name\":\"street\",\"type\":\"string\"},\{\"name\":\"city\",\"type\":\"string\"}," +* *"\{\"name\":\"state\",\"type\":\"string\"},\{\"name\":\"zip\",\"type\":\"string\"}]}]},* {\"name\":\"type_bytes\"," + "\"type\":\"bytes\"} ,\{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}}," + "\{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\"," + "\"type\":\{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," + "\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\"," + "\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\"," + "\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\"," + "\"name\":\"Fixed2\",\"size\":2,\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}"; *_The code :_* tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street,userActionTime.proctime"); _*The error is as follows:*_ Exception in thread "main" org.apache.flink.table.api.TableException: The proctime attribute can only be appended to the table schema and not replace an existing field. Please move 'userActionTime' to the end of the schema.Exception in thread "main" org.apache.flink.table.api.TableException: The proctime attribute can only be appended to the table schema and not replace an existing field. Please move 'userActionTime' to the end of the schema. at org.apache.flink.table.api.StreamTableEnvironment.org$apache$flink$table$api$StreamTableEnvironment$$extractProctime$1(StreamTableEnvironment.scala:649) at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:676) at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:668) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at org.apache.flink.table.api.StreamTableEnvironment.validateAndExtractTimeAttributes(StreamTableEnvironment.scala:668) at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:549) at org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:136) at com.jd.flink.sql.demo.validate.schema.avro.AvroQuickStartMain.main(AvroQuickStartMain.java:145) The code is ok. tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street"); The code is ok. tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested,userActionTime.proctime"); > can't use proctime attribute when register datastream for table and exist > nested fields > --------------------------------------------------------------------------------------- > > Key: FLINK-14550 > URL: https://issues.apache.org/jira/browse/FLINK-14550 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API > Reporter: hehuiyuan > Priority: Major > > *_The data schame :_* > > final String schemaString = > " > {\"type\":\"record\",\"name\":\"GenericUser\",\"namespace\":\"org.apache.flink.formats.avro.generated\"," > + "\"fields\": [ > {\"name\":\"name\",\"type\":\"string\"} > , > {\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]} > ," + > " > {\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]} > , > {\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]} > " + > ",\{\"name\":\"type_double_test\",\"type\":\"double\"}, > {\"name\":\"type_null_test\",\"type\":[\"null\",\"string\"]} > ," + > " > {\"name\":\"type_bool_test\",\"type\":[\"boolean\"]} > ,{\"name\":\"type_array_string\",\"type\":" + > > "\{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\"," > + > > "\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\", > {\"type\":\"array\"," + "\"items\":\"string\"} > ],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\"," + > > "\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\"," > + > "\"values\":\"long\"}},{\"name\":\"type_fixed\",\"type\":[\"null\", > {\"type\":\"fixed\",\"name\":\"Fixed16\"," + "\"size\":16} > ],\"size\":16}, > {\"name\":\"type_union\",\"type\":[\"null\",\"boolean\",\"long\",\"double\"]} > ," + > > *"{\"name\":\"type_nested\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Address\",\"fields\":[**{\"name\":\"num\"," > > +"\"type\":\"int\"}**,\{\"name\":\"street\",\"type\":\"string\"},\{\"name\":\"city\",\"type\":\"string\"}," > +* > > *"\{\"name\":\"state\",\"type\":\"string\"},\{\"name\":\"zip\",\"type\":\"string\"}]}]},* > {\"name\":\"type_bytes\"," + "\"type\":\"bytes\"} > ,\{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}}," > + > > "\{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\"," > + > > "\"type\":\{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," > + > > "\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\"," > + > > "\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\"," > + > > "\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\"," > + > > "\"name\":\"Fixed2\",\"size\":2,\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}"; > > *_The code :_* > tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street,userActionTime.proctime"); > > _*The error is as follows:*_ > Exception in thread "main" org.apache.flink.table.api.TableException: The > proctime attribute can only be appended to the table schema and not replace > an existing field. Please move 'userActionTime' to the end of the > schema.Exception in thread "main" org.apache.flink.table.api.TableException: > The proctime attribute can only be appended to the table schema and not > replace an existing field. Please move 'userActionTime' to the end of the > schema. at > org.apache.flink.table.api.StreamTableEnvironment.org$apache$flink$table$api$StreamTableEnvironment$$extractProctime$1(StreamTableEnvironment.scala:649) > at > org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:676) > at > org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:668) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at > org.apache.flink.table.api.StreamTableEnvironment.validateAndExtractTimeAttributes(StreamTableEnvironment.scala:668) > at > org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:549) > at > org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:136) > at > com.jd.flink.sql.demo.validate.schema.avro.AvroQuickStartMain.main(AvroQuickStartMain.java:145) > > > The code is ok. > tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street"); > > The code is ok. > tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested,userActionTime.proctime"); > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)