Re: flink1.13.2 -> 1.13.6 任务cpu从50%左右飙升到1000%,window算子故障。
同时,貌似基于 ContinuousEventTimeTrigger 也是不行的。 对于key1的window1来说,第一个element进入时,注册的 window.maxTimestamp 肯定是10小时后才触发。 基于 ContinuousEventTimeTrigger 注册的 timestamp - (timestamp % interval) 也差不多是10小时后才触发。 现在来看,要么我再覆盖实现为原逻辑。 要么我就重新实现个基于 eventTime 的 ContinuousEventTimeTrigger,让持续注册的timestamp不再基于 element 的 timestamp,而是基于 ctx.getCurrentWatermark。相当于我用watermark当做processtime用,实现一个 continuousWatermarkTrigger。这样应该可以让代码逻辑恢复正常,就是无法解决我上一个内容提到的问题,长时间延迟数据迅速恢复时导致的大量et trigger的触发。 yidan zhao 于2022年6月28日周二 12:40写道: > > 仔细回忆了下最初为啥要改造实现组合 eventTimeTrigger 和 > continuousProcessTimeTrigger。还是因为我的watermark是latestTs-10小时,我是考虑如果出现了数据延迟10小时左右的情况,那么我恢复的时候,数据补充进来,如果使用 > continuousEventTimeTrigger,10个小时的数据快速涌入,会快速导致很多此event time > trigger。相当于单key单window,很快触发 10小时/10s =3600次,全key全window肯定会爆炸。 > 考虑到这种情况下,我期望基于processTime进行continuous trigger,所以做了这种改造。 > > yidan zhao 于2022年6月28日周二 12:25写道: > > > > 刚刚贴完代码,就分析除问题来了,如下。 > > 我看了下,变化主要是 ContinuousProcessingTimeTrigger 中的注册 trigger 时的时间逻辑,加了个 > > long nextFireTimestamp = Math.min(time + interval, > > window.maxTimestamp()); 取min的逻辑。 > > 我这个任务的watermark是latestTs-10小时,因为这个任务特别,压力不大,但对数据完整性要求高,综合考虑这么做的。 > > 同时,为了避免10小时后才输出结果,而且需求上,窗口闭合前就需要输出结果,每10s一次,因此采用 continuousProcessTrigger > > 这种方式。 > > > > 我分析,对于同一个key下的窗口1,窗口闭合时trigger触发,然后触发下一次,然后会在同一个time继续registerTime,然后立即触发,死循环,这个过程对于每个key的每个窗口都需要持续10小时,因为窗口需要10小时后才会闭合。 > > > > > > > > 不清楚加这么个逻辑的目的是什么呢? 对于ContinuousProcessingTimeTrigger来说,是基于pt进行触发的,而 > > window.maxTimetamp() 在我的场景下是 et。 > > 找了下https://issues.apache.org/jira/browse/FLINK-20443对到这个jira,没看懂最终讨论了个啥结论,这貌似也不像是啥bug,为啥需要这么改呢。 > > > > yidan zhao 于2022年6月28日周二 11:48写道: > > > > > > 对比实验了下,就是自定义的 trigger > > > 问题,不清楚从1.13.2到1.13.6有啥不同。我的自定义trigger如下,内部就是组装了EventTimeTrigger和ContinuousProcessingTimeTrigger: > > > > > > public class ContinuousProcessTimeTriggerForEventTimeWindow > > > extends Trigger { > > > > > > private final EventTimeTrigger eventTimeTrigger; > > > > > > private final ContinuousProcessingTimeTrigger > > > continuousProcessTimeTrigger; > > > > > > public static ContinuousProcessTimeTriggerForEventTimeWindow > > > of(long windowUpdateTimeInSeconds) { > > > return new > > > ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds); > > > } > > > > > > private ContinuousProcessTimeTriggerForEventTimeWindow(long > > > windowUpdateTimeInSeconds) { > > > eventTimeTrigger = EventTimeTrigger.create(); > > > continuousProcessTimeTrigger = ContinuousProcessingTimeTrigger.of( > > > Time.seconds(windowUpdateTimeInSeconds) > > > ); > > > } > > > > > > @Override > > > public TriggerResult onElement( > > > Object element, long timestamp, TimeWindow window, > > > TriggerContext ctx > > > ) throws Exception { > > > continuousProcessTimeTrigger.onElement(element, timestamp, > > > window, ctx); > > > return eventTimeTrigger.onElement(element, timestamp, window, > > > ctx); > > > } > > > > > > @Override > > > public TriggerResult onEventTime( > > > long time, TimeWindow window, TriggerContext ctx > > > ) throws Exception { > > > return eventTimeTrigger.onEventTime(time, window, ctx); > > > } > > > > > > @Override > > > public TriggerResult onProcessingTime( > > > long time, TimeWindow window, TriggerContext ctx > > > ) throws Exception { > > > return continuousProcessTimeTrigger.onProcessingTime(time, > > > window, ctx); > > > } > > > > > > @Override > > > public void clear( > > > TimeWindow window, TriggerContext ctx > > > ) throws Exception { > > > eventTimeTrigger.clear(window, ctx); > > > continuousProcessTimeTrigger.clear(window, ctx); > > > } > > > > > > @Override > > > public boolean canMerge() { > > > return true; > > > } > > > > > > @Override > > > public void onMerge( > > > TimeWindow window, OnMergeContext ctx > > > ) throws Exception { > > > eventTimeTrigger.onMerge(window, ctx); > > > continuousProcessTimeTrigger.onMerge(window, ctx); > > > } > > > } > > > > > > Shengkai Fang 于2022年6月28日周二 10:51写道: > > > > > > > > Hi. > > > > > > > > 这种情况下可以用 jprofile 看看到底 cpu 花在哪里。你可以使用火焰图或者 jstack 看看具体的栈和使用。 > > > > > > > > Best, > > > > Shengkai > > > > > > > > yidan zhao 于2022年6月28日周二 10:44写道: > > > > > > > > > 目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。 > > > > > > > > > > 目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。 > > > > > window是event time window,配合自定义的 > > > > > continuousProcessTriggerForEventTimeWindow(基于pt进行continuous > > > > > trigger,但是统计窗口是et window)。 > > > > > > > > > > 请问这种怎么排查呢?目前来看应该是卡在某个地方了,cancel任务后,直接等到tm失败。window算子百分百不会cancel成功。 > > > > >
Re: flink1.13.2 -> 1.13.6 任务cpu从50%左右飙升到1000%,window算子故障。
仔细回忆了下最初为啥要改造实现组合 eventTimeTrigger 和 continuousProcessTimeTrigger。还是因为我的watermark是latestTs-10小时,我是考虑如果出现了数据延迟10小时左右的情况,那么我恢复的时候,数据补充进来,如果使用 continuousEventTimeTrigger,10个小时的数据快速涌入,会快速导致很多此event time trigger。相当于单key单window,很快触发 10小时/10s =3600次,全key全window肯定会爆炸。 考虑到这种情况下,我期望基于processTime进行continuous trigger,所以做了这种改造。 yidan zhao 于2022年6月28日周二 12:25写道: > > 刚刚贴完代码,就分析除问题来了,如下。 > 我看了下,变化主要是 ContinuousProcessingTimeTrigger 中的注册 trigger 时的时间逻辑,加了个 > long nextFireTimestamp = Math.min(time + interval, > window.maxTimestamp()); 取min的逻辑。 > 我这个任务的watermark是latestTs-10小时,因为这个任务特别,压力不大,但对数据完整性要求高,综合考虑这么做的。 > 同时,为了避免10小时后才输出结果,而且需求上,窗口闭合前就需要输出结果,每10s一次,因此采用 continuousProcessTrigger > 这种方式。 > > 我分析,对于同一个key下的窗口1,窗口闭合时trigger触发,然后触发下一次,然后会在同一个time继续registerTime,然后立即触发,死循环,这个过程对于每个key的每个窗口都需要持续10小时,因为窗口需要10小时后才会闭合。 > > > > 不清楚加这么个逻辑的目的是什么呢? 对于ContinuousProcessingTimeTrigger来说,是基于pt进行触发的,而 > window.maxTimetamp() 在我的场景下是 et。 > 找了下https://issues.apache.org/jira/browse/FLINK-20443对到这个jira,没看懂最终讨论了个啥结论,这貌似也不像是啥bug,为啥需要这么改呢。 > > yidan zhao 于2022年6月28日周二 11:48写道: > > > > 对比实验了下,就是自定义的 trigger > > 问题,不清楚从1.13.2到1.13.6有啥不同。我的自定义trigger如下,内部就是组装了EventTimeTrigger和ContinuousProcessingTimeTrigger: > > > > public class ContinuousProcessTimeTriggerForEventTimeWindow > > extends Trigger { > > > > private final EventTimeTrigger eventTimeTrigger; > > > > private final ContinuousProcessingTimeTrigger > > continuousProcessTimeTrigger; > > > > public static ContinuousProcessTimeTriggerForEventTimeWindow > > of(long windowUpdateTimeInSeconds) { > > return new > > ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds); > > } > > > > private ContinuousProcessTimeTriggerForEventTimeWindow(long > > windowUpdateTimeInSeconds) { > > eventTimeTrigger = EventTimeTrigger.create(); > > continuousProcessTimeTrigger = ContinuousProcessingTimeTrigger.of( > > Time.seconds(windowUpdateTimeInSeconds) > > ); > > } > > > > @Override > > public TriggerResult onElement( > > Object element, long timestamp, TimeWindow window, > > TriggerContext ctx > > ) throws Exception { > > continuousProcessTimeTrigger.onElement(element, timestamp, window, > > ctx); > > return eventTimeTrigger.onElement(element, timestamp, window, ctx); > > } > > > > @Override > > public TriggerResult onEventTime( > > long time, TimeWindow window, TriggerContext ctx > > ) throws Exception { > > return eventTimeTrigger.onEventTime(time, window, ctx); > > } > > > > @Override > > public TriggerResult onProcessingTime( > > long time, TimeWindow window, TriggerContext ctx > > ) throws Exception { > > return continuousProcessTimeTrigger.onProcessingTime(time, window, > > ctx); > > } > > > > @Override > > public void clear( > > TimeWindow window, TriggerContext ctx > > ) throws Exception { > > eventTimeTrigger.clear(window, ctx); > > continuousProcessTimeTrigger.clear(window, ctx); > > } > > > > @Override > > public boolean canMerge() { > > return true; > > } > > > > @Override > > public void onMerge( > > TimeWindow window, OnMergeContext ctx > > ) throws Exception { > > eventTimeTrigger.onMerge(window, ctx); > > continuousProcessTimeTrigger.onMerge(window, ctx); > > } > > } > > > > Shengkai Fang 于2022年6月28日周二 10:51写道: > > > > > > Hi. > > > > > > 这种情况下可以用 jprofile 看看到底 cpu 花在哪里。你可以使用火焰图或者 jstack 看看具体的栈和使用。 > > > > > > Best, > > > Shengkai > > > > > > yidan zhao 于2022年6月28日周二 10:44写道: > > > > > > > 目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。 > > > > > > > > 目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。 > > > > window是event time window,配合自定义的 > > > > continuousProcessTriggerForEventTimeWindow(基于pt进行continuous > > > > trigger,但是统计窗口是et window)。 > > > > > > > > 请问这种怎么排查呢?目前来看应该是卡在某个地方了,cancel任务后,直接等到tm失败。window算子百分百不会cancel成功。 > > > >
Re: flink1.13.2 -> 1.13.6 任务cpu从50%左右飙升到1000%,window算子故障。
刚刚贴完代码,就分析除问题来了,如下。 我看了下,变化主要是 ContinuousProcessingTimeTrigger 中的注册 trigger 时的时间逻辑,加了个 long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp()); 取min的逻辑。 我这个任务的watermark是latestTs-10小时,因为这个任务特别,压力不大,但对数据完整性要求高,综合考虑这么做的。 同时,为了避免10小时后才输出结果,而且需求上,窗口闭合前就需要输出结果,每10s一次,因此采用 continuousProcessTrigger 这种方式。 我分析,对于同一个key下的窗口1,窗口闭合时trigger触发,然后触发下一次,然后会在同一个time继续registerTime,然后立即触发,死循环,这个过程对于每个key的每个窗口都需要持续10小时,因为窗口需要10小时后才会闭合。 不清楚加这么个逻辑的目的是什么呢? 对于ContinuousProcessingTimeTrigger来说,是基于pt进行触发的,而 window.maxTimetamp() 在我的场景下是 et。 找了下https://issues.apache.org/jira/browse/FLINK-20443对到这个jira,没看懂最终讨论了个啥结论,这貌似也不像是啥bug,为啥需要这么改呢。 yidan zhao 于2022年6月28日周二 11:48写道: > > 对比实验了下,就是自定义的 trigger > 问题,不清楚从1.13.2到1.13.6有啥不同。我的自定义trigger如下,内部就是组装了EventTimeTrigger和ContinuousProcessingTimeTrigger: > > public class ContinuousProcessTimeTriggerForEventTimeWindow > extends Trigger { > > private final EventTimeTrigger eventTimeTrigger; > > private final ContinuousProcessingTimeTrigger > continuousProcessTimeTrigger; > > public static ContinuousProcessTimeTriggerForEventTimeWindow > of(long windowUpdateTimeInSeconds) { > return new > ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds); > } > > private ContinuousProcessTimeTriggerForEventTimeWindow(long > windowUpdateTimeInSeconds) { > eventTimeTrigger = EventTimeTrigger.create(); > continuousProcessTimeTrigger = ContinuousProcessingTimeTrigger.of( > Time.seconds(windowUpdateTimeInSeconds) > ); > } > > @Override > public TriggerResult onElement( > Object element, long timestamp, TimeWindow window, > TriggerContext ctx > ) throws Exception { > continuousProcessTimeTrigger.onElement(element, timestamp, window, > ctx); > return eventTimeTrigger.onElement(element, timestamp, window, ctx); > } > > @Override > public TriggerResult onEventTime( > long time, TimeWindow window, TriggerContext ctx > ) throws Exception { > return eventTimeTrigger.onEventTime(time, window, ctx); > } > > @Override > public TriggerResult onProcessingTime( > long time, TimeWindow window, TriggerContext ctx > ) throws Exception { > return continuousProcessTimeTrigger.onProcessingTime(time, window, > ctx); > } > > @Override > public void clear( > TimeWindow window, TriggerContext ctx > ) throws Exception { > eventTimeTrigger.clear(window, ctx); > continuousProcessTimeTrigger.clear(window, ctx); > } > > @Override > public boolean canMerge() { > return true; > } > > @Override > public void onMerge( > TimeWindow window, OnMergeContext ctx > ) throws Exception { > eventTimeTrigger.onMerge(window, ctx); > continuousProcessTimeTrigger.onMerge(window, ctx); > } > } > > Shengkai Fang 于2022年6月28日周二 10:51写道: > > > > Hi. > > > > 这种情况下可以用 jprofile 看看到底 cpu 花在哪里。你可以使用火焰图或者 jstack 看看具体的栈和使用。 > > > > Best, > > Shengkai > > > > yidan zhao 于2022年6月28日周二 10:44写道: > > > > > 目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。 > > > > > > 目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。 > > > window是event time window,配合自定义的 > > > continuousProcessTriggerForEventTimeWindow(基于pt进行continuous > > > trigger,但是统计窗口是et window)。 > > > > > > 请问这种怎么排查呢?目前来看应该是卡在某个地方了,cancel任务后,直接等到tm失败。window算子百分百不会cancel成功。 > > >
Re: flink1.13.2 -> 1.13.6 任务cpu从50%左右飙升到1000%,window算子故障。
对比实验了下,就是自定义的 trigger 问题,不清楚从1.13.2到1.13.6有啥不同。我的自定义trigger如下,内部就是组装了EventTimeTrigger和ContinuousProcessingTimeTrigger: public class ContinuousProcessTimeTriggerForEventTimeWindow extends Trigger { private final EventTimeTrigger eventTimeTrigger; private final ContinuousProcessingTimeTrigger continuousProcessTimeTrigger; public static ContinuousProcessTimeTriggerForEventTimeWindow of(long windowUpdateTimeInSeconds) { return new ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds); } private ContinuousProcessTimeTriggerForEventTimeWindow(long windowUpdateTimeInSeconds) { eventTimeTrigger = EventTimeTrigger.create(); continuousProcessTimeTrigger = ContinuousProcessingTimeTrigger.of( Time.seconds(windowUpdateTimeInSeconds) ); } @Override public TriggerResult onElement( Object element, long timestamp, TimeWindow window, TriggerContext ctx ) throws Exception { continuousProcessTimeTrigger.onElement(element, timestamp, window, ctx); return eventTimeTrigger.onElement(element, timestamp, window, ctx); } @Override public TriggerResult onEventTime( long time, TimeWindow window, TriggerContext ctx ) throws Exception { return eventTimeTrigger.onEventTime(time, window, ctx); } @Override public TriggerResult onProcessingTime( long time, TimeWindow window, TriggerContext ctx ) throws Exception { return continuousProcessTimeTrigger.onProcessingTime(time, window, ctx); } @Override public void clear( TimeWindow window, TriggerContext ctx ) throws Exception { eventTimeTrigger.clear(window, ctx); continuousProcessTimeTrigger.clear(window, ctx); } @Override public boolean canMerge() { return true; } @Override public void onMerge( TimeWindow window, OnMergeContext ctx ) throws Exception { eventTimeTrigger.onMerge(window, ctx); continuousProcessTimeTrigger.onMerge(window, ctx); } } Shengkai Fang 于2022年6月28日周二 10:51写道: > > Hi. > > 这种情况下可以用 jprofile 看看到底 cpu 花在哪里。你可以使用火焰图或者 jstack 看看具体的栈和使用。 > > Best, > Shengkai > > yidan zhao 于2022年6月28日周二 10:44写道: > > > 目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。 > > > > 目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。 > > window是event time window,配合自定义的 > > continuousProcessTriggerForEventTimeWindow(基于pt进行continuous > > trigger,但是统计窗口是et window)。 > > > > 请问这种怎么排查呢?目前来看应该是卡在某个地方了,cancel任务后,直接等到tm失败。window算子百分百不会cancel成功。 > >
Re: flink1.13.2 -> 1.13.6 任务cpu从50%左右飙升到1000%,window算子故障。
Hi. 这种情况下可以用 jprofile 看看到底 cpu 花在哪里。你可以使用火焰图或者 jstack 看看具体的栈和使用。 Best, Shengkai yidan zhao 于2022年6月28日周二 10:44写道: > 目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。 > > 目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。 > window是event time window,配合自定义的 > continuousProcessTriggerForEventTimeWindow(基于pt进行continuous > trigger,但是统计窗口是et window)。 > > 请问这种怎么排查呢?目前来看应该是卡在某个地方了,cancel任务后,直接等到tm失败。window算子百分百不会cancel成功。 >
flink1.13.2 -> 1.13.6 任务cpu从50%左右飙升到1000%,window算子故障。
目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。 目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。 window是event time window,配合自定义的 continuousProcessTriggerForEventTimeWindow(基于pt进行continuous trigger,但是统计窗口是et window)。 请问这种怎么排查呢?目前来看应该是卡在某个地方了,cancel任务后,直接等到tm失败。window算子百分百不会cancel成功。