[ 
https://issues.apache.org/jira/browse/FLINK-14550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hehuiyuan updated FLINK-14550:
------------------------------
    Description: 
*_The data schema:_*

 

final String schemaString =
 "

{\"type\":\"record\",\"name\":\"GenericUser\",\"namespace\":\"org.apache.flink.formats.avro.generated\","
 + "\"fields\": [

{\"name\":\"name\",\"type\":\"string\"}

,

{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]}

," +
 "

{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}

,

{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]}

" +
 ",\{\"name\":\"type_double_test\",\"type\":\"double\"},

{\"name\":\"type_null_test\",\"type\":[\"null\",\"string\"]}

," +
 "

{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]}

,{\"name\":\"type_array_string\",\"type\":" +
 
"\{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\","
 +
 "\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",

{\"type\":\"array\"," + "\"items\":\"string\"}

],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\"," +
 
"\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\","
 +
 "\"values\":\"long\"}},{\"name\":\"type_fixed\",\"type\":[\"null\",

{\"type\":\"fixed\",\"name\":\"Fixed16\"," + "\"size\":16}

],\"size\":16},

{\"name\":\"type_union\",\"type\":[\"null\",\"boolean\",\"long\",\"double\"]}

," +
 
*"{\"name\":\"type_nested\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Address\",\"fields\":[**{\"name\":\"num\","
 
+"\"type\":\"int\"}**,\{\"name\":\"street\",\"type\":\"string\"},\{\"name\":\"city\",\"type\":\"string\"},"
 +*
 
*"\{\"name\":\"state\",\"type\":\"string\"},\{\"name\":\"zip\",\"type\":\"string\"}]}]},*

{\"name\":\"type_bytes\"," + "\"type\":\"bytes\"}

,\{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
 +
 
"\{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\","
 +
 
"\"type\":\{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\","
 +
 
"\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\","
 +
 
"\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\","
 +
 
"\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\","
 +
 
"\"name\":\"Fixed2\",\"size\":2,\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}";

 

*_The code that fails:_*

tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street,userActionTime.proctime");

 

_*The error is as follows:*_

Exception in thread "main" org.apache.flink.table.api.TableException: The 
proctime attribute can only be appended to the table schema and not replace an 
existing field. Please move 'userActionTime' to the end of the schema.
Exception in thread "main" org.apache.flink.table.api.TableException: The 
proctime attribute can only be appended to the table schema and not replace an 
existing field. Please move 'userActionTime' to the end of the schema. at 
org.apache.flink.table.api.StreamTableEnvironment.org$apache$flink$table$api$StreamTableEnvironment$$extractProctime$1(StreamTableEnvironment.scala:649)
 at 
org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:676)
 at 
org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:668)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
org.apache.flink.table.api.StreamTableEnvironment.validateAndExtractTimeAttributes(StreamTableEnvironment.scala:668)
 at 
org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:549)
 at 
org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:136)
 at 
com.jd.flink.sql.demo.validate.schema.avro.AvroQuickStartMain.main(AvroQuickStartMain.java:145)

 

 

The following call works (nested field only, no proctime attribute):

tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street");

 

The following call also works (top-level field plus proctime attribute, no nested field access):

tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested,userActionTime.proctime");

 

 

 

  was:
*_The data schame :_*

 

final String schemaString =
 "

{\"type\":\"record\",\"name\":\"GenericUser\",\"namespace\":\"org.apache.flink.formats.avro.generated\","
 + "\"fields\": [\\{\"name\":\"name\",\"type\":\"string\"}

,{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]}," +
 
"{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]}"
 +
 
",\{\"name\":\"type_double_test\",\"type\":\"double\"},{\"name\":\"type_null_test\",\"type\":[\"null\",\"string\"]},"
 +
 
"{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":"
 +
 
"\{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\","
 +
 "\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",

{\"type\":\"array\"," + "\"items\":\"string\"}

],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\"," +
 
"\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\","
 +
 "\"values\":\"long\"}},{\"name\":\"type_fixed\",\"type\":[\"null\",

{\"type\":\"fixed\",\"name\":\"Fixed16\"," + "\"size\":16}

],\"size\":16},{\"name\":\"type_union\",\"type\":[\"null\",\"boolean\",\"long\",\"double\"]},"
 +
*"{\"name\":\"type_nested\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Address\",\"fields\":[*

*{\"name\":\"num\"," +"\"type\":\"int\"}*

*,\{\"name\":\"street\",\"type\":\"string\"},\{\"name\":\"city\",\"type\":\"string\"},"
 +*
 
*"\{\"name\":\"state\",\"type\":\"string\"},\{\"name\":\"zip\",\"type\":\"string\"}]}]},*

{\"name\":\"type_bytes\"," + "\"type\":\"bytes\"}

,\{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
 +
 
"\{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\","
 +
 
"\"type\":\{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\","
 +
 
"\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\","
 +
 
"\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\","
 +
 
"\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\","
 +
 
"\"name\":\"Fixed2\",\"size\":2,\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}";

 

*_The code :_*

tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street,userActionTime.proctime");

 

_*The error is as follows:*_

Exception in thread "main" org.apache.flink.table.api.TableException: The 
proctime attribute can only be appended to the table schema and not replace an 
existing field. Please move 'userActionTime' to the end of the schema.Exception 
in thread "main" org.apache.flink.table.api.TableException: The proctime 
attribute can only be appended to the table schema and not replace an existing 
field. Please move 'userActionTime' to the end of the schema. at 
org.apache.flink.table.api.StreamTableEnvironment.org$apache$flink$table$api$StreamTableEnvironment$$extractProctime$1(StreamTableEnvironment.scala:649)
 at 
org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:676)
 at 
org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:668)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
org.apache.flink.table.api.StreamTableEnvironment.validateAndExtractTimeAttributes(StreamTableEnvironment.scala:668)
 at 
org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:549)
 at 
org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:136)
 at 
com.jd.flink.sql.demo.validate.schema.avro.AvroQuickStartMain.main(AvroQuickStartMain.java:145)

 

 

The code is ok.

tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street");

 

The code is ok.

tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested,userActionTime.proctime");

 

 

 


> Can't use a proctime attribute when registering a DataStream as a table 
> with nested fields
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-14550
>                 URL: https://issues.apache.org/jira/browse/FLINK-14550
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / API
>            Reporter: hehuiyuan
>            Priority: Major
>
> *_The data schema:_*
>  
> final String schemaString =
>  "
> {\"type\":\"record\",\"name\":\"GenericUser\",\"namespace\":\"org.apache.flink.formats.avro.generated\","
>  + "\"fields\": [
> {\"name\":\"name\",\"type\":\"string\"}
> ,
> {\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]}
> ," +
>  "
> {\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}
> ,
> {\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]}
> " +
>  ",\{\"name\":\"type_double_test\",\"type\":\"double\"},
> {\"name\":\"type_null_test\",\"type\":[\"null\",\"string\"]}
> ," +
>  "
> {\"name\":\"type_bool_test\",\"type\":[\"boolean\"]}
> ,{\"name\":\"type_array_string\",\"type\":" +
>  
> "\{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\","
>  +
>  
> "\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",
> {\"type\":\"array\"," + "\"items\":\"string\"}
> ],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\"," +
>  
> "\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\","
>  +
>  "\"values\":\"long\"}},{\"name\":\"type_fixed\",\"type\":[\"null\",
> {\"type\":\"fixed\",\"name\":\"Fixed16\"," + "\"size\":16}
> ],\"size\":16},
> {\"name\":\"type_union\",\"type\":[\"null\",\"boolean\",\"long\",\"double\"]}
> ," +
>  
> *"{\"name\":\"type_nested\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Address\",\"fields\":[**{\"name\":\"num\","
>  
> +"\"type\":\"int\"}**,\{\"name\":\"street\",\"type\":\"string\"},\{\"name\":\"city\",\"type\":\"string\"},"
>  +*
>  
> *"\{\"name\":\"state\",\"type\":\"string\"},\{\"name\":\"zip\",\"type\":\"string\"}]}]},*
> {\"name\":\"type_bytes\"," + "\"type\":\"bytes\"}
> ,\{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
>  +
>  
> "\{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\","
>  +
>  
> "\"type\":\{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\","
>  +
>  
> "\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\","
>  +
>  
> "\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\","
>  +
>  
> "\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\","
>  +
>  
> "\"name\":\"Fixed2\",\"size\":2,\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}";
>  
> *_The code :_*
> tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street,userActionTime.proctime");
>  
> _*The error is as follows:*_
> Exception in thread "main" org.apache.flink.table.api.TableException: The 
> proctime attribute can only be appended to the table schema and not replace 
> an existing field. Please move 'userActionTime' to the end of the schema.
> Exception in thread "main" org.apache.flink.table.api.TableException: The 
> proctime attribute can only be appended to the table schema and not replace 
> an existing field. Please move 'userActionTime' to the end of the 
> schema. at 
> org.apache.flink.table.api.StreamTableEnvironment.org$apache$flink$table$api$StreamTableEnvironment$$extractProctime$1(StreamTableEnvironment.scala:649)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:676)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:668)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
> org.apache.flink.table.api.StreamTableEnvironment.validateAndExtractTimeAttributes(StreamTableEnvironment.scala:668)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:549)
>  at 
> org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:136)
>  at 
> com.jd.flink.sql.demo.validate.schema.avro.AvroQuickStartMain.main(AvroQuickStartMain.java:145)
>  
>  
> The following call works (nested field only, no proctime attribute):
> tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street");
>  
> The following call also works (top-level field plus proctime attribute, no nested field access):
> tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested,userActionTime.proctime");
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to