This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b497726fa445252bcb2162c14a4ed088d6fa4113
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Thu Sep 12 16:04:54 2024 +0800

    [FLINK-34511][Checkpoint] Remove force checkpointing flag
---
 .../api/environment/CheckpointConfig.java          | 26 ----------
 .../environment/ExecutionCheckpointingOptions.java | 13 -----
 .../environment/StreamExecutionEnvironment.java    | 41 ---------------
 .../api/graph/StreamingJobGraphGenerator.java      |  6 ---
 .../api/scala/StreamExecutionEnvironment.scala     | 28 -----------
 .../utils/DummyStreamExecutionEnvironment.java     | 12 -----
 .../test/streaming/runtime/IterateITCase.java      | 58 ----------------------
 7 files changed, 184 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index d3c0d1530f1..0448fab868f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -382,32 +382,6 @@ public class CheckpointConfig implements java.io.Serializable {
                 CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, maxConcurrentCheckpoints);
     }
 
-    /**
-     * Checks whether checkpointing is forced, despite currently non-checkpointable iteration
-     * feedback.
-     *
-     * @return True, if checkpointing is forced, false otherwise.
-     * @deprecated This will be removed once iterations properly participate in checkpointing.
-     */
-    @Deprecated
-    @PublicEvolving
-    public boolean isForceCheckpointing() {
-        return configuration.get(ExecutionCheckpointingOptions.FORCE_CHECKPOINTING);
-    }
-
-    /**
-     * Checks whether checkpointing is forced, despite currently non-checkpointable iteration
-     * feedback.
-     *
-     * @param forceCheckpointing The flag to force checkpointing.
-     * @deprecated This will be removed once iterations properly participate in checkpointing.
-     */
-    @Deprecated
-    @PublicEvolving
-    public void setForceCheckpointing(boolean forceCheckpointing) {
-        configuration.set(ExecutionCheckpointingOptions.FORCE_CHECKPOINTING, forceCheckpointing);
-    }
-
     /**
      * Checks whether unaligned checkpoints are forced, despite iteration feedback.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
index a7a3129aca0..de647c5568e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
@@ -338,19 +338,6 @@ public class ExecutionCheckpointingOptions {
                                             "the important considerations"))
                                     .build());
 
-    /**
-     * Access to this option is officially only supported via {@link
-     * CheckpointConfig#setForceCheckpointing(boolean)}, but there is no good reason behind this.
-     *
-     * @deprecated This will be removed once iterations properly participate in checkpointing.
-     */
-    @Internal @Deprecated @Documentation.ExcludeFromDocumentation
-    public static final ConfigOption<Boolean> FORCE_CHECKPOINTING =
-            key("execution.checkpointing.force")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("Flag to force checkpointing in iterative jobs.");
-
     /**
      * Access to this option is officially only supported via {@link
      * CheckpointConfig#enableApproximateLocalRecovery(boolean)}, but there is no good reason behind
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index dca2e8639cf..8f4097ccb60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -559,35 +559,6 @@ public class StreamExecutionEnvironment implements AutoCloseable {
         return this;
     }
 
-    /**
-     * Enables checkpointing for the streaming job. The distributed state of the streaming dataflow
-     * will be periodically snapshotted. In case of a failure, the streaming dataflow will be
-     * restarted from the latest completed checkpoint.
-     *
-     * <p>The job draws checkpoints periodically, in the given interval. The state will be stored in
-     * the configured state backend.
-     *
-     * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment.
-     * If the "force" parameter is set to true, the system will execute the job nonetheless.
-     *
-     * @param interval Time interval between state checkpoints in millis.
-     * @param mode The checkpointing mode, selecting between "exactly once" and "at least once"
-     *     guaranteed.
-     * @param force If true checkpointing will be enabled for iterative jobs as well.
-     * @deprecated Use {@link #enableCheckpointing(long, CheckpointingMode)} instead. Forcing
-     *     checkpoints will be removed in the future.
-     */
-    @Deprecated
-    @SuppressWarnings("deprecation")
-    @PublicEvolving
-    public StreamExecutionEnvironment enableCheckpointing(
-            long interval, org.apache.flink.streaming.api.CheckpointingMode mode, boolean force) {
-        checkpointCfg.setCheckpointingMode(mode);
-        checkpointCfg.setCheckpointInterval(interval);
-        checkpointCfg.setForceCheckpointing(force);
-        return this;
-    }
-
     /**
      * Enables checkpointing for the streaming job. The distributed state of the streaming dataflow
      * will be periodically snapshotted. In case of a failure, the streaming dataflow will be
@@ -620,18 +591,6 @@ public class StreamExecutionEnvironment implements AutoCloseable {
         return checkpointCfg.getCheckpointInterval();
     }
 
-    /**
-     * Returns whether checkpointing is force-enabled.
-     *
-     * @deprecated Forcing checkpoints will be removed in future version.
-     */
-    @Deprecated
-    @SuppressWarnings("deprecation")
-    @PublicEvolving
-    public boolean isForceCheckpointing() {
-        return checkpointCfg.isForceCheckpointing();
-    }
-
     /** Returns whether unaligned checkpoints are enabled. */
     @PublicEvolving
     public boolean isUnalignedCheckpointsEnabled() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 7fd640c49cc..570335a8789 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -501,12 +501,6 @@ public class StreamingJobGraphGenerator {
 
         if (checkpointConfig.isCheckpointingEnabled()) {
             // temporarily forbid checkpointing for iterative jobs
-            if (streamGraph.isIterative() && !checkpointConfig.isForceCheckpointing()) {
-                throw new UnsupportedOperationException(
-                        "Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
-                                + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
-                                + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
-            }
             if (streamGraph.isIterative()
                     && checkpointConfig.isUnalignedCheckpointsEnabled()
                     && !checkpointConfig.isForceUnalignedCheckpoints()) {
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 91f4b188e51..3508289b8d6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -177,34 +177,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable {
    */
   def getCheckpointConfig = javaEnv.getCheckpointConfig()
 
-  /**
-   * Enables checkpointing for the streaming job. The distributed state of the streaming dataflow
-   * will be periodically snapshotted. In case of a failure, the streaming dataflow will be
-   * restarted from the latest completed checkpoint.
-   *
-   * The job draws checkpoints periodically, in the given interval. The state will be stored in the
-   * configured state backend.
-   *
-   * NOTE: Checkpointing iterative streaming dataflows in not properly supported at the moment. If
-   * the "force" parameter is set to true, the system will execute the job nonetheless.
-   *
-   * @param interval
-   *   Time interval between state checkpoints in millis.
-   * @param mode
-   *   The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
-   * @param force
-   *   If true checkpointing will be enabled for iterative jobs as well.
-   */
-  @deprecated
-  @PublicEvolving
-  def enableCheckpointing(
-      interval: Long,
-      mode: org.apache.flink.streaming.api.CheckpointingMode,
-      force: Boolean): StreamExecutionEnvironment = {
-    javaEnv.enableCheckpointing(interval, mode, force)
-    this
-  }
-
   /**
    * Enables checkpointing for the streaming job. The distributed state of the streaming dataflow
    * will be periodically snapshotted. In case of a failure, the streaming dataflow will be
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
index c26d4ef8e04..f701aff8d36 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
@@ -162,13 +162,6 @@ public class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment
                 "This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported.");
     }
 
-    @Override
-    public StreamExecutionEnvironment enableCheckpointing(
-            long interval, org.apache.flink.streaming.api.CheckpointingMode mode, boolean force) {
-        throw new UnsupportedOperationException(
-                "This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported.");
-    }
-
     @Override
     public StreamExecutionEnvironment enableCheckpointing() {
         throw new UnsupportedOperationException(
@@ -180,11 +173,6 @@ public class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment
         return realExecEnv.getCheckpointInterval();
     }
 
-    @Override
-    public boolean isForceCheckpointing() {
-        return realExecEnv.isForceCheckpointing();
-    }
-
     @Override
     public org.apache.flink.streaming.api.CheckpointingMode getCheckpointingMode() {
         return realExecEnv.getCheckpointingMode();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
index 73bdaba8d8f..481199c9eb4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -631,63 +630,6 @@ public class IterateITCase extends AbstractTestBaseJUnit4 {
         }
     }
 
-    @SuppressWarnings("deprecation")
-    @Test
-    public void testWithCheckPointing() throws Exception {
-        int numRetries = 5;
-        int timeoutScale = 1;
-
-        for (int numRetry = 0; numRetry < numRetries; numRetry++) {
-            try {
-                StreamExecutionEnvironment env =
-                        StreamExecutionEnvironment.getExecutionEnvironment();
-
-                try {
-                    createIteration(env, timeoutScale);
-                    env.execute();
-
-                    // this statement should never be reached
-                    fail();
-                } catch (UnsupportedOperationException e) {
-                    // expected behaviour
-                }
-
-                // Test force checkpointing
-
-                try {
-                    createIteration(env, timeoutScale);
-                    env.enableCheckpointing(
-                            CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME,
-                            org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE,
-                            false);
-                    env.execute();
-
-                    // this statement should never be reached
-                    fail();
-                } catch (UnsupportedOperationException e) {
-                    // expected behaviour
-                }
-
-                createIteration(env, timeoutScale);
-                env.enableCheckpointing(
-                        CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME,
-                        org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE,
-                        true);
-                env.getStreamGraph().getJobGraph();
-
-                break; // success
-            } catch (Throwable t) {
-                LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
-
-                if (numRetry >= numRetries - 1) {
-                    throw t;
-                } else {
-                    timeoutScale *= 2;
-                }
-            }
-        }
-    }
-
     private void createIteration(StreamExecutionEnvironment env, int timeoutScale) {
         env.enableCheckpointing();