This issue was already resolved in another thread by the same author.

On 15.11.2018 10:52, Dominik Wosiński wrote:
Hey,

Could you please share some sample data that you want to process? That would help in verifying the issue.

Best Regards,
Dom.

On Tue, 13 Nov 2018 at 13:58, Jeff Zhang <zjf...@gmail.com> wrote:

    Hi,

    I hit the following error when I try to use the Kafka connector in
    the Flink Table API. There is very little documentation on how to
    use the Kafka connector in the Table API. Could anyone help me with
    that? Thanks.

    Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'event_ts' could not be resolved by the field mapping.
        at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
        at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
        at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
        at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
        at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
        at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
        at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
        at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
        at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)

    And here's the source code:



    case class Record(status: String, direction: String, var event_ts: Timestamp)

    def main(args: Array[String]): Unit = {
      val senv = StreamExecutionEnvironment.getExecutionEnvironment
      senv.setParallelism(1)
      senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

      val data: DataStream[Record] = ...
      val tEnv = TableEnvironment.getTableEnvironment(senv)
      tEnv
        // declare the external system to connect to
        .connect(
          new Kafka()
            .version("0.11")
            .topic("processed5.events")
            .startFromEarliest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092"))
        .withFormat(new Json()
          .failOnMissingField(false)
          .deriveSchema()
        )
        .withSchema(
          new Schema()
            .field("status", Types.STRING)
            .field("direction", Types.STRING)
            .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
              new Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
        )
        // specify the update-mode for streaming tables
        .inAppendMode()
        // register as source, sink, or both and under a name
        .registerTableSourceAndSink("MyUserTable");

      tEnv.fromDataStream(data).insertInto("MyUserTable")

