Question about a TaskManager OOM: I am computing daily UV (unique visitors) and use ContinuousEventTimeTrigger to fire every 3 minutes.
Configuration:
2 machines with 2 cores each, 2 slots per machine, parallelism 4. JobManager and TaskManager memory are left at their default settings.
Troubleshooting steps so far:
1. At first I only called .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3))); the job ran out of memory very quickly.
2. Added an evictor:
   .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3)))
   .evictor(TimeEvictor.of(Time.seconds(0), true))
3. Used .trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3))))
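The difference between the three attempts can be pictured with a simplified model of one key in one window. A 3-minute ContinuousEventTimeTrigger fires 24 × 60 / 3 = 480 times per one-day window. With plain FIRE (step 1) the buffered elements are kept after each evaluation and accumulate all day; PurgingTrigger (step 3) turns each firing into FIRE_AND_PURGE, and the evictor in step 2 is meant to drop the evaluated elements too, so the buffer is emptied each time. The class and method names below are hypothetical, and the model tracks only buffered elements, not the trigger's own state or timers.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (NOT Flink's WindowOperator): one key, one window, with
// elements arriving between periodic trigger firings. It only counts how many
// buffered elements survive the firings with and without purging.
class TriggerPurgeModel {

    // elementsPerInterval: elements arriving between two firings (assumption).
    // firings: how many times the continuous trigger fires.
    // purgeOnFire: true models FIRE_AND_PURGE (PurgingTrigger) or an evictor
    //              that removes every element after the window function runs.
    static int bufferedAfter(int elementsPerInterval, int firings, boolean purgeOnFire) {
        List<Integer> windowContents = new ArrayList<>();
        for (int f = 0; f < firings; f++) {
            for (int i = 0; i < elementsPerInterval; i++) {
                windowContents.add(i);      // onElement: append to the window buffer
            }
            // the firing hands the whole buffer to the window function
            if (purgeOnFire) {
                windowContents.clear();     // purge: drop the buffered elements
            }
        }
        return windowContents.size();
    }

    public static void main(String[] args) {
        int firingsPerDay = 24 * 60 / 3;    // 3-minute trigger inside a 1-day window
        System.out.println("firings per day: " + firingsPerDay);
        System.out.println("FIRE only:       " + bufferedAfter(100, firingsPerDay, false));
        System.out.println("FIRE_AND_PURGE:  " + bufferedAfter(100, firingsPerDay, true));
    }
}
```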
Approaches 2 and 3 also end in OOM. jstat shows full GCs happening continuously, and the checkpoint size keeps growing, peaking at about 300 MB.
Analyzing the heap dump with MAT, the classes taking up the most memory are:
org.apache.flink.runtime.state.heap.CopyOnWriteStateMap$StateMapEntry
org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
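Both classes scale with the number of live keys: each (date, itemId) key seen since the current one-day window opened keeps trigger state and event-time timers on the heap until the window ends, and PurgingTrigger clears only the buffered elements, not the trigger's own state or timers. The sketch below is a back-of-envelope count under labeled assumptions: roughly 4 state entries per key (window contents, the trigger's fire-time state, pvState, bitMapState) and 2 timers per key (window end plus the next 3-minute firing), with a hypothetical 1,000,000 distinct items per day. The per-key figures are rough assumptions, not measured values.

```java
// Back-of-envelope estimate, NOT measured: heap state entries and timers held
// by the window operator for one day of (date, itemId) keys.
class HeapStateEstimate {

    // Assumed per-key figures: window contents + trigger "fire-time" state
    // + pvState + bitMapState.
    static final int STATE_ENTRIES_PER_KEY = 4;
    // Assumed: window-end timer + the next continuous-trigger timer.
    static final int TIMERS_PER_KEY = 2;

    static long stateEntries(long distinctKeys) {
        return distinctKeys * STATE_ENTRIES_PER_KEY;
    }

    static long timers(long distinctKeys) {
        return distinctKeys * TIMERS_PER_KEY;
    }

    // Assumes keys are spread roughly evenly over the parallel subtasks.
    static long perSubtask(long total, int parallelism) {
        return (total + parallelism - 1) / parallelism;   // ceiling division
    }

    public static void main(String[] args) {
        long distinctItemsPerDay = 1_000_000L;            // hypothetical item count
        int parallelism = 4;                              // from the job configuration
        long entries = stateEntries(distinctItemsPerDay);
        long timerCount = timers(distinctItemsPerDay);
        System.out.println("state entries: " + entries
                + " (~" + perSubtask(entries, parallelism) + " per subtask)");
        System.out.println("timers:        " + timerCount
                + " (~" + perSubtask(timerCount, parallelism) + " per subtask)");
    }
}
```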
Even with PurgingTrigger the job still runs out of memory. How should I go about troubleshooting this? Any help would be appreciated.
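Code:
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

// ShoppingRecords and UserClickModel are plain bean classes;
// inputStream is a DataStream<ShoppingRecords> defined elsewhere
inputStream
        .filter(data -> "pv".equals(data.getBehavior()))
        .keyBy(new KeySelector<ShoppingRecords, Tuple2<LocalDate, Long>>() {
            @Override
            public Tuple2<LocalDate, Long> getKey(ShoppingRecords value) throws Exception {
                // key = (calendar day in Asia/Shanghai, item id)
                Instant instant = Instant.ofEpochMilli(value.getTs());
                return Tuple2.of(
                        LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Shanghai")).toLocalDate(),
                        value.getItemId());
            }
        })
        .window(TumblingEventTimeWindows.of(Time.days(1)))
        // .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3)))
        // .evictor(TimeEvictor.of(Time.seconds(0), true))
        .trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3))))
        .process(new ProcessWindowFunctionBitMap())
        // .addSink(new RedisSink<>(conf, new UvRedisSink()));
        .addSink(new PrintSinkFunction<>());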
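The KeySelector derives the day from Asia/Shanghai time, while TumblingEventTimeWindows.of(Time.days(1)) without an offset aligns windows to UTC midnight, which is 08:00 in Shanghai, so one key's date overlaps two windows. The Flink documentation for tumbling windows suggests an offset of Time.hours(-8) for UTC+8, i.e. TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)). The sketch below re-implements the epoch-aligned window-start formula in plain Java (the class and method names are hypothetical, no Flink calls) to compare the key date with the window start for a record at 07:00 Shanghai time.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Compares the Asia/Shanghai date used in the key with the start of a 1-day
// event-time window. windowStart re-implements the epoch-aligned formula
// start = ts - (ts - offset + size) % size for illustration only.
class DayWindowAlignment {

    static final long DAY_MS = 24L * 60 * 60 * 1000;
    static final ZoneId SHANGHAI = ZoneId.of("Asia/Shanghai");

    // Same conversion as the KeySelector in the question.
    static LocalDate keyDate(long ts) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), SHANGHAI).toLocalDate();
    }

    // Start of the 1-day window containing ts, for the given offset in ms.
    static long windowStart(long ts, long offsetMs) {
        return ts - (ts - offsetMs + DAY_MS) % DAY_MS;
    }

    public static void main(String[] args) {
        // 2021-01-02 07:00 in Shanghai (UTC+8) is 2021-01-01 23:00 UTC
        long ts = Instant.parse("2021-01-01T23:00:00Z").toEpochMilli();
        long eightHours = 8L * 60 * 60 * 1000;
        System.out.println("key date:           " + keyDate(ts));
        System.out.println("window, no offset:  " + Instant.ofEpochMilli(windowStart(ts, 0)));
        System.out.println("window, -8h offset: " + Instant.ofEpochMilli(windowStart(ts, -eightHours)));
    }
}
```

Without an offset the record keyed to 2021-01-02 falls into the window that started at 2021-01-01 08:00 Shanghai time; with the -8h offset the window starts at 2021-01-02 00:00 Shanghai time and matches the key date.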
public static class ProcessWindowFunctionBitMap
        extends ProcessWindowFunction<ShoppingRecords, UserClickModel, Tuple2<LocalDate, Long>, TimeWindow> {

    private transient ValueState<Integer> pvState;
    private transient ValueState<Roaring64NavigableMap> bitMapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ValueStateDescriptor<Integer> pvStateDescriptor =
                new ValueStateDescriptor<>("pv", Integer.class);
        ValueStateDescriptor<Roaring64NavigableMap> bitMapStateDescriptor =
                new ValueStateDescriptor<>("bitMap",
                        TypeInformation.of(new TypeHint<Roaring64NavigableMap>() {}));

        // expire stale state after one day (TTL)
        StateTtlConfig stateTtlConfig = StateTtlConfig
                .newBuilder(org.apache.flink.api.common.time.Time.days(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        // enable TTL on both descriptors
        pvStateDescriptor.enableTimeToLive(stateTtlConfig);
        bitMapStateDescriptor.enableTimeToLive(stateTtlConfig);

        pvState = getRuntimeContext().getState(pvStateDescriptor);
        bitMapState = getRuntimeContext().getState(bitMapStateDescriptor);
    }

    @Override
    public void process(Tuple2<LocalDate, Long> key,
                        Context context,
                        Iterable<ShoppingRecords> elements,
                        Collector<UserClickModel> out) throws Exception {
        // pv and uv accumulated so far for this key
        Integer pv = pvState.value();
        Roaring64NavigableMap bitMap = bitMapState.value();
        if (bitMap == null) {
            bitMap = new Roaring64NavigableMap();
            pv = 0;
        }
        for (ShoppingRecords record : elements) {
            pv = pv + 1;
            // assumes the user id can be represented as a long
            bitMap.add(record.getUser_id());
        }
        // update pv
        pvState.update(pv);
        // persist the bitmap too; without this, bitMapState.value() stays null,
        // so pv and uv restart from the current batch on every firing
        bitMapState.update(bitMap);

        UserClickModel result = new UserClickModel();
        result.setDate(key.f0.toString());
        result.setProduct(key.f1);
        result.setPv(pv);
        result.setUv(bitMap.getIntCardinality());
        out.collect(result);
    }
}
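The function reads bitMapState on every firing, and because PurgingTrigger discards the buffered elements after each firing, the running UV only survives if the bitmap is written back with bitMapState.update(bitMap). The plain-Java sketch below simulates two purged firings for one key; a HashMap stands in for the keyed ValueState and a HashSet<Long> for Roaring64NavigableMap (both are substitutions for illustration, and the class name is hypothetical).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simulates purged firings for one key. HashMap stands in for the keyed
// ValueState, HashSet<Long> for Roaring64NavigableMap (substitutions).
class UvStateModel {

    private final Map<String, Set<Long>> bitMapState = new HashMap<>();

    // One firing: `batch` holds the user ids buffered since the previous purge.
    // Returns the UV reported for that firing.
    int fire(String key, List<Long> batch, boolean writeBack) {
        Set<Long> bitMap = bitMapState.get(key);   // like bitMapState.value()
        if (bitMap == null) {
            bitMap = new HashSet<>();
        }
        bitMap.addAll(batch);
        if (writeBack) {
            bitMapState.put(key, bitMap);          // like bitMapState.update(bitMap)
        }
        return bitMap.size();
    }

    public static void main(String[] args) {
        UvStateModel withoutUpdate = new UvStateModel();
        withoutUpdate.fire("2021-01-01|42", List.of(1L, 2L), false);
        System.out.println("without update: " + withoutUpdate.fire("2021-01-01|42", List.of(3L), false));

        UvStateModel withUpdate = new UvStateModel();
        withUpdate.fire("2021-01-01|42", List.of(1L, 2L), true);
        System.out.println("with update:    " + withUpdate.fire("2021-01-01|42", List.of(3L), true));
    }
}
```

Without the write-back the second firing reports a UV of 1 (only the latest batch); with it, the UV is 3. Note that persisting a bitmap per (date, itemId) key also adds to the heap state discussed above.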