Hi all
小弟遇到个问题期望大佬解答解答:
通过         env.setStateBackend(new 
RocksDBStateBackend("file:///data/flink/checkpoints"));设置状态存储位置,job运行起来后找不到状态数据,


flink1.12 yarn pre job 模式,下面是我的配置,job运行起来后在服务器上找不到 
“/data/flink/checkpoints”这个目录,像我设置了状态的存储位置是不是job一运行起来对应的存储位置就应该有状态的数据呢?


public class FlinkTestDemo {
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
env.getConfig().setAutoWatermarkInterval(200);
env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints"));
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, 
bsSettings);

bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
 CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
 Duration.ofMinutes(5));

Configuration configuration = bsTableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "6000");
configuration.setString("table.exec.mini-batch.size", "5000");

| |
刘海
|
|
liuha...@163.com
|
签名由网易邮箱大师定制

回复