This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b497726fa445252bcb2162c14a4ed088d6fa4113
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Thu Sep 12 16:04:54 2024 +0800

    [FLINK-34511][Checkpoint] Remove force checkpointing flag
---
 .../api/environment/CheckpointConfig.java          | 26 ----------
 .../environment/ExecutionCheckpointingOptions.java | 13 -----
 .../environment/StreamExecutionEnvironment.java    | 41 ---------------
 .../api/graph/StreamingJobGraphGenerator.java      |  6 ---
 .../api/scala/StreamExecutionEnvironment.scala     | 28 -----------
 .../utils/DummyStreamExecutionEnvironment.java     | 12 -----
 .../test/streaming/runtime/IterateITCase.java      | 58 ----------------------
 7 files changed, 184 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index d3c0d1530f1..0448fab868f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -382,32 +382,6 @@ public class CheckpointConfig implements java.io.Serializable {
                 CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 
maxConcurrentCheckpoints);
     }
 
-    /**
-     * Checks whether checkpointing is forced, despite currently 
non-checkpointable iteration
-     * feedback.
-     *
-     * @return True, if checkpointing is forced, false otherwise.
-     * @deprecated This will be removed once iterations properly participate 
in checkpointing.
-     */
-    @Deprecated
-    @PublicEvolving
-    public boolean isForceCheckpointing() {
-        return 
configuration.get(ExecutionCheckpointingOptions.FORCE_CHECKPOINTING);
-    }
-
-    /**
-     * Checks whether checkpointing is forced, despite currently 
non-checkpointable iteration
-     * feedback.
-     *
-     * @param forceCheckpointing The flag to force checkpointing.
-     * @deprecated This will be removed once iterations properly participate 
in checkpointing.
-     */
-    @Deprecated
-    @PublicEvolving
-    public void setForceCheckpointing(boolean forceCheckpointing) {
-        configuration.set(ExecutionCheckpointingOptions.FORCE_CHECKPOINTING, 
forceCheckpointing);
-    }
-
     /**
      * Checks whether unaligned checkpoints are forced, despite iteration 
feedback.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
index a7a3129aca0..de647c5568e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
@@ -338,19 +338,6 @@ public class ExecutionCheckpointingOptions {
                                                     "the important 
considerations"))
                                     .build());
 
-    /**
-     * Access to this option is officially only supported via {@link
-     * CheckpointConfig#setForceCheckpointing(boolean)}, but there is no good 
reason behind this.
-     *
-     * @deprecated This will be removed once iterations properly participate 
in checkpointing.
-     */
-    @Internal @Deprecated @Documentation.ExcludeFromDocumentation
-    public static final ConfigOption<Boolean> FORCE_CHECKPOINTING =
-            key("execution.checkpointing.force")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("Flag to force checkpointing in iterative 
jobs.");
-
     /**
      * Access to this option is officially only supported via {@link
      * CheckpointConfig#enableApproximateLocalRecovery(boolean)}, but there is 
no good reason behind
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index dca2e8639cf..8f4097ccb60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -559,35 +559,6 @@ public class StreamExecutionEnvironment implements AutoCloseable {
         return this;
     }
 
-    /**
-     * Enables checkpointing for the streaming job. The distributed state of 
the streaming dataflow
-     * will be periodically snapshotted. In case of a failure, the streaming 
dataflow will be
-     * restarted from the latest completed checkpoint.
-     *
-     * <p>The job draws checkpoints periodically, in the given interval. The 
state will be stored in
-     * the configured state backend.
-     *
-     * <p>NOTE: Checkpointing iterative streaming dataflows is not properly 
supported at the moment.
-     * If the "force" parameter is set to true, the system will execute the 
job nonetheless.
-     *
-     * @param interval Time interval between state checkpoints in millis.
-     * @param mode The checkpointing mode, selecting between "exactly once" 
and "at least once"
-     *     guaranteed.
-     * @param force If true checkpointing will be enabled for iterative jobs 
as well.
-     * @deprecated Use {@link #enableCheckpointing(long, CheckpointingMode)} 
instead. Forcing
-     *     checkpoints will be removed in the future.
-     */
-    @Deprecated
-    @SuppressWarnings("deprecation")
-    @PublicEvolving
-    public StreamExecutionEnvironment enableCheckpointing(
-            long interval, org.apache.flink.streaming.api.CheckpointingMode 
mode, boolean force) {
-        checkpointCfg.setCheckpointingMode(mode);
-        checkpointCfg.setCheckpointInterval(interval);
-        checkpointCfg.setForceCheckpointing(force);
-        return this;
-    }
-
     /**
      * Enables checkpointing for the streaming job. The distributed state of 
the streaming dataflow
      * will be periodically snapshotted. In case of a failure, the streaming 
dataflow will be
@@ -620,18 +591,6 @@ public class StreamExecutionEnvironment implements AutoCloseable {
         return checkpointCfg.getCheckpointInterval();
     }
 
-    /**
-     * Returns whether checkpointing is force-enabled.
-     *
-     * @deprecated Forcing checkpoints will be removed in future version.
-     */
-    @Deprecated
-    @SuppressWarnings("deprecation")
-    @PublicEvolving
-    public boolean isForceCheckpointing() {
-        return checkpointCfg.isForceCheckpointing();
-    }
-
     /** Returns whether unaligned checkpoints are enabled. */
     @PublicEvolving
     public boolean isUnalignedCheckpointsEnabled() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 7fd640c49cc..570335a8789 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -501,12 +501,6 @@ public class StreamingJobGraphGenerator {
 
         if (checkpointConfig.isCheckpointingEnabled()) {
             // temporarily forbid checkpointing for iterative jobs
-            if (streamGraph.isIterative() && 
!checkpointConfig.isForceCheckpointing()) {
-                throw new UnsupportedOperationException(
-                        "Checkpointing is currently not supported by default 
for iterative jobs, as we cannot guarantee exactly once semantics. "
-                                + "State checkpoints happen normally, but 
records in-transit during the snapshot will be lost upon failure. "
-                                + "\nThe user can force enable state 
checkpoints with the reduced guarantees by calling: 
env.enableCheckpointing(interval,true)");
-            }
             if (streamGraph.isIterative()
                     && checkpointConfig.isUnalignedCheckpointsEnabled()
                     && !checkpointConfig.isForceUnalignedCheckpoints()) {
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 91f4b188e51..3508289b8d6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -177,34 +177,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable {
    */
   def getCheckpointConfig = javaEnv.getCheckpointConfig()
 
-  /**
-   * Enables checkpointing for the streaming job. The distributed state of the 
streaming dataflow
-   * will be periodically snapshotted. In case of a failure, the streaming 
dataflow will be
-   * restarted from the latest completed checkpoint.
-   *
-   * The job draws checkpoints periodically, in the given interval. The state 
will be stored in the
-   * configured state backend.
-   *
-   * NOTE: Checkpointing iterative streaming dataflows in not properly 
supported at the moment. If
-   * the "force" parameter is set to true, the system will execute the job 
nonetheless.
-   *
-   * @param interval
-   *   Time interval between state checkpoints in millis.
-   * @param mode
-   *   The checkpointing mode, selecting between "exactly once" and "at least 
once" guarantees.
-   * @param force
-   *   If true checkpointing will be enabled for iterative jobs as well.
-   */
-  @deprecated
-  @PublicEvolving
-  def enableCheckpointing(
-      interval: Long,
-      mode: org.apache.flink.streaming.api.CheckpointingMode,
-      force: Boolean): StreamExecutionEnvironment = {
-    javaEnv.enableCheckpointing(interval, mode, force)
-    this
-  }
-
   /**
    * Enables checkpointing for the streaming job. The distributed state of the 
streaming dataflow
    * will be periodically snapshotted. In case of a failure, the streaming 
dataflow will be
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
index c26d4ef8e04..f701aff8d36 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
@@ -162,13 +162,6 @@ public class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment
                 "This is a dummy StreamExecutionEnvironment, 
enableCheckpointing method is unsupported.");
     }
 
-    @Override
-    public StreamExecutionEnvironment enableCheckpointing(
-            long interval, org.apache.flink.streaming.api.CheckpointingMode 
mode, boolean force) {
-        throw new UnsupportedOperationException(
-                "This is a dummy StreamExecutionEnvironment, 
enableCheckpointing method is unsupported.");
-    }
-
     @Override
     public StreamExecutionEnvironment enableCheckpointing() {
         throw new UnsupportedOperationException(
@@ -180,11 +173,6 @@ public class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment
         return realExecEnv.getCheckpointInterval();
     }
 
-    @Override
-    public boolean isForceCheckpointing() {
-        return realExecEnv.isForceCheckpointing();
-    }
-
     @Override
     public org.apache.flink.streaming.api.CheckpointingMode 
getCheckpointingMode() {
         return realExecEnv.getCheckpointingMode();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
index 73bdaba8d8f..481199c9eb4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -631,63 +630,6 @@ public class IterateITCase extends AbstractTestBaseJUnit4 {
         }
     }
 
-    @SuppressWarnings("deprecation")
-    @Test
-    public void testWithCheckPointing() throws Exception {
-        int numRetries = 5;
-        int timeoutScale = 1;
-
-        for (int numRetry = 0; numRetry < numRetries; numRetry++) {
-            try {
-                StreamExecutionEnvironment env =
-                        StreamExecutionEnvironment.getExecutionEnvironment();
-
-                try {
-                    createIteration(env, timeoutScale);
-                    env.execute();
-
-                    // this statement should never be reached
-                    fail();
-                } catch (UnsupportedOperationException e) {
-                    // expected behaviour
-                }
-
-                // Test force checkpointing
-
-                try {
-                    createIteration(env, timeoutScale);
-                    env.enableCheckpointing(
-                            
CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME,
-                            
org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE,
-                            false);
-                    env.execute();
-
-                    // this statement should never be reached
-                    fail();
-                } catch (UnsupportedOperationException e) {
-                    // expected behaviour
-                }
-
-                createIteration(env, timeoutScale);
-                env.enableCheckpointing(
-                        
CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME,
-                        
org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE,
-                        true);
-                env.getStreamGraph().getJobGraph();
-
-                break; // success
-            } catch (Throwable t) {
-                LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " 
failed", t);
-
-                if (numRetry >= numRetries - 1) {
-                    throw t;
-                } else {
-                    timeoutScale *= 2;
-                }
-            }
-        }
-    }
-
     private void createIteration(StreamExecutionEnvironment env, int 
timeoutScale) {
         env.enableCheckpointing();
 

Reply via email to