请教一个taskmanager oom的问题,我在计算一天的uv,采用ContinuousEventTimeTrigger 来3分钟触发一次

配置相关:
配置是2个机器,每个2核,slots设置的也是每个2,并行度是4,其他jobmanager和taskmanager的内存是默认配置

目前采取的排查步骤:
1.最开始只调用了.trigger(ContinuousEventTimeTrigger.of(Time.minutes(3))) 很快就oom掉了
2.采用了evictor
               .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3)))
                .evictor(TimeEvictor.of(Time.seconds(0),true))
3.采用了了.trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3))))

2,3的方式同样会oom,jstat 可以看到一只在发生fullgc, checkpoint的大小一直在增大 大概到300m最大

mat分析dump 内存占用较大的是
org.apache.flink.runtime.state.heap.CopyOnWriteStateMap$StateMapEntry和
org.apache.flink.streaming.api.operators.TimerHeapInternalTimer

使用了PurgingTrigger 仍旧OOM,该问题应该如何排查呢,希望得到一些帮助

代码:
//其中ShoppingRecords UserClickModel都是普通的bean对象
inputStream
                .filter(data -> "pv".equals(data.getBehavior()))
                .keyBy(new KeySelector<ShoppingRecords, Tuple2<LocalDate,
Long>>() {
                    @Override
                    public Tuple2<LocalDate, Long> getKey(ShoppingRecords
value) throws Exception {
                        Instant instant =
Instant.ofEpochMilli(value.getTs());
                        return Tuple2.of(
                                LocalDateTime.ofInstant(instant,
ZoneId.of("Asia/Shanghai")).toLocalDate(),
                                value.getItemId()
                        );
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.days(1)))
//                .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3)))
//                .evictor(TimeEvictor.of(Time.seconds(0),true))

.trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3))))
                .process(new ProcessWindowFunctionBitMap())
//                .addSink(new RedisSink<>(conf, new UvRedisSink()));
                .addSink(new PrintSinkFunction());

 public static class ProcessWindowFunctionBitMap
            extends ProcessWindowFunction<ShoppingRecords, UserClickModel,
Tuple2<LocalDate, Long>, TimeWindow> {

        private transient ValueState<Integer> pvState;
        private transient ValueState<Roaring64NavigableMap> bitMapState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            ValueStateDescriptor<Integer> pvStateDescriptor = new
ValueStateDescriptor<>("pv", Integer.class);
            ValueStateDescriptor<Roaring64NavigableMap>
bitMapStateDescriptor = new ValueStateDescriptor("bitMap"
                    , TypeInformation.of(new
TypeHint<Roaring64NavigableMap>() {
            }));
//            BlockingQueue
            // 过期状态清除
            StateTtlConfig stateTtlConfig = StateTtlConfig

.newBuilder(org.apache.flink.api.common.time.Time.days(1))

.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();
            // 开启ttl
            pvStateDescriptor.enableTimeToLive(stateTtlConfig);
            bitMapStateDescriptor.enableTimeToLive(stateTtlConfig);

            pvState = this.getRuntimeContext().getState(pvStateDescriptor);
            bitMapState =
this.getRuntimeContext().getState(bitMapStateDescriptor);

        }

        @Override
        public void process(Tuple2<LocalDate, Long> key,
                            ProcessWindowFunction<ShoppingRecords,
UserClickModel, Tuple2<LocalDate, Long>, TimeWindow>.Context context,
                            Iterable<ShoppingRecords> elements,
Collector<UserClickModel> out) throws Exception {
            // 当前状态的pv uv
            Integer pv = pvState.value();
            Roaring64NavigableMap bitMap = bitMapState.value();
            if (bitMap == null) {
                bitMap = new Roaring64NavigableMap();
                pv = 0;
            }

            Iterator<ShoppingRecords> iterator = elements.iterator();
            while (iterator.hasNext()) {
                pv = pv + 1;
                long uid = iterator.next().getUser_id();
                //如果userId可以转成long
                bitMap.add(uid);
            }

            // 更新pv
            pvState.update(pv);

            UserClickModel UserClickModel = new UserClickModel();

            UserClickModel.setDate(key.f0.toString());
            UserClickModel.setProduct(key.f1);
            UserClickModel.setPv(pv);
            UserClickModel.setUv(bitMap.getIntCardinality());

            out.collect(UserClickModel);
        }
    }

回复