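Hi, I would like to ask about a problem that has been bothering me for several days.

My project uses state to hold some data for deduplication. After enabling checkpointing, the web UI shows that the stateful operator cannot save its state data to the checkpoint, which causes the whole checkpoint to fail. Occasionally the first checkpoint succeeds, but every later one fails.

I have tried both MemoryStateBackend and FsStateBackend as the StateBackend, and neither works. FsStateBackend was using HDFS. I also looked up the job ID in the logs of the corresponding TaskManager and found no related exception.

The Flink version is 1.11.2.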
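The attachment is a screenshot of the failed checkpoints in the web UI. The job behind it is simple test code with the business logic removed.

Test code:

    import java.io.IOException;
    import java.util.Date;

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class TestStateProcess extends KeyedProcessFunction<String, NLMessage, NLMessage> {

        // Per-key counter, expired by the TTL configured in open().
        private transient ValueState<Integer> userCount;

        @Override
        public void open(Configuration parameters) throws Exception {
            try {
                ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>(
                        "userId", TypeInformation.of(new TypeHint<Integer>() {}));
                // Entries expire 10 minutes after they are created or last written.
                StateTtlConfig ttlConfig = StateTtlConfig
                        .newBuilder(org.apache.flink.api.common.time.Time.minutes(10))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build();
                descriptor.enableTimeToLive(ttlConfig);
                userCount = getRuntimeContext().getState(descriptor);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void processElement(NLMessage value, Context ctx, Collector<NLMessage> out) throws Exception {
            try {
                // Increment the counter for the current key, starting at 1.
                if (null == userCount.value()) {
                    userCount.update(1);
                } else {
                    userCount.update(userCount.value() + 1);
                }
                if (userCount.value() > 10) {
                    System.out.println(new Date() + " userId: " + ctx.getCurrentKey()
                            + " count: " + userCount.value());
                }
                out.collect(value);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

Checkpoint configuration:

    env.setStateBackend(new MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE, false));
    env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Many thanks!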