请教一个 TaskManager OOM 的问题：我在计算一天的 UV，使用 ContinuousEventTimeTrigger 每 3 分钟触发一次。
配置相关: 配置是2个机器,每个2核,slots设置的也是每个2,并行度是4,其他jobmanager和taskmanager的内存是默认配置 目前采取的排查步骤: 1.最开始只调用了.trigger(ContinuousEventTimeTrigger.of(Time.minutes(3))) 很快就oom掉了 2.采用了evictor .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3))) .evictor(TimeEvictor.of(Time.seconds(0),true)) 3.采用了了.trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3)))) 2,3的方式同样会oom,jstat 可以看到一只在发生fullgc, checkpoint的大小一直在增大 大概到300m最大 mat分析dump 内存占用较大的是 org.apache.flink.runtime.state.heap.CopyOnWriteStateMap$StateMapEntry和 org.apache.flink.streaming.api.operators.TimerHeapInternalTimer 使用了PurgingTrigger 仍旧OOM,该问题应该如何排查呢,希望得到一些帮助 代码: //其中ShoppingRecords UserClickModel都是普通的bean对象 inputStream .filter(data -> "pv".equals(data.getBehavior())) .keyBy(new KeySelector<ShoppingRecords, Tuple2<LocalDate, Long>>() { @Override public Tuple2<LocalDate, Long> getKey(ShoppingRecords value) throws Exception { Instant instant = Instant.ofEpochMilli(value.getTs()); return Tuple2.of( LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Shanghai")).toLocalDate(), value.getItemId() ); } }) .window(TumblingEventTimeWindows.of(Time.days(1))) // .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3))) // .evictor(TimeEvictor.of(Time.seconds(0),true)) .trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3)))) .process(new ProcessWindowFunctionBitMap()) // .addSink(new RedisSink<>(conf, new UvRedisSink())); .addSink(new PrintSinkFunction()); public static class ProcessWindowFunctionBitMap extends ProcessWindowFunction<ShoppingRecords, UserClickModel, Tuple2<LocalDate, Long>, TimeWindow> { private transient ValueState<Integer> pvState; private transient ValueState<Roaring64NavigableMap> bitMapState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv", Integer.class); ValueStateDescriptor<Roaring64NavigableMap> bitMapStateDescriptor = new ValueStateDescriptor("bitMap" , 
TypeInformation.of(new TypeHint<Roaring64NavigableMap>() { })); // BlockingQueue // 过期状态清除 StateTtlConfig stateTtlConfig = StateTtlConfig .newBuilder(org.apache.flink.api.common.time.Time.days(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); // 开启ttl pvStateDescriptor.enableTimeToLive(stateTtlConfig); bitMapStateDescriptor.enableTimeToLive(stateTtlConfig); pvState = this.getRuntimeContext().getState(pvStateDescriptor); bitMapState = this.getRuntimeContext().getState(bitMapStateDescriptor); } @Override public void process(Tuple2<LocalDate, Long> key, ProcessWindowFunction<ShoppingRecords, UserClickModel, Tuple2<LocalDate, Long>, TimeWindow>.Context context, Iterable<ShoppingRecords> elements, Collector<UserClickModel> out) throws Exception { // 当前状态的pv uv Integer pv = pvState.value(); Roaring64NavigableMap bitMap = bitMapState.value(); if (bitMap == null) { bitMap = new Roaring64NavigableMap(); pv = 0; } Iterator<ShoppingRecords> iterator = elements.iterator(); while (iterator.hasNext()) { pv = pv + 1; long uid = iterator.next().getUser_id(); //如果userId可以转成long bitMap.add(uid); } // 更新pv pvState.update(pv); UserClickModel UserClickModel = new UserClickModel(); UserClickModel.setDate(key.f0.toString()); UserClickModel.setProduct(key.f1); UserClickModel.setPv(pv); UserClickModel.setUv(bitMap.getIntCardinality()); out.collect(UserClickModel); } }