Re: kafka的多分区watermark

2020-12-13 文章 张锴
谢谢你,我想明白了

Shuai Xia  于2020年12月14日周一 下午2:08写道:

>
> Hi,没有太理解你的意思,这个MyType只是说你可以把Kafka的数据反序列化后使用,像SimpleStringSchema默认是String,你可以对他进行解析
>
>
> --
> 发件人:张锴 
> 发送时间:2020年12月14日(星期一) 13:51
> 收件人:user-zh 
> 主 题:kafka的多分区watermark
>
> 在官网看到对于Kafka分区的时间戳定义描述,给出了示例,如下图:
>
> FlinkKafkaConsumer09 kafkaSource = new
> FlinkKafkaConsumer09<>("myTopic", schema,
> props);kafkaSource.assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor() {
>
> @Override
> public long extractAscendingTimestamp(MyType element) {
> return element.eventTimestamp();
> }});
> DataStream stream = env.addSource(kafkaSource);
>
> *不太理解这个里面泛型传的是用户定义的case class,还是传*ConsumerRecord,从他里面提取时间戳
>


回复:kafka的多分区watermark

2020-12-13 文章 Shuai Xia
Hi,没有太理解你的意思,这个MyType只是说你可以把Kafka的数据反序列化后使用,像SimpleStringSchema默认是String,你可以对他进行解析


--
发件人:张锴 
发送时间:2020年12月14日(星期一) 13:51
收件人:user-zh 
主 题:kafka的多分区watermark

在官网看到对于Kafka分区的时间戳定义描述,给出了示例,如下图:

FlinkKafkaConsumer09 kafkaSource = new
FlinkKafkaConsumer09<>("myTopic", schema,
props);kafkaSource.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor() {

@Override
public long extractAscendingTimestamp(MyType element) {
return element.eventTimestamp();
}});
DataStream stream = env.addSource(kafkaSource);

*不太理解这个里面泛型传的是用户定义的case class,还是传*ConsumerRecord,从他里面提取时间戳


kafka的多分区watermark

2020-12-13 文章 张锴
在官网看到对于Kafka分区的时间戳定义描述,给出了示例,如下图:

FlinkKafkaConsumer09 kafkaSource = new
FlinkKafkaConsumer09<>("myTopic", schema,
props);kafkaSource.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor() {

@Override
public long extractAscendingTimestamp(MyType element) {
return element.eventTimestamp();
}});
DataStream stream = env.addSource(kafkaSource);

*不太理解这个里面泛型传的是用户定义的case class,还是传*ConsumerRecord,从他里面提取时间戳