??????counts ?????? ???????????????????? List<String&gt; list = 
Lists.newArrayList(counts.get()) ;
            for(String ss : list){
                System.out.println("!!!" + ss);
                log.info("!!!" + ss);
            }????????????????????????????????????????????????????
@Slf4j
public class FlatMapTestState extends RichFlatMapFunction<String, Test222&gt; {


    private transient ListState<String&gt; counts;


    @Override
    public void open(Configuration parameters) throws Exception {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(30))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ListStateDescriptor<String&gt; lastUserLogin = new 
ListStateDescriptor<&gt;("lastUserLogin", String.class);
        lastUserLogin.enableTimeToLive(ttlConfig);
        counts = getRuntimeContext().getListState(lastUserLogin);
    }


    @Override
    public void flatMap(String s, Collector<Test222&gt; collector) throws 
Exception {
            Test222 message = JSONUtil.toObject(s, new 
TypeReference<Test222&gt;() {
            });

            System.out.println(DateUtil.toLongDateString(new Date()));
            log.info(DateUtil.toLongDateString(new Date()));
            counts.add(message.getId());
            List<String&gt; list = Lists.newArrayList(counts.get()) ;
            for(String ss : list){
                System.out.println("!!!" + ss);
                log.info("!!!" + ss);
            }
              log.info(DateUtil.toLongDateString(new Date()));
            System.out.println(DateUtil.toLongDateString(new Date()));
    }
}










------------------&nbsp;????????&nbsp;------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<qcx978132...@gmail.com&gt;;
????????:&nbsp;2020??7??16??(??????) ????8:16
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

????:&nbsp;Re: state??????checkpoint??????



Hi

1 counts ??????????????????????????????????????????????????????????
2 ???????????? counts ??????????????????
3. ?????????????? checkpoint ???????????????????? JM log ??????
4. ?????????????????????????????????????????? state-process-api[1] 
?????????????????????????????? restore ??????????

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
Best,
Congxian


sun <1392427...@qq.com&gt; ??2020??7??16?????? ????6:16??????

&gt;
&gt; 
????????env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
&gt; //????????????????
&gt; env.setRestartStrategy(RestartStrategies.noRestart());
&gt; env.getCheckpointConfig().setCheckpointTimeout(500);
&gt;
&gt; 
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
&gt;
&gt; 
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
&gt; env.setStateBackend(new
&gt; RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));
&gt;&nbsp;&nbsp; ??????????????private transient ListState<String&amp;gt; 
counts;
&gt;
&gt;
&gt; @Override
&gt; public void open(Configuration parameters) throws Exception {
&gt;&nbsp;&nbsp;&nbsp;&nbsp; StateTtlConfig ttlConfig = StateTtlConfig
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
.newBuilder(Time.minutes(30))
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
&gt;
&gt; .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
.build();
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp; ListStateDescriptor<String&amp;gt; lastUserLogin = 
new
&gt; ListStateDescriptor<&amp;gt;("lastUserLogin", String.class);
&gt;&nbsp;&nbsp;&nbsp;&nbsp; lastUserLogin.enableTimeToLive(ttlConfig);
&gt;&nbsp;&nbsp;&nbsp;&nbsp; counts = 
getRuntimeContext().getListState(lastUserLogin);
&gt; }
&gt; ????????task managers ????????&nbsp; counts&nbsp; ??????????????????

回复