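I ran a comparison experiment, and the problem is indeed the custom trigger. I'm not sure what changed between Flink 1.13.2 and 1.13.6. My custom trigger is below. Internally it just composes EventTimeTrigger and ContinuousProcessingTimeTrigger: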
```java
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires at the end of an event-time window (EventTimeTrigger) and additionally
// fires every windowUpdateTimeInSeconds of processing time (ContinuousProcessingTimeTrigger).
public class ContinuousProcessTimeTriggerForEventTimeWindow extends Trigger<Object, TimeWindow> {

    private final EventTimeTrigger eventTimeTrigger;
    private final ContinuousProcessingTimeTrigger<TimeWindow> continuousProcessTimeTrigger;

    public static ContinuousProcessTimeTriggerForEventTimeWindow of(long windowUpdateTimeInSeconds) {
        return new ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds);
    }

    private ContinuousProcessTimeTriggerForEventTimeWindow(long windowUpdateTimeInSeconds) {
        eventTimeTrigger = EventTimeTrigger.create();
        continuousProcessTimeTrigger = ContinuousProcessingTimeTrigger.of(
                Time.seconds(windowUpdateTimeInSeconds)
        );
    }

    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx
    ) throws Exception {
        // Let the processing-time trigger register its timer; its result is ignored,
        // and the event-time trigger decides the result for this element.
        continuousProcessTimeTrigger.onElement(element, timestamp, window, ctx);
        return eventTimeTrigger.onElement(element, timestamp, window, ctx);
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // Event-time timers: the firing at the end of the window.
        return eventTimeTrigger.onEventTime(time, window, ctx);
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // Processing-time timers: the periodic firings.
        return continuousProcessTimeTrigger.onProcessingTime(time, window, ctx);
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        eventTimeTrigger.clear(window, ctx);
        continuousProcessTimeTrigger.clear(window, ctx);
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
        eventTimeTrigger.onMerge(window, ctx);
        continuousProcessTimeTrigger.onMerge(window, ctx);
    }
}
```

Shengkai Fang <fskm...@gmail.com> wrote on Tue, Jun 28, 2022 at 10:51:
>
> Hi.
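>
> In this situation you can use JProfiler to see where the CPU time is actually going. A flame graph or jstack will show you the specific stacks and what they are doing.
>
> Best,
> Shengkai
>
> yidan zhao <hinobl...@gmail.com> wrote on Tue, Jun 28, 2022 at 10:44:
>
> > The symptom is as described in the subject. The job simply reads data with a KafkaSource, applies a simple filter, then a window, and writes the output to MySQL.
> >
> > So far, the CPU goes abnormal 1-2 minutes after the job starts, not immediately. While it is abnormal, the window operator is 100% busy.
> > The window is an event-time window combined with a custom
> > continuousProcessTriggerForEventTimeWindow (it fires continuously based on processing time, but the aggregation window is an event-time window).
> >
> > How should I troubleshoot this? It looks like something is stuck somewhere: after I cancel the job, it just hangs until the TM fails. The window operator fails to cancel every single time.
> >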
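Shengkai's jstack suggestion can also be approximated from inside a JVM. The sketch below is a hypothetical helper (the name `HotThreads` is made up for illustration) built only on the JDK's `java.lang.management` API: it lists the threads with the most accumulated CPU time together with their full stack traces. Assuming the window task thread's name contains the window operator's name, a thread that has been spinning at 100% should show up near the top, and its stack shows where it is looping.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class HotThreads {

    /**
     * Returns a jstack-like report of the {@code limit} threads in this JVM
     * that have accumulated the most CPU time, each with its full stack trace.
     */
    public static String hotThreads(int limit) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        boolean cpuSupported = mx.isThreadCpuTimeSupported();

        // Snapshot CPU time once per thread so the sort key cannot change mid-sort.
        List<Map.Entry<ThreadInfo, Long>> threads = new ArrayList<>();
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info == null) {
                continue;
            }
            // getThreadCpuTime returns -1 if the thread died or CPU timing is disabled.
            long cpuNanos = cpuSupported ? mx.getThreadCpuTime(info.getThreadId()) : -1L;
            threads.add(new AbstractMap.SimpleEntry<>(info, cpuNanos));
        }
        // Highest CPU time first; unknown (-1) values sort last.
        threads.sort(Map.Entry.comparingByValue(Comparator.reverseOrder()));

        StringBuilder sb = new StringBuilder();
        int n = Math.min(Math.max(limit, 0), threads.size());
        for (Map.Entry<ThreadInfo, Long> e : threads.subList(0, n)) {
            ThreadInfo info = e.getKey();
            long cpuNanos = e.getValue();
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState())
              .append(" cpu=")
              .append(cpuNanos < 0 ? "n/a" : (cpuNanos / 1_000_000) + "ms")
              .append('\n');
            // ThreadInfo.toString() cuts traces at 8 frames; print every frame instead.
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("\tat ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(hotThreads(5));
    }
}
```

This only sees the JVM it runs in, so for a TaskManager it would have to be called from code running inside that process; running `jstack <pid>` against the TaskManager from outside, or attaching a profiler for a flame graph, is usually simpler. Because the CPU column is cumulative, taking two reports a few seconds apart and comparing them shows which thread is burning CPU right now.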