Fabian Hueske created FLINK-11824:
-------------------------------------
Summary: Event-time attribute cannot have same name as in original format
Key: FLINK-11824
URL: https://issues.apache.org/jira/browse/FLINK-11824
Project: Flink
Issue Type: Bug
Components: API / Table SQL
Affects Versions: 1.7.2, 1.8.0
Reporter: Fabian Hueske
When a table is defined, event-time attributes are typically declared by linking
them to an existing field of the original format (e.g., CSV, Avro, JSON, ...).
However, the event-time attribute in the defined table currently cannot have
the same name as the original field.
The following table definition fails with an exception:
{code}
// set up execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val names: Array[String] = Array("name", "t")
val types: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG)

tEnv.connect(new Kafka()
    .version("universal")
    .topic("namesTopic")
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")
    .property("group.id", "testGroup"))
  .withFormat(new Csv()
    .schema(Types.ROW(names, types)))
  .withSchema(new Schema()
    .field("name", Types.STRING)
    .field("t", Types.SQL_TIMESTAMP) // changing "t" to "t2" works
    .rowtime(new Rowtime()
      .timestampsFromField("t")
      .watermarksPeriodicAscending()))
  .inAppendMode()
  .registerTableSource("Names")
{code}
{code}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 't' could not be resolved by the field mapping.
    at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
    at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
    at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
    at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
    at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
    at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
    at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
{code}
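Until this is fixed, a possible workaround follows from the inline comment in the table definition above: give the event-time attribute a name that differs from the format field it reads from. The sketch below is the same definition with only the table field renamed to "t2". The import list is an assumption for a Flink 1.8.0 Scala project with the Kafka and CSV format dependencies on the classpath. The name "t2" is taken from the comment, and any other name that does not clash with a format field should presumably work as well.
{code}
// Imports are assumptions for Flink 1.8.0; adjust them to the version in use.
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Csv, Kafka, Rowtime, Schema}

// set up execution environment (unchanged)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// fields of the original CSV format (unchanged): "t" is a LONG timestamp
val names: Array[String] = Array("name", "t")
val types: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG)

tEnv.connect(new Kafka()
    .version("universal")
    .topic("namesTopic")
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")
    .property("group.id", "testGroup"))
  .withFormat(new Csv()
    .schema(Types.ROW(names, types)))
  .withSchema(new Schema()
    .field("name", Types.STRING)
    // the table field is named "t2" so it no longer clashes with format field "t"
    .field("t2", Types.SQL_TIMESTAMP)
    .rowtime(new Rowtime()
      .timestampsFromField("t") // still extracted from the format field "t"
      .watermarksPeriodicAscending()))
  .inAppendMode()
  .registerTableSource("Names")
{code}
With this definition, queries on "Names" refer to the event-time attribute as "t2" instead of "t".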