This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5ed471414cb4637836909ce9b4a32d4347c4ec9
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Tue Sep 24 12:20:18 2024 +0800

    [FLINK-34511][Runtime/State] Remove deprecated checkpointing related APIs
---
 .../file/sink/FileSinkCompactionSwitchITCase.java  |  2 +-
 .../sink/StreamingExecutionFileSinkITCase.java     |  2 +-
 .../runtime/checkpoint/CheckpointStatsTracker.java | 17 -----------
 .../flink/runtime/state/CheckpointListener.java    | 33 ----------------------
 .../environment/StreamExecutionEnvironment.java    | 21 --------------
 .../DefaultCheckpointStatsTrackerTest.java         | 22 +++++++++++----
 .../api/scala/StreamExecutionEnvironment.scala     | 15 ----------
 .../utils/DummyStreamExecutionEnvironment.java     |  6 ----
 .../test/streaming/runtime/IterateITCase.java      |  2 +-
 9 files changed, 20 insertions(+), 100 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java
index dc5c50f43ac..5766ffdce39 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.file.sink;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.configuration.Configuration;
@@ -42,7 +43,6 @@ import 
org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
index 6a21fea3ec2..ac279a97abb 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
@@ -19,13 +19,13 @@
 package org.apache.flink.connector.file.sink;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.execution.CheckpointingMode;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index b620c0ce4c5..71a970ddd19 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -27,8 +27,6 @@ import javax.annotation.Nullable;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Tracker for checkpoint statistics.
  *
@@ -47,21 +45,6 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public interface CheckpointStatsTracker {
 
-    /**
-     * Callback when a checkpoint is restored.
-     *
-     * @param restored The restored checkpoint stats.
-     */
-    @Deprecated
-    default void reportRestoredCheckpoint(RestoredCheckpointStats restored) {
-        checkNotNull(restored, "Restored checkpoint");
-        reportRestoredCheckpoint(
-                restored.getCheckpointId(),
-                restored.getProperties(),
-                restored.getExternalPath(),
-                restored.getStateSize());
-    }
-
     void reportRestoredCheckpoint(
             long checkpointID,
             CheckpointProperties properties,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
deleted file mode 100644
index 5cfd3b50456..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.annotation.Public;
-
-/**
- * This interface must be implemented by functions/operations that want to 
receive a commit
- * notification once a checkpoint has been completely acknowledged by all 
participants.
- *
- * @deprecated This interface has been moved to {@link
- *     org.apache.flink.api.common.state.CheckpointListener}. This class is 
kept to maintain
- *     backwards compatibility and will be removed in future releases.
- */
-@Public
-@Deprecated
-public interface CheckpointListener extends 
org.apache.flink.api.common.state.CheckpointListener {}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 8f4097ccb60..b740a62e0c5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -559,27 +559,6 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
         return this;
     }
 
-    /**
-     * Enables checkpointing for the streaming job. The distributed state of 
the streaming dataflow
-     * will be periodically snapshotted. In case of a failure, the streaming 
dataflow will be
-     * restarted from the latest completed checkpoint. This method selects 
{@link
-     * CheckpointingMode#EXACTLY_ONCE} guarantees.
-     *
-     * <p>The job draws checkpoints periodically, in the default interval. The 
state will be stored
-     * in the configured state backend.
-     *
-     * <p>NOTE: Checkpointing iterative streaming dataflows is not properly 
supported at the moment.
-     * For that reason, iterative jobs will not be started if used with 
enabled checkpointing.
-     *
-     * @deprecated Use {@link #enableCheckpointing(long)} instead.
-     */
-    @Deprecated
-    @PublicEvolving
-    public StreamExecutionEnvironment enableCheckpointing() {
-        checkpointCfg.setCheckpointInterval(500);
-        return this;
-    }
-
     /**
      * Returns the checkpointing interval or -1 if checkpointing is disabled.
      *
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java
index a4c7ed22282..5c7b80d34ce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java
@@ -180,7 +180,7 @@ class DefaultCheckpointStatsTrackerTest {
                         null,
                         42);
         tracker.reportInitializationStarted(Collections.emptySet(), 123L);
-        tracker.reportRestoredCheckpoint(restored);
+        reportRestoredCheckpoint(tracker, restored);
 
         CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
 
@@ -340,7 +340,8 @@ class DefaultCheckpointStatsTrackerTest {
 
         // Restore operation => new snapshot
         tracker.reportInitializationStarted(Collections.emptySet(), 0);
-        tracker.reportRestoredCheckpoint(
+        reportRestoredCheckpoint(
+                tracker,
                 new RestoredCheckpointStats(
                         12,
                         CheckpointProperties.forCheckpoint(
@@ -410,7 +411,8 @@ class DefaultCheckpointStatsTrackerTest {
         final ExecutionAttemptID executionAttemptId2 = 
ExecutionAttemptID.randomId();
         tracker.reportInitializationStarted(
                 new HashSet<>(Arrays.asList(executionAttemptId3, 
executionAttemptId2)), 100);
-        tracker.reportRestoredCheckpoint(
+        reportRestoredCheckpoint(
+                tracker,
                 new RestoredCheckpointStats(
                         42,
                         CheckpointProperties.forCheckpoint(
@@ -454,7 +456,8 @@ class DefaultCheckpointStatsTrackerTest {
         final ExecutionAttemptID executionAttemptId = 
ExecutionAttemptID.randomId();
         tracker.reportInitializationStarted(
                 new HashSet<>(Arrays.asList(executionAttemptId1, 
executionAttemptId)), 100);
-        tracker.reportRestoredCheckpoint(
+        reportRestoredCheckpoint(
+                tracker,
                 new RestoredCheckpointStats(
                         44,
                         CheckpointProperties.forCheckpoint(
@@ -722,7 +725,7 @@ class DefaultCheckpointStatsTrackerTest {
                         null,
                         42);
         stats.reportInitializationStarted(Collections.emptySet(), 
restoreTimestamp);
-        stats.reportRestoredCheckpoint(restored);
+        reportRestoredCheckpoint(stats, restored);
 
         assertThat(numCheckpoints.getValue()).isEqualTo(2);
         assertThat(numInProgressCheckpoints.getValue()).isZero();
@@ -754,4 +757,13 @@ class DefaultCheckpointStatsTrackerTest {
     private SubtaskStateStats createSubtaskStats(int index) {
         return new SubtaskStateStats(index, 0, 0, 0, 0, 0, 0, 0, 0, 0, false, 
true);
     }
+
+    private void reportRestoredCheckpoint(
+            CheckpointStatsTracker tracker, RestoredCheckpointStats restored) {
+        tracker.reportRestoredCheckpoint(
+                restored.getCheckpointId(),
+                restored.getProperties(),
+                restored.getExternalPath(),
+                restored.getStateSize());
+    }
 }
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 3508289b8d6..01583b17d6b 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -245,21 +245,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends 
AutoCloseable {
     enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE)
   }
 
-  /**
-   * Method for enabling fault-tolerance. Activates monitoring and backup of 
streaming operator
-   * states. Time interval between state checkpoints is specified in in millis.
-   *
-   * Setting this option assumes that the job is used in production and thus 
if not stated
-   * explicitly otherwise with calling the [[setRestartStrategy]] method in 
case of failure the job
-   * will be resubmitted to the cluster indefinitely.
-   */
-  @deprecated
-  @PublicEvolving
-  def enableCheckpointing(): StreamExecutionEnvironment = {
-    javaEnv.enableCheckpointing()
-    this
-  }
-
   /** @deprecated Use [[getCheckpointingConsistencyMode()]] instead. */
   @deprecated
   def getCheckpointingMode = javaEnv.getCheckpointingMode()
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
index f701aff8d36..a04cf1e3d20 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
@@ -162,12 +162,6 @@ public class DummyStreamExecutionEnvironment extends 
StreamExecutionEnvironment
                 "This is a dummy StreamExecutionEnvironment, 
enableCheckpointing method is unsupported.");
     }
 
-    @Override
-    public StreamExecutionEnvironment enableCheckpointing() {
-        throw new UnsupportedOperationException(
-                "This is a dummy StreamExecutionEnvironment, 
enableCheckpointing method is unsupported.");
-    }
-
     @Override
     public long getCheckpointInterval() {
         return realExecEnv.getCheckpointInterval();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
index 481199c9eb4..f2307e55157 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
@@ -631,7 +631,7 @@ public class IterateITCase extends AbstractTestBaseJUnit4 {
     }
 
     private void createIteration(StreamExecutionEnvironment env, int 
timeoutScale) {
-        env.enableCheckpointing();
+        env.enableCheckpointing(500L);
 
         DataStream<Boolean> source =
                 env.fromData(Collections.nCopies(parallelism * 2, false))

Reply via email to