大家好,
我在学习Flink checkpoint时,做了一个示例没有得到期望结果,麻烦帮忙看看是哪里设置有问题。谢谢 1. 启动checkpoint 2. 设置statebackend为FsStateBackend 3. 从socketTextStream读取数据,统计单词个数 (“hello”, 5), (“world”, 1) 4. 通过触发异常,来模拟终止程序 5. 重新启动程序,那么启动之后的统计数据的初始值应该是上一次checkpoint 成功存储的值 (“hello”, 5), (“world”, 1) , 那么再次输入hello, 应该输出(“hello”, 6) 而在实际输出结果为(“hello”, 1) 环境和版本信息 1. MacOS - Oracle JDK 1.8 2. 版本信息 <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.11.6</flink.version> <flink.lang>scala</flink.lang> <target.java.version>1.8</target.java.version> <scala.binary.version>2.12</scala.binary.version> <maven.compiler.source>${target.java.version}</maven.compiler.source> <maven.compiler.target>${target.java.version}</maven.compiler.target> <log4j.version>2.12.1</log4j.version> </properties> 代码 object RestartStrategyFsStateBackend { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(1000L) env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2)) val backendPath = "file:///Users/arthur/Documents/Workspace/java/quickstart" + "/flink-spring/src/main/resources/backend.out/restartstrategyv3" env.setStateBackend(new FsStateBackend(backendPath)) // socket数据源 env.socketTextStream("localhost", 7077) .map(value => { if (value == "restart") { throw new RuntimeException("restart is triggered, oooops~~~~~") } (value, 1) } ) .keyBy(_._1) .sum(1) .print("RestartStrategy") env.execute("RestartStrategy") } } BR. Arthur