Re: 对于kafka partition 设置时间戳及watermark
我现在用的flink 版本1.10.1 ,我点开 setAutoWatermarkInterval 看到private long autoWatermarkInterval = 0; 是否代表它默认的执行频率是0,我理解的意思抽取的时间戳同时生成watermark,它们是一一对应的,不知道我的理解是否正确 赵一旦 于2020年12月20日周日 下午11:15写道: > setAutoWatermarkInterval这个只是设置interval。决定你那个抽取ts的函数的执行频率的。 > > r pp 于2020年12月20日周日 上午10:49写道: > > > 是的 > > > > 张锴 于2020年12月19日周六 下午5:45写道: > > > > > 我按官网操作,重写了序列化方式 > > > > > > val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, > > > props)kafkaSource.assignTimestampsAndWatermarks(new > > > AscendingTimestampExtractor[MyType] { > > > def extractAscendingTimestamp(element: MyType): Long = > > > element.eventTimestamp}) > > > val stream: DataStream[MyType] = env.addSource(kafkaSource) > > > > > > *有个疑问,这样写完之后是不是不用设置*setAutoWatermarkInterval 呢? > > > > > >
Re: 对于kafka partition 设置时间戳及watermark
setAutoWatermarkInterval这个只是设置interval。决定你那个抽取ts的函数的执行频率的。 r pp 于2020年12月20日周日 上午10:49写道: > 是的 > > 张锴 于2020年12月19日周六 下午5:45写道: > > > 我按官网操作,重写了序列化方式 > > > > val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, > > props)kafkaSource.assignTimestampsAndWatermarks(new > > AscendingTimestampExtractor[MyType] { > > def extractAscendingTimestamp(element: MyType): Long = > > element.eventTimestamp}) > > val stream: DataStream[MyType] = env.addSource(kafkaSource) > > > > *有个疑问,这样写完之后是不是不用设置*setAutoWatermarkInterval 呢? > > >
Re: 对于kafka partition 设置时间戳及watermark
是的 张锴 于2020年12月19日周六 下午5:45写道: > 我按官网操作,重写了序列化方式 > > val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, > props)kafkaSource.assignTimestampsAndWatermarks(new > AscendingTimestampExtractor[MyType] { > def extractAscendingTimestamp(element: MyType): Long = > element.eventTimestamp}) > val stream: DataStream[MyType] = env.addSource(kafkaSource) > > *有个疑问,这样写完之后是不是不用设置*setAutoWatermarkInterval 呢? >