[ https://issues.apache.org/jira/browse/FLINK-11824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Konstantin Knauf reopened FLINK-11824:
--------------------------------------

Re-opening in accordance with https://issues.apache.org/jira/browse/FLINK-23206.

> Event-time attribute cannot have same name as in original format
> ----------------------------------------------------------------
>
>                 Key: FLINK-11824
>                 URL: https://issues.apache.org/jira/browse/FLINK-11824
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.7.2, 1.8.0
>            Reporter: Fabian Hueske
>            Priority: Minor
>              Labels: auto-closed
>
> When a table is defined, an event-time attribute is typically declared by 
> linking it to an existing field of the original format (e.g., CSV, Avro, 
> JSON, ...). However, right now, the event-time attribute in the defined table 
> cannot have the same name as the original field.
> The following table definition fails with an exception:
> {code}
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.Types
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.table.descriptors.{Csv, Kafka, Rowtime, Schema}
>
> // set up the execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env)
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> // the physical format has two fields: "name" and "t"
> val names: Array[String] = Array("name", "t")
> val types: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG)
>
> tEnv.connect(new Kafka()
>     .version("universal")
>     .topic("namesTopic")
>     .property("zookeeper.connect", "localhost:2181")
>     .property("bootstrap.servers", "localhost:9092")
>     .property("group.id", "testGroup"))
>   .withFormat(new Csv()
>     .schema(Types.ROW(names, types)))
>   .withSchema(new Schema()
>     .field("name", Types.STRING)
>     .field("t", Types.SQL_TIMESTAMP) // changing "t" to "t2" works
>       .rowtime(new Rowtime()
>         .timestampsFromField("t")
>         .watermarksPeriodicAscending()))
>   .inAppendMode()
>   .registerTableSource("Names")
> {code}
> {code}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 't' could not be resolved by the field mapping.
>       at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
>       at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>       at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>       at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
>       at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
>       at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
>       at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
>       at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
>       at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
> {code}
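>
> As the inline comment in the definition above notes, renaming the attribute in 
> the table schema (e.g., to "t2") avoids the clash, since {{timestampsFromField("t")}} 
> can still point at the physical field. A minimal sketch of that workaround, reusing 
> the environment, imports, and the hypothetical topic/connection settings from above:
> {code}
> // Workaround sketch: declare the event-time attribute under a new name ("t2")
> // while still reading timestamps from the original format field "t".
> tEnv.connect(new Kafka()
>     .version("universal")
>     .topic("namesTopic")
>     .property("zookeeper.connect", "localhost:2181")
>     .property("bootstrap.servers", "localhost:9092")
>     .property("group.id", "testGroup"))
>   .withFormat(new Csv()
>     .schema(Types.ROW(names, types)))
>   .withSchema(new Schema()
>     .field("name", Types.STRING)
>     .field("t2", Types.SQL_TIMESTAMP) // renamed attribute, no name clash
>       .rowtime(new Rowtime()
>         .timestampsFromField("t")     // timestamps still come from field "t"
>         .watermarksPeriodicAscending()))
>   .inAppendMode()
>   .registerTableSource("Names")
> {code}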



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
