Hey,

Could you please show a sample of the data that you want to process? This
would help in verifying the issue.
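
A single message from the topic, pasted as one JSON line, would be enough. Below is a minimal sketch of the shape I have in mind, assuming the messages mirror the Record case class from your code. The values, the ISO-8601 timestamp format, and the SampleRecord/toJson names are made up for illustration and may not match what is actually in the topic.

```scala
import java.sql.Timestamp
import java.time.Instant

// Same fields as the Record case class in the quoted code.
case class Record(status: String, direction: String, event_ts: Timestamp)

object SampleRecord {
  // Renders one record as a single JSON line. No string escaping is done,
  // so this is only suitable for simple illustrative values.
  // Assumption: event_ts is written as an ISO-8601 UTC timestamp.
  def toJson(r: Record): String =
    s"""{"status":"${r.status}","direction":"${r.direction}","event_ts":"${r.event_ts.toInstant}"}"""

  def main(args: Array[String]): Unit = {
    // Hypothetical values, for illustration only.
    val sample = Record("ok", "inbound", Timestamp.from(Instant.parse("2018-11-13T12:58:00Z")))
    println(toJson(sample))
  }
}
```

If the real messages differ from this shape (different field names, nested fields, or a numeric timestamp), that difference is exactly what would be useful to see.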

Best Regards,
Dom.

On Tue, 13 Nov 2018 at 13:58, Jeff Zhang <zjf...@gmail.com> wrote:

> Hi,
>
> I hit the following error when I try to use the Kafka connector in the Flink
> Table API. There is very little documentation on how to use the Kafka
> connector in the Table API. Could anyone help me with that? Thanks.
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Field 'event_ts' could not be resolved by the field mapping.
> at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
> at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
> at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
> at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
> at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
> at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
> at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
> at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
> at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)
>
> And here's the source code:
>
>
>
>  case class Record(status: String, direction: String, var event_ts: Timestamp)
>
>
>   def main(args: Array[String]): Unit = {
>     val senv = StreamExecutionEnvironment.getExecutionEnvironment
>     senv.setParallelism(1)
>     senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>     val data: DataStream[Record] = ...
>     val tEnv = TableEnvironment.getTableEnvironment(senv)
>     tEnv
>       // declare the external system to connect to
>       .connect(
>       new Kafka()
>         .version("0.11")
>         .topic("processed5.events")
>         .startFromEarliest()
>         .property("zookeeper.connect", "localhost:2181")
>         .property("bootstrap.servers", "localhost:9092"))
>       .withFormat(new Json()
>         .failOnMissingField(false)
>         .deriveSchema()
>       )
>       .withSchema(
>         new Schema()
>           .field("status", Types.STRING)
>           .field("direction", Types.STRING)
>           .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
>             new Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
>       )
>
>       // specify the update-mode for streaming tables
>       .inAppendMode()
>
>       // register as source, sink, or both and under a name
>       .registerTableSourceAndSink("MyUserTable");
>
>     tEnv.fromDataStream(data).insertInto("MyUserTable")
>
>
