[ https://issues.apache.org/jira/browse/FLINK-11824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-11824:
-----------------------------------
    Labels: stale-minor  (was: )

> Event-time attribute cannot have same name as in original format
> ----------------------------------------------------------------
>
>                 Key: FLINK-11824
>                 URL: https://issues.apache.org/jira/browse/FLINK-11824
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.7.2, 1.8.0
>            Reporter: Fabian Hueske
>            Priority: Minor
>              Labels: stale-minor
>
> When a table is defined, event-time attributes are typically defined by
> linking them to an existing field in the original format (e.g., CSV, Avro,
> JSON, ...). However, right now, the event-time attribute in the defined table
> cannot have the same name as the original field.
> The following table definition fails with an exception:
> {code}
> // set up execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env)
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val names: Array[String] = Array("name", "t")
> val types: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG)
> tEnv.connect(new Kafka()
>     .version("universal")
>     .topic("namesTopic")
>     .property("zookeeper.connect", "localhost:2181")
>     .property("bootstrap.servers", "localhost:9092")
>     .property("group.id", "testGroup"))
>   .withFormat(new Csv()
>     .schema(Types.ROW(names, types)))
>   .withSchema(new Schema()
>     .field("name", Types.STRING)
>     .field("t", Types.SQL_TIMESTAMP) // changing "t" to "t2" works
>       .rowtime(new Rowtime()
>         .timestampsFromField("t")
>         .watermarksPeriodicAscending()))
>   .inAppendMode()
>   .registerTableSource("Names")
> {code}
> {code}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 't' could not be resolved by the field mapping.
>     at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
>     at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>     at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>     at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
>     at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
>     at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
>     at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
>     at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
>     at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)