对比实验了下,就是自定义的 trigger
问题,不清楚从1.13.2到1.13.6有啥不同。我的自定义trigger如下,内部就是组装了EventTimeTrigger和ContinuousProcessingTimeTrigger:

public class ContinuousProcessTimeTriggerForEventTimeWindow
        extends Trigger<Object, TimeWindow> {

    private final EventTimeTrigger eventTimeTrigger;

    private final ContinuousProcessingTimeTrigger<TimeWindow>
continuousProcessTimeTrigger;

    public static ContinuousProcessTimeTriggerForEventTimeWindow
of(long windowUpdateTimeInSeconds) {
        return new
ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds);
    }

    private ContinuousProcessTimeTriggerForEventTimeWindow(long
windowUpdateTimeInSeconds) {
        eventTimeTrigger = EventTimeTrigger.create();
        continuousProcessTimeTrigger = ContinuousProcessingTimeTrigger.of(
                Time.seconds(windowUpdateTimeInSeconds)
        );
    }

    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window,
TriggerContext ctx
    ) throws Exception {
        continuousProcessTimeTrigger.onElement(element, timestamp, window, ctx);
        return eventTimeTrigger.onElement(element, timestamp, window, ctx);
    }

    @Override
    public TriggerResult onEventTime(
            long time, TimeWindow window, TriggerContext ctx
    ) throws Exception {
        return eventTimeTrigger.onEventTime(time, window, ctx);
    }

    @Override
    public TriggerResult onProcessingTime(
            long time, TimeWindow window, TriggerContext ctx
    ) throws Exception {
        return continuousProcessTimeTrigger.onProcessingTime(time, window, ctx);
    }

    @Override
    public void clear(
            TimeWindow window, TriggerContext ctx
    ) throws Exception {
        eventTimeTrigger.clear(window, ctx);
        continuousProcessTimeTrigger.clear(window, ctx);
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(
            TimeWindow window, OnMergeContext ctx
    ) throws Exception {
        eventTimeTrigger.onMerge(window, ctx);
        continuousProcessTimeTrigger.onMerge(window, ctx);
    }
}

Shengkai Fang <fskm...@gmail.com> 于2022年6月28日周二 10:51写道:
>
> Hi.
>
> 这种情况下可以用 jprofile 看看到底 cpu 花在哪里。你可以使用火焰图或者 jstack 看看具体的栈和使用。
>
> Best,
> Shengkai
>
> yidan zhao <hinobl...@gmail.com> 于2022年6月28日周二 10:44写道:
>
> > 目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。
> >
> > 目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。
> > window是event time window,配合自定义的
> > continuousProcessTriggerForEventTimeWindow(基于pt进行continuous
> > trigger,但是统计窗口是et window)。
> >
> > 请问这种怎么排查呢?目前来看应该是卡在某个地方了,cancel任务后,直接等到tm失败。window算子百分百不会cancel成功。
> >

回复