Re: 对于kafka partition 设置时间戳及watermark

2020-12-20 文章 张锴
我现在用的flink 版本1.10.1 ,我点开 setAutoWatermarkInterval 看到private long autoWatermarkInterval = 0; 是否代表它默认的执行频率是0,我理解的意思抽取的时间戳同时生成watermark,它们是一一对应的,不知道我的理解是否正确 赵一旦 于2020年12月20日周日 下午11:15写道: > setAutoWatermarkInterval这个只是设置interval。决定你那个抽取ts的函数的执行频率的。 > > r pp 于2020年12月20日周日 上午10:49写道: > > > 是的 > > >

Re: 对于kafka partition 设置时间戳及watermark

2020-12-20 文章 赵一旦
setAutoWatermarkInterval这个只是设置interval。决定你那个抽取ts的函数的执行频率的。 r pp 于2020年12月20日周日 上午10:49写道: > 是的 > > 张锴 于2020年12月19日周六 下午5:45写道: > > > 我按官网操作,重写了序列化方式 > > > > val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, > > props)kafkaSource.assignTimestampsAndWatermarks(new > >

Re: 对于kafka partition 设置时间戳及watermark

2020-12-19 文章 r pp
是的 张锴 于2020年12月19日周六 下午5:45写道: > 我按官网操作,重写了序列化方式 > > val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, > props)kafkaSource.assignTimestampsAndWatermarks(new > AscendingTimestampExtractor[MyType] { > def extractAscendingTimestamp(element: MyType): Long = >

对于kafka partition 设置时间戳及watermark

2020-12-19 文章 张锴
我按官网操作,重写了序列化方式 val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] { def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp}) val stream: DataStream[MyType] =