Thanks hequn, it is very helpful On Wed, Nov 14, 2018 at 2:32 PM Hequn Cheng <chenghe...@gmail.com> wrote:
> Hi jeff, > > We need a different field name for the rowtime indicator, something looks > like: > >> new Schema() >> .field("status", Types.STRING) >> .field("direction", Types.STRING) >> .field("rowtime", Types.SQL_TIMESTAMP).rowtime( >> new >> Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending()) > > > Furthermore, we should define another sink schema which contains no > rowtime definitions, since currently time attributes and custom field > mappings are not supported yet for sink. > >> val sinkSchema = >> new Schema() >> .field("status", Types.STRING) >> .field("direction", Types.STRING) >> .field("rowtime", Types.SQL_TIMESTAMP) > > > Btw, a unified api for source and sink is under discussion now. More > details here[1] > > Best, Hequn > > [1] > https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?ts=5bb62df4#heading=h.41fd6rs7b3cf > > > On Wed, Nov 14, 2018 at 9:18 AM Jeff Zhang <zjf...@gmail.com> wrote: > >> >> Hi, >> >> I hit the following error when I try to use kafka connector in flink >> table api. There's very little document about how to use kafka connector in >> flink table api, could anyone help me on that ? Thanks >> >> Exception in thread "main" >> org.apache.flink.table.api.ValidationException: Field 'event_ts' could not >> be resolved by the field mapping. >> at >> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491) >> at >> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521) >> at >> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521) >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> at >> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) >> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) >> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) >> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) >> at >> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521) >> at >> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127) >> at >> org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33) >> at >> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150) >> at >> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541) >> at >> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47) >> at >> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68) >> >> And here's the source code: >> >> >> >> case class Record(status: String, direction: String, var event_ts: >> Timestamp) >> >> >> def main(args: Array[String]): Unit = { >> val senv = StreamExecutionEnvironment.getExecutionEnvironment >> senv.setParallelism(1) >> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) >> >> val data: DataStream[Record] = ... >> val tEnv = TableEnvironment.getTableEnvironment(senv) >> tEnv >> // declare the external system to connect to >> .connect( >> new Kafka() >> .version("0.11") >> .topic("processed5.events") >> .startFromEarliest() >> .property("zookeeper.connect", "localhost:2181") >> .property("bootstrap.servers", "localhost:9092")) >> .withFormat(new Json() >> .failOnMissingField(false) >> .deriveSchema() >> ) >> .withSchema( >> new Schema() >> .field("status", Types.STRING) >> .field("direction", Types.STRING) >> .field("event_ts", Types.SQL_TIMESTAMP).rowtime( >> new >> Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending()) >> ) >> >> // specify the update-mode for streaming tables >> .inAppendMode() >> >> // register as source, sink, or both and under a name >> .registerTableSourceAndSink("MyUserTable"); >> >> tEnv.fromDataStream(data).insertInto("MyUserTable") >> >> 0封新邮件 >> 回复 >> >> -- Best Regards Jeff Zhang