Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

2020-11-22 文章 赵一旦
connect前生成watermark也是可以的应该,但是你需要把ruleConfigSource流也赋watermark。我猜是这个地方出问题了。 huang botao 于2020年11月19日周四 下午12:58写道: > hi, zhisheng, hailongwang: > > 感谢对这个问题的解答,这个问题确实出在了window无法触发的地方,原因是 在connect() > 后面没有定义watermar导致,在connect后指定watermark就可以触发window了。 > > > > On Wed, Nov 18, 2020 at 10:46 PM

Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

2020-11-18 文章 huang botao
hi, zhisheng, hailongwang: 感谢对这个问题的解答,这个问题确实出在了window无法触发的地方,原因是 在connect() 后面没有定义watermar导致,在connect后指定watermark就可以触发window了。 On Wed, Nov 18, 2020 at 10:46 PM zhisheng wrote: > 可以检查一下作业消费的 kafka 分区是否都有数据,如果有的分区无数据的话,那么可能会导致水印不会更新,从而窗口触发不了。 > > Best > zhisheng > > huang botao 于2020年11月18日周三

Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

2020-11-18 文章 zhisheng
可以检查一下作业消费的 kafka 分区是否都有数据,如果有的分区无数据的话,那么可能会导致水印不会更新,从而窗口触发不了。 Best zhisheng huang botao 于2020年11月18日周三 下午10:34写道: > 感谢您的回复,是这样的,我这边的环境设置用的是eventTime > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); >

Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

2020-11-18 文章 huang botao
感谢您的回复,是这样的,我这边的环境设置用的是eventTime StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法 On Wed, Nov 18, 2020 at 5:50 PM hailongwang

Re:求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

2020-11-18 文章 hailongwang
应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进 在 2020-11-18 15:29:54,"huang botao" 写道: >Hi ,请教一个奇怪的问题: > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction()) > >.assignTimestampsAndWatermarks(new >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds())) > >

求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

2020-11-18 文章 huang botao
Hi ,请教一个奇怪的问题: streamSource.flatMap(new ComeIntoMaxFlatMapFunction()) .assignTimestampsAndWatermarks(new CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds())) .connect(ruleConfigSource) .process(new MetricDataFilterProcessFunction()) .keyBy((KeySelector) metric ->