??????counts ?????? ???????????????????? List<String> list = Lists.newArrayList(counts.get()) ; for(String ss : list){ System.out.println("!!!" + ss); log.info("!!!" + ss); }???????????????????????????????????????????????????? @Slf4j public class FlatMapTestState extends RichFlatMapFunction<String, Test222> {
private transient ListState<String> counts; @Override public void open(Configuration parameters) throws Exception { StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.minutes(30)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); ListStateDescriptor<String> lastUserLogin = new ListStateDescriptor<>("lastUserLogin", String.class); lastUserLogin.enableTimeToLive(ttlConfig); counts = getRuntimeContext().getListState(lastUserLogin); } @Override public void flatMap(String s, Collector<Test222> collector) throws Exception { Test222 message = JSONUtil.toObject(s, new TypeReference<Test222>() { }); System.out.println(DateUtil.toLongDateString(new Date())); log.info(DateUtil.toLongDateString(new Date())); counts.add(message.getId()); List<String> list = Lists.newArrayList(counts.get()) ; for(String ss : list){ System.out.println("!!!" + ss); log.info("!!!" + ss); } log.info(DateUtil.toLongDateString(new Date())); System.out.println(DateUtil.toLongDateString(new Date())); } } ------------------ ???????? ------------------ ??????: "user-zh" <qcx978132...@gmail.com>; ????????: 2020??7??16??(??????) ????8:16 ??????: "user-zh"<user-zh@flink.apache.org>; ????: Re: state??????checkpoint?????? Hi 1 counts ?????????????????????????????????????????????????????????? 2 ???????????? counts ?????????????????? 3. ?????????????? checkpoint ???????????????????? JM log ?????? 4. ?????????????????????????????????????????? state-process-api[1] ?????????????????????????????? restore ?????????? [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html Best, Congxian sun <1392427...@qq.com> ??2020??7??16?????? ????6:16?????? > > ????????env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > //???????????????? > env.setRestartStrategy(RestartStrategies.noRestart()); > env.getCheckpointConfig().setCheckpointTimeout(500); > > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > env.setStateBackend(new > RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints")); > ??????????????private transient ListState<String&gt; counts; > > > @Override > public void open(Configuration parameters) throws Exception { > StateTtlConfig ttlConfig = StateTtlConfig > .newBuilder(Time.minutes(30)) > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) > > .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) > .build(); > > ListStateDescriptor<String&gt; lastUserLogin = new > ListStateDescriptor<&gt;("lastUserLogin", String.class); > lastUserLogin.enableTimeToLive(ttlConfig); > counts = getRuntimeContext().getListState(lastUserLogin); > } > ????????task managers ???????? counts ??????????????????