Re: flink1.13.2 -> 1.13.6 任务cpu从50%左右飙升到1000%,window算子故障。

2022-06-27 文章 yidan zhao
同时,貌似基于 ContinuousEventTimeTrigger 也是不行的。
对于key1的window1来说,第一个element进入时,注册的 window.maxTimestamp 肯定是10小时后才触发。
基于 ContinuousEventTimeTrigger 注册的 timestamp - (timestamp % interval)
也差不多是10小时后才触发。


现在来看,要么我再覆盖实现为原逻辑。  要么我就重新实现个基于 eventTime 的
ContinuousEventTimeTrigger,让持续注册的timestamp不再基于 element 的
timestamp,而是基于 ctx.getCurrentWatermark。相当于我用watermark当做processtime用,实现一个
continuousWatermarkTrigger。这样应该可以让代码逻辑恢复正常,就是无法解决我上一个内容提到的问题,长时间延迟数据迅速恢复时导致的大量et
trigger的触发。

yidan zhao  于2022年6月28日周二 12:40写道:
>
> 仔细回忆了下最初为啥要改造实现组合 eventTimeTrigger 和
> continuousProcessTimeTrigger。还是因为我的watermark是latestTs-10小时,我是考虑如果出现了数据延迟10小时左右的情况,那么我恢复的时候,数据补充进来,如果使用
> continuousEventTimeTrigger,10个小时的数据快速涌入,会快速导致很多此event time
> trigger。相当于单key单window,很快触发 10小时/10s =3600次,全key全window肯定会爆炸。
> 考虑到这种情况下,我期望基于processTime进行continuous trigger,所以做了这种改造。
>
> yidan zhao  于2022年6月28日周二 12:25写道:
> >
> > 刚刚贴完代码,就分析除问题来了,如下。
> > 我看了下,变化主要是 ContinuousProcessingTimeTrigger 中的注册 trigger 时的时间逻辑,加了个
> > long nextFireTimestamp = Math.min(time + interval,
> > window.maxTimestamp()); 取min的逻辑。
> > 我这个任务的watermark是latestTs-10小时,因为这个任务特别,压力不大,但对数据完整性要求高,综合考虑这么做的。
> > 同时,为了避免10小时后才输出结果,而且需求上,窗口闭合前就需要输出结果,每10s一次,因此采用 continuousProcessTrigger 
> > 这种方式。
> >
> > 我分析,对于同一个key下的窗口1,窗口闭合时trigger触发,然后触发下一次,然后会在同一个time继续registerTime,然后立即触发,死循环,这个过程对于每个key的每个窗口都需要持续10小时,因为窗口需要10小时后才会闭合。
> >
> >
> >
> > 不清楚加这么个逻辑的目的是什么呢? 对于ContinuousProcessingTimeTrigger来说,是基于pt进行触发的,而
> > window.maxTimetamp() 在我的场景下是 et。
> > 找了下https://issues.apache.org/jira/browse/FLINK-20443对到这个jira,没看懂最终讨论了个啥结论,这貌似也不像是啥bug,为啥需要这么改呢。
> >
> > yidan zhao  于2022年6月28日周二 11:48写道:
> > >
> > > 对比实验了下,就是自定义的 trigger
> > > 问题,不清楚从1.13.2到1.13.6有啥不同。我的自定义trigger如下,内部就是组装了EventTimeTrigger和ContinuousProcessingTimeTrigger:
> > >
> > > public class ContinuousProcessTimeTriggerForEventTimeWindow
> > > extends Trigger {
> > >
> > > private final EventTimeTrigger eventTimeTrigger;
> > >
> > > private final ContinuousProcessingTimeTrigger
> > > continuousProcessTimeTrigger;
> > >
> > > public static ContinuousProcessTimeTriggerForEventTimeWindow
> > > of(long windowUpdateTimeInSeconds) {
> > > return new
> > > ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds);
> > > }
> > >
> > > private ContinuousProcessTimeTriggerForEventTimeWindow(long
> > > windowUpdateTimeInSeconds) {
> > > eventTimeTrigger = EventTimeTrigger.create();
> > > continuousProcessTimeTrigger = ContinuousProcessingTimeTrigger.of(
> > > Time.seconds(windowUpdateTimeInSeconds)
> > > );
> > > }
> > >
> > > @Override
> > > public TriggerResult onElement(
> > > Object element, long timestamp, TimeWindow window,
> > > TriggerContext ctx
> > > ) throws Exception {
> > > continuousProcessTimeTrigger.onElement(element, timestamp, 
> > > window, ctx);
> > > return eventTimeTrigger.onElement(element, timestamp, window, 
> > > ctx);
> > > }
> > >
> > > @Override
> > > public TriggerResult onEventTime(
> > > long time, TimeWindow window, TriggerContext ctx
> > > ) throws Exception {
> > > return eventTimeTrigger.onEventTime(time, window, ctx);
> > > }
> > >
> > > @Override
> > > public TriggerResult onProcessingTime(
> > > long time, TimeWindow window, TriggerContext ctx
> > > ) throws Exception {
> > > return continuousProcessTimeTrigger.onProcessingTime(time, 
> > > window, ctx);
> > > }
> > >
> > > @Override
> > > public void clear(
> > > TimeWindow window, TriggerContext ctx
> > > ) throws Exception {
> > > eventTimeTrigger.clear(window, ctx);
> > > continuousProcessTimeTrigger.clear(window, ctx);
> > > }
> > >
> > > @Override
> > > public boolean canMerge() {
> > > return true;
> > > }
> > >
> > > @Override
> > > public void onMerge(
> > > TimeWindow window, OnMergeContext ctx
> > > ) throws Exception {
> > > eventTimeTrigger.onMerge(window, ctx);
> > > continuousProcessTimeTrigger.onMerge(window, ctx);
> > > }
> > > }
> > >
> > > Shengkai Fang  于2022年6月28日周二 10:51写道:
> > > >
> > > > Hi.
> > > >
> > > > 这种情况下可以用 jprofile 看看到底 cpu 花在哪里。你可以使用火焰图或者 jstack 看看具体的栈和使用。
> > > >
> > > > Best,
> > > > Shengkai
> > > >
> > > > yidan zhao  于2022年6月28日周二 10:44写道:
> > > >
> > > > > 目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。
> > > > >
> > > > > 目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。
> > > > > window是event time window,配合自定义的
> > > > > continuousProcessTriggerForEventTimeWindow(基于pt进行continuous
> > > > > trigger,但是统计窗口是et window)。
> > > > >
> > > > > 请问这种怎么排查呢?目前来看应该是卡在某个地方了,cancel任务后,直接等到tm失败。window算子百分百不会cancel成功。
> > > > >


Re: flink1.13.2 -> 1.13.6 任务cpu从50%左右飙升到1000%,window算子故障。

2022-06-27 文章 yidan zhao
仔细回忆了下最初为啥要改造实现组合 eventTimeTrigger 和
continuousProcessTimeTrigger。还是因为我的watermark是latestTs-10小时,我是考虑如果出现了数据延迟10小时左右的情况,那么我恢复的时候,数据补充进来,如果使用
continuousEventTimeTrigger,10个小时的数据快速涌入,会快速导致很多此event time
trigger。相当于单key单window,很快触发 10小时/10s =3600次,全key全window肯定会爆炸。
考虑到这种情况下,我期望基于processTime进行continuous trigger,所以做了这种改造。

yidan zhao  于2022年6月28日周二 12:25写道:
>
> 刚刚贴完代码,就分析除问题来了,如下。
> 我看了下,变化主要是 ContinuousProcessingTimeTrigger 中的注册 trigger 时的时间逻辑,加了个
> long nextFireTimestamp = Math.min(time + interval,
> window.maxTimestamp()); 取min的逻辑。
> 我这个任务的watermark是latestTs-10小时,因为这个任务特别,压力不大,但对数据完整性要求高,综合考虑这么做的。
> 同时,为了避免10小时后才输出结果,而且需求上,窗口闭合前就需要输出结果,每10s一次,因此采用 continuousProcessTrigger 
> 这种方式。
>
> 我分析,对于同一个key下的窗口1,窗口闭合时trigger触发,然后触发下一次,然后会在同一个time继续registerTime,然后立即触发,死循环,这个过程对于每个key的每个窗口都需要持续10小时,因为窗口需要10小时后才会闭合。
>
>
>
> 不清楚加这么个逻辑的目的是什么呢? 对于ContinuousProcessingTimeTrigger来说,是基于pt进行触发的,而
> window.maxTimetamp() 在我的场景下是 et。
> 找了下https://issues.apache.org/jira/browse/FLINK-20443对到这个jira,没看懂最终讨论了个啥结论,这貌似也不像是啥bug,为啥需要这么改呢。
>
> yidan zhao  于2022年6月28日周二 11:48写道:
> >
> > 对比实验了下,就是自定义的 trigger
> > 问题,不清楚从1.13.2到1.13.6有啥不同。我的自定义trigger如下,内部就是组装了EventTimeTrigger和ContinuousProcessingTimeTrigger:
> >
> > public class ContinuousProcessTimeTriggerForEventTimeWindow
> > extends Trigger {
> >
> > private final EventTimeTrigger eventTimeTrigger;
> >
> > private final ContinuousProcessingTimeTrigger
> > continuousProcessTimeTrigger;
> >
> > public static ContinuousProcessTimeTriggerForEventTimeWindow
> > of(long windowUpdateTimeInSeconds) {
> > return new
> > ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds);
> > }
> >
> > private ContinuousProcessTimeTriggerForEventTimeWindow(long
> > windowUpdateTimeInSeconds) {
> > eventTimeTrigger = EventTimeTrigger.create();
> > continuousProcessTimeTrigger = ContinuousProcessingTimeTrigger.of(
> > Time.seconds(windowUpdateTimeInSeconds)
> > );
> > }
> >
> > @Override
> > public TriggerResult onElement(
> > Object element, long timestamp, TimeWindow window,
> > TriggerContext ctx
> > ) throws Exception {
> > continuousProcessTimeTrigger.onElement(element, timestamp, window, 
> > ctx);
> > return eventTimeTrigger.onElement(element, timestamp, window, ctx);
> > }
> >
> > @Override
> > public TriggerResult onEventTime(
> > long time, TimeWindow window, TriggerContext ctx
> > ) throws Exception {
> > return eventTimeTrigger.onEventTime(time, window, ctx);
> > }
> >
> > @Override
> > public TriggerResult onProcessingTime(
> > long time, TimeWindow window, TriggerContext ctx
> > ) throws Exception {
> > return continuousProcessTimeTrigger.onProcessingTime(time, window, 
> > ctx);
> > }
> >
> > @Override
> > public void clear(
> > TimeWindow window, TriggerContext ctx
> > ) throws Exception {
> > eventTimeTrigger.clear(window, ctx);
> > continuousProcessTimeTrigger.clear(window, ctx);
> > }
> >
> > @Override
> > public boolean canMerge() {
> > return true;
> > }
> >
> > @Override
> > public void onMerge(
> > TimeWindow window, OnMergeContext ctx
> > ) throws Exception {
> > eventTimeTrigger.onMerge(window, ctx);
> > continuousProcessTimeTrigger.onMerge(window, ctx);
> > }
> > }
> >
> > Shengkai Fang  于2022年6月28日周二 10:51写道:
> > >
> > > Hi.
> > >
> > > 这种情况下可以用 jprofile 看看到底 cpu 花在哪里。你可以使用火焰图或者 jstack 看看具体的栈和使用。
> > >
> > > Best,
> > > Shengkai
> > >
> > > yidan zhao  于2022年6月28日周二 10:44写道:
> > >
> > > > 目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。
> > > >
> > > > 目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。
> > > > window是event time window,配合自定义的
> > > > continuousProcessTriggerForEventTimeWindow(基于pt进行continuous
> > > > trigger,但是统计窗口是et window)。
> > > >
> > > > 请问这种怎么排查呢?目前来看应该是卡在某个地方了,cancel任务后,直接等到tm失败。window算子百分百不会cancel成功。
> > > >


Re: flink1.13.2 -> 1.13.6 任务cpu从50%左右飙升到1000%,window算子故障。

2022-06-27 文章 yidan zhao
刚刚贴完代码,就分析除问题来了,如下。
我看了下,变化主要是 ContinuousProcessingTimeTrigger 中的注册 trigger 时的时间逻辑,加了个
long nextFireTimestamp = Math.min(time + interval,
window.maxTimestamp()); 取min的逻辑。
我这个任务的watermark是latestTs-10小时,因为这个任务特别,压力不大,但对数据完整性要求高,综合考虑这么做的。
同时,为了避免10小时后才输出结果,而且需求上,窗口闭合前就需要输出结果,每10s一次,因此采用 continuousProcessTrigger 这种方式。

我分析,对于同一个key下的窗口1,窗口闭合时trigger触发,然后触发下一次,然后会在同一个time继续registerTime,然后立即触发,死循环,这个过程对于每个key的每个窗口都需要持续10小时,因为窗口需要10小时后才会闭合。



不清楚加这么个逻辑的目的是什么呢? 对于ContinuousProcessingTimeTrigger来说,是基于pt进行触发的,而
window.maxTimetamp() 在我的场景下是 et。
找了下https://issues.apache.org/jira/browse/FLINK-20443对到这个jira,没看懂最终讨论了个啥结论,这貌似也不像是啥bug,为啥需要这么改呢。

yidan zhao  于2022年6月28日周二 11:48写道:
>
> 对比实验了下,就是自定义的 trigger
> 问题,不清楚从1.13.2到1.13.6有啥不同。我的自定义trigger如下,内部就是组装了EventTimeTrigger和ContinuousProcessingTimeTrigger:
>
> public class ContinuousProcessTimeTriggerForEventTimeWindow
> extends Trigger {
>
> private final EventTimeTrigger eventTimeTrigger;
>
> private final ContinuousProcessingTimeTrigger
> continuousProcessTimeTrigger;
>
> public static ContinuousProcessTimeTriggerForEventTimeWindow
> of(long windowUpdateTimeInSeconds) {
> return new
> ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds);
> }
>
> private ContinuousProcessTimeTriggerForEventTimeWindow(long
> windowUpdateTimeInSeconds) {
> eventTimeTrigger = EventTimeTrigger.create();
> continuousProcessTimeTrigger = ContinuousProcessingTimeTrigger.of(
> Time.seconds(windowUpdateTimeInSeconds)
> );
> }
>
> @Override
> public TriggerResult onElement(
> Object element, long timestamp, TimeWindow window,
> TriggerContext ctx
> ) throws Exception {
> continuousProcessTimeTrigger.onElement(element, timestamp, window, 
> ctx);
> return eventTimeTrigger.onElement(element, timestamp, window, ctx);
> }
>
> @Override
> public TriggerResult onEventTime(
> long time, TimeWindow window, TriggerContext ctx
> ) throws Exception {
> return eventTimeTrigger.onEventTime(time, window, ctx);
> }
>
> @Override
> public TriggerResult onProcessingTime(
> long time, TimeWindow window, TriggerContext ctx
> ) throws Exception {
> return continuousProcessTimeTrigger.onProcessingTime(time, window, 
> ctx);
> }
>
> @Override
> public void clear(
> TimeWindow window, TriggerContext ctx
> ) throws Exception {
> eventTimeTrigger.clear(window, ctx);
> continuousProcessTimeTrigger.clear(window, ctx);
> }
>
> @Override
> public boolean canMerge() {
> return true;
> }
>
> @Override
> public void onMerge(
> TimeWindow window, OnMergeContext ctx
> ) throws Exception {
> eventTimeTrigger.onMerge(window, ctx);
> continuousProcessTimeTrigger.onMerge(window, ctx);
> }
> }
>
> Shengkai Fang  于2022年6月28日周二 10:51写道:
> >
> > Hi.
> >
> > 这种情况下可以用 jprofile 看看到底 cpu 花在哪里。你可以使用火焰图或者 jstack 看看具体的栈和使用。
> >
> > Best,
> > Shengkai
> >
> > yidan zhao  于2022年6月28日周二 10:44写道:
> >
> > > 目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。
> > >
> > > 目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。
> > > window是event time window,配合自定义的
> > > continuousProcessTriggerForEventTimeWindow(基于pt进行continuous
> > > trigger,但是统计窗口是et window)。
> > >
> > > 请问这种怎么排查呢?目前来看应该是卡在某个地方了,cancel任务后,直接等到tm失败。window算子百分百不会cancel成功。
> > >


Re: flink1.13.2 -> 1.13.6 任务cpu从50%左右飙升到1000%,window算子故障。

2022-06-27 文章 yidan zhao
对比实验了下,就是自定义的 trigger
问题,不清楚从1.13.2到1.13.6有啥不同。我的自定义trigger如下,内部就是组装了EventTimeTrigger和ContinuousProcessingTimeTrigger:

public class ContinuousProcessTimeTriggerForEventTimeWindow
extends Trigger {

private final EventTimeTrigger eventTimeTrigger;

private final ContinuousProcessingTimeTrigger
continuousProcessTimeTrigger;

public static ContinuousProcessTimeTriggerForEventTimeWindow
of(long windowUpdateTimeInSeconds) {
return new
ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds);
}

private ContinuousProcessTimeTriggerForEventTimeWindow(long
windowUpdateTimeInSeconds) {
eventTimeTrigger = EventTimeTrigger.create();
continuousProcessTimeTrigger = ContinuousProcessingTimeTrigger.of(
Time.seconds(windowUpdateTimeInSeconds)
);
}

@Override
public TriggerResult onElement(
Object element, long timestamp, TimeWindow window,
TriggerContext ctx
) throws Exception {
continuousProcessTimeTrigger.onElement(element, timestamp, window, ctx);
return eventTimeTrigger.onElement(element, timestamp, window, ctx);
}

@Override
public TriggerResult onEventTime(
long time, TimeWindow window, TriggerContext ctx
) throws Exception {
return eventTimeTrigger.onEventTime(time, window, ctx);
}

@Override
public TriggerResult onProcessingTime(
long time, TimeWindow window, TriggerContext ctx
) throws Exception {
return continuousProcessTimeTrigger.onProcessingTime(time, window, ctx);
}

@Override
public void clear(
TimeWindow window, TriggerContext ctx
) throws Exception {
eventTimeTrigger.clear(window, ctx);
continuousProcessTimeTrigger.clear(window, ctx);
}

@Override
public boolean canMerge() {
return true;
}

@Override
public void onMerge(
TimeWindow window, OnMergeContext ctx
) throws Exception {
eventTimeTrigger.onMerge(window, ctx);
continuousProcessTimeTrigger.onMerge(window, ctx);
}
}

Shengkai Fang  于2022年6月28日周二 10:51写道:
>
> Hi.
>
> 这种情况下可以用 jprofile 看看到底 cpu 花在哪里。你可以使用火焰图或者 jstack 看看具体的栈和使用。
>
> Best,
> Shengkai
>
> yidan zhao  于2022年6月28日周二 10:44写道:
>
> > 目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。
> >
> > 目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。
> > window是event time window,配合自定义的
> > continuousProcessTriggerForEventTimeWindow(基于pt进行continuous
> > trigger,但是统计窗口是et window)。
> >
> > 请问这种怎么排查呢?目前来看应该是卡在某个地方了,cancel任务后,直接等到tm失败。window算子百分百不会cancel成功。
> >


Re: flink1.13.2 -> 1.13.6 任务cpu从50%左右飙升到1000%,window算子故障。

2022-06-27 文章 Shengkai Fang
Hi.

这种情况下可以用 jprofile 看看到底 cpu 花在哪里。你可以使用火焰图或者 jstack 看看具体的栈和使用。

Best,
Shengkai

yidan zhao  于2022年6月28日周二 10:44写道:

> 目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。
>
> 目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。
> window是event time window,配合自定义的
> continuousProcessTriggerForEventTimeWindow(基于pt进行continuous
> trigger,但是统计窗口是et window)。
>
> 请问这种怎么排查呢?目前来看应该是卡在某个地方了,cancel任务后,直接等到tm失败。window算子百分百不会cancel成功。
>


flink1.13.2 -> 1.13.6 任务cpu从50%左右飙升到1000%,window算子故障。

2022-06-27 文章 yidan zhao
目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。

目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。
window是event time window,配合自定义的
continuousProcessTriggerForEventTimeWindow(基于pt进行continuous
trigger,但是统计窗口是et window)。

请问这种怎么排查呢?目前来看应该是卡在某个地方了,cancel任务后,直接等到tm失败。window算子百分百不会cancel成功。