Re: kafka的多分区watermark
谢谢你,我想明白了 Shuai Xia 于2020年12月14日周一 下午2:08写道: > > Hi,没有太理解你的意思,这个MyType只是说你可以把Kafka的数据反序列化后使用,像SimpleStringSchema默认是String,你可以对他进行解析 > > > -- > 发件人:张锴 > 发送时间:2020年12月14日(星期一) 13:51 > 收件人:user-zh > 主 题:kafka的多分区watermark > > 在官网看到对于Kafka分区的时间戳定义描述,给出了示例,如下图: > > FlinkKafkaConsumer09 kafkaSource = new > FlinkKafkaConsumer09<>("myTopic", schema, > props);kafkaSource.assignTimestampsAndWatermarks(new > AscendingTimestampExtractor() { > > @Override > public long extractAscendingTimestamp(MyType element) { > return element.eventTimestamp(); > }}); > DataStream stream = env.addSource(kafkaSource); > > *不太理解这个里面泛型传的是用户定义的case class,还是传*ConsumerRecord,从他里面提取时间戳 >
回复:kafka的多分区watermark
Hi,没有太理解你的意思,这个MyType只是说你可以把Kafka的数据反序列化后使用,像SimpleStringSchema默认是String,你可以对他进行解析 -- 发件人:张锴 发送时间:2020年12月14日(星期一) 13:51 收件人:user-zh 主 题:kafka的多分区watermark 在官网看到对于Kafka分区的时间戳定义描述,给出了示例,如下图: FlinkKafkaConsumer09 kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor() { @Override public long extractAscendingTimestamp(MyType element) { return element.eventTimestamp(); }}); DataStream stream = env.addSource(kafkaSource); *不太理解这个里面泛型传的是用户定义的case class,还是传*ConsumerRecord,从他里面提取时间戳
kafka的多分区watermark
在官网看到对于Kafka分区的时间戳定义描述,给出了示例,如下图: FlinkKafkaConsumer09 kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor() { @Override public long extractAscendingTimestamp(MyType element) { return element.eventTimestamp(); }}); DataStream stream = env.addSource(kafkaSource); *不太理解这个里面泛型传的是用户定义的case class,还是传*ConsumerRecord,从他里面提取时间戳