Thanks Hequn, that is very helpful.
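
For the archive, here is a minimal sketch of how the two suggestions quoted below might fit together. It assumes the same descriptor API and connector settings as my original code. The object name KafkaTableSketch, the helper methods kafka() and json(), and the table names "MyUserTableSource" and "MyUserTableSink" are made up for illustration, and registering source and sink separately under two names is my assumption, not something confirmed in this thread. I have not run it against a live Kafka cluster.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.descriptors.{Json, Kafka, Rowtime, Schema}

object KafkaTableSketch {

  // Connector settings copied from the original post (hypothetical helper).
  private def kafka(): Kafka =
    new Kafka()
      .version("0.11")
      .topic("processed5.events")
      .startFromEarliest()
      .property("zookeeper.connect", "localhost:2181")
      .property("bootstrap.servers", "localhost:9092")

  // Format settings copied from the original post (hypothetical helper).
  private def json(): Json =
    new Json()
      .failOnMissingField(false)
      .deriveSchema()

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = TableEnvironment.getTableEnvironment(senv)

    // Source schema: the rowtime attribute gets its own field name ("rowtime")
    // and takes its timestamps from the input field "event_ts".
    val sourceSchema =
      new Schema()
        .field("status", Types.STRING)
        .field("direction", Types.STRING)
        .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
          new Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())

    // Sink schema: same fields, but without any rowtime definition.
    val sinkSchema =
      new Schema()
        .field("status", Types.STRING)
        .field("direction", Types.STRING)
        .field("rowtime", Types.SQL_TIMESTAMP)

    // Assumption: source and sink are registered separately, under two names.
    tEnv.connect(kafka()).withFormat(json()).withSchema(sourceSchema)
      .inAppendMode()
      .registerTableSource("MyUserTableSource")

    tEnv.connect(kafka()).withFormat(json()).withSchema(sinkSchema)
      .inAppendMode()
      .registerTableSink("MyUserTableSink")
  }
}
```

With this layout, tEnv.fromDataStream(data).insertInto(...) would target the sink table name rather than "MyUserTable".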

On Wed, Nov 14, 2018 at 2:32 PM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi Jeff,
>
> We need a different field name for the rowtime indicator, something
> like:
>
>>       new Schema()
>>         .field("status", Types.STRING)
>>         .field("direction", Types.STRING)
>>         .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
>>           new Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
>
>
> Furthermore, we should define a separate sink schema that contains no
> rowtime definitions, since time attributes and custom field mappings
> are not yet supported for sinks.
>
>>     val sinkSchema =
>>       new Schema()
>>         .field("status", Types.STRING)
>>         .field("direction", Types.STRING)
>>         .field("rowtime", Types.SQL_TIMESTAMP)
>
>
> Btw, a unified API for sources and sinks is under discussion now. More
> details here [1].
>
> Best, Hequn
>
> [1]
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?ts=5bb62df4#heading=h.41fd6rs7b3cf
>
>
> On Wed, Nov 14, 2018 at 9:18 AM Jeff Zhang <zjf...@gmail.com> wrote:
>
>>
>> Hi,
>>
>> I hit the following error when I try to use the Kafka connector in the
>> Flink Table API. There is very little documentation on how to use the
>> Kafka connector in the Flink Table API. Could anyone help me with that? Thanks.
>>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Field 'event_ts' could not
>> be resolved by the field mapping.
>> at
>> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
>> at
>> org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
>> at
>> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
>> at
>> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
>> at
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
>> at
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)
>>
>> And here's the source code:
>>
>>
>>
>>  case class Record(status: String, direction: String, var event_ts: Timestamp)
>>
>>
>>   def main(args: Array[String]): Unit = {
>>     val senv = StreamExecutionEnvironment.getExecutionEnvironment
>>     senv.setParallelism(1)
>>     senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>
>>     val data: DataStream[Record] = ...
>>     val tEnv = TableEnvironment.getTableEnvironment(senv)
>>     tEnv
>>       // declare the external system to connect to
>>       .connect(
>>       new Kafka()
>>         .version("0.11")
>>         .topic("processed5.events")
>>         .startFromEarliest()
>>         .property("zookeeper.connect", "localhost:2181")
>>         .property("bootstrap.servers", "localhost:9092"))
>>       .withFormat(new Json()
>>         .failOnMissingField(false)
>>         .deriveSchema()
>>       )
>>       .withSchema(
>>         new Schema()
>>           .field("status", Types.STRING)
>>           .field("direction", Types.STRING)
>>           .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
>>             new Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
>>       )
>>
>>       // specify the update-mode for streaming tables
>>       .inAppendMode()
>>
>>       // register as source, sink, or both and under a name
>>       .registerTableSourceAndSink("MyUserTable");
>>
>>     tEnv.fromDataStream(data).insertInto("MyUserTable")
>>

-- 
Best Regards

Jeff Zhang
