This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d0c185c7c2 Revert "[FLINK-37158][tests] Introduce ForSt to existing 
ITCases (#26000)" (#26291)
0d0c185c7c2 is described below

commit 0d0c185c7c29bef472c3755a4e395748dec5eaec
Author: Yanfei Lei <[email protected]>
AuthorDate: Thu Mar 13 16:00:40 2025 +0800

    Revert "[FLINK-37158][tests] Introduce ForSt to existing ITCases (#26000)" 
(#26291)
    
    This reverts commit 6099697b501d58d08deb9154f77cb81e46117fd6.
---
 .../forst/sync/ForStSyncKeyedStateBackend.java     |   3 +-
 .../test/checkpointing/AutoRescalingITCase.java    |   4 +-
 .../EventTimeWindowCheckpointingITCase.java        |  34 ------
 .../KeyedStateCheckpointingITCase.java             |  18 ----
 .../RescaleCheckpointManuallyITCase.java           | 118 ++-------------------
 5 files changed, 8 insertions(+), 169 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
index da26ae0d937..63bdb9eda78 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
@@ -608,8 +608,7 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
     @Nonnull
     @Override
     public SavepointResources<K> savepoint() throws Exception {
-        throw new UnsupportedOperationException(
-                "Canonical savepoints are not supported by ForSt State 
Backend.");
+        throw new UnsupportedOperationException("This method is not 
supported.");
     }
 
     @Override
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
index 42eb9992f89..374e53963c9 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
@@ -116,9 +116,7 @@ public class AutoRescalingITCase extends TestLogger {
                 new Object[][] {
                     {"rocksdb", false},
                     {"rocksdb", true},
-                    {"hashmap", false},
-                    {"forst", false},
-                    {"forst", true}
+                    {"hashmap", false}
                 });
     }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index ca07e3a6ab2..d3369ff7598 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -35,8 +35,6 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.state.forst.ForStOptions;
-import org.apache.flink.state.forst.ForStStateBackend;
 import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
 import org.apache.flink.state.rocksdb.RocksDBOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -112,7 +110,6 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
         ROCKSDB_FULL,
         ROCKSDB_INCREMENTAL,
         ROCKSDB_INCREMENTAL_ZK,
-        FORST_INCREMENTAL
     }
 
     @Parameterized.Parameters(name = "statebackend type ={0}")
@@ -194,14 +191,6 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                     setupRocksDB(config, 16, true);
                     break;
                 }
-            case FORST_INCREMENTAL:
-                {
-                    config.set(
-                            ForStOptions.TIMER_SERVICE_FACTORY,
-                            ForStStateBackend.PriorityQueueStateType.ForStDB);
-                    setupForSt(config, 16);
-                    break;
-                }
             default:
                 throw new IllegalStateException("No backend selected.");
         }
@@ -240,29 +229,6 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
         config.set(RocksDBOptions.LOCAL_DIRECTORIES, rocksDb);
     }
 
-    private void setupForSt(Configuration config, int fileSizeThreshold) 
throws IOException {
-        // Configure the managed memory size as 64MB per slot for rocksDB 
state backend.
-        config.set(
-                TaskManagerOptions.MANAGED_MEMORY_SIZE,
-                MemorySize.ofMebiBytes(PARALLELISM / NUM_OF_TASK_MANAGERS * 
64));
-
-        final String forstdb = tempFolder.newFolder().getAbsolutePath();
-        final File backups = tempFolder.newFolder().getAbsoluteFile();
-        // we use the fs backend with small threshold here to test the 
behaviour with file
-        // references, not self contained byte handles
-        config.set(StateBackendOptions.STATE_BACKEND, "forst");
-        config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
-        config.set(
-                CheckpointingOptions.CHECKPOINTS_DIRECTORY,
-                Path.fromLocalFile(backups).toUri().toString());
-        if (fileSizeThreshold != -1) {
-            config.set(
-                    CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
-                    MemorySize.parse(fileSizeThreshold + "b"));
-        }
-        config.set(ForStOptions.LOCAL_DIRECTORIES, forstdb);
-    }
-
     protected Configuration createClusterConfig() throws IOException {
         TemporaryFolder temporaryFolder = new TemporaryFolder();
         temporaryFolder.create();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
index be9ba1eeac8..150096ab43a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
@@ -25,13 +25,10 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.state.forst.ForStOptions;
 import org.apache.flink.state.rocksdb.RocksDBOptions;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -146,21 +143,6 @@ public class KeyedStateCheckpointingITCase extends 
TestLogger {
         testProgramWithBackend(env);
     }
 
-    @Test
-    public void testWithForStBackendIncremental() throws Exception {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.configure(
-                new Configuration()
-                        .set(StateBackendOptions.STATE_BACKEND, "forst")
-                        .set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, 
true)
-                        .set(
-                                ForStOptions.LOCAL_DIRECTORIES,
-                                tmpFolder.newFolder().getAbsolutePath()));
-        CheckpointStorageUtils.configureFileSystemCheckpointStorage(
-                env, tmpFolder.newFolder().toURI().toString());
-        testProgramWithBackend(env);
-    }
-
     // ------------------------------------------------------------------------
 
     protected void testProgramWithBackend(StreamExecutionEnvironment env) 
throws Exception {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
index f811c267404..a4f05362348 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
@@ -19,12 +19,9 @@
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.state.v2.StateFuture;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
@@ -43,10 +40,8 @@ import 
org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.testutils.junit.SharedObjects;
 import org.apache.flink.testutils.junit.SharedReference;
@@ -61,12 +56,8 @@ import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
@@ -83,7 +74,6 @@ import static org.junit.Assert.assertNotNull;
  * NotifyingDefiniteKeySource, SubtaskIndexFlatMapper and CollectionSink refer 
to RescalingITCase,
  * because the static fields in these classes can not be shared.
  */
-@RunWith(Parameterized.class)
 public class RescaleCheckpointManuallyITCase extends TestLogger {
 
     private static final int NUM_TASK_MANAGERS = 2;
@@ -94,24 +84,10 @@ public class RescaleCheckpointManuallyITCase extends 
TestLogger {
 
     @ClassRule public static TemporaryFolder temporaryFolder = new 
TemporaryFolder();
 
-    @Parameterized.Parameter(0)
-    public String statebackendType;
-
-    @Parameterized.Parameter(1)
-    public boolean enableAsyncState;
-
-    @Parameterized.Parameters(name = "statebackend type ={0}, 
enableAsyncState={1}")
-    public static Collection<Object[]> parameter() {
-        return Arrays.asList(
-                new Object[][] {
-                    {"forst", true}, {"forst", false}, {"rocksdb", true}, 
{"rocksdb", false}
-                });
-    }
-
     @Before
     public void setup() throws Exception {
         Configuration config = new Configuration();
-        config.set(StateBackendOptions.STATE_BACKEND, statebackendType);
+        config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
         config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
 
         cluster =
@@ -287,7 +263,7 @@ public class RescaleCheckpointManuallyITCase extends 
TestLogger {
 
         SharedReference<JobID> jobID = sharedObjects.add(new JobID());
         SharedReference<MiniCluster> miniClusterRef = 
sharedObjects.add(miniCluster);
-        KeyedStream<Integer, Integer> input =
+        DataStream<Integer> input =
                 env.addSource(
                                 new NotifyingDefiniteKeySource(
                                         numberKeys, numberElements, 
failAfterEmission) {
@@ -324,18 +300,10 @@ public class RescaleCheckpointManuallyITCase extends 
TestLogger {
                                         return value;
                                     }
                                 });
-        if (enableAsyncState) {
-            input.enableAsyncState();
-            DataStream<Tuple2<Integer, Integer>> result =
-                    input.flatMap(new 
AsyncSubtaskIndexFlatMapper(numberElementsExpect));
+        DataStream<Tuple2<Integer, Integer>> result =
+                input.flatMap(new 
SubtaskIndexFlatMapper(numberElementsExpect));
 
-            result.sinkTo(new CollectionSink<>());
-        } else {
-            DataStream<Tuple2<Integer, Integer>> result =
-                    input.flatMap(new 
SubtaskIndexFlatMapper(numberElementsExpect));
-
-            result.sinkTo(new CollectionSink<>());
-        }
+        result.sinkTo(new CollectionSink<>());
 
         return 
env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), jobID.get());
     }
@@ -381,9 +349,8 @@ public class RescaleCheckpointManuallyITCase extends 
TestLogger {
                 } else {
                     boolean newCheckpoint = false;
                     long waited = 0L;
-                    running = false;
                     // maximum wait 5min
-                    while (!newCheckpoint && waited < 300000L) {
+                    while (!newCheckpoint && waited < 30000L) {
                         synchronized (ctx.getCheckpointLock()) {
                             newCheckpoint = waitCheckpointCompleted();
                         }
@@ -456,79 +423,6 @@ public class RescaleCheckpointManuallyITCase extends 
TestLogger {
         }
     }
 
-    private static class AsyncSubtaskIndexFlatMapper
-            extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>>
-            implements CheckpointedFunction {
-
-        private static final long serialVersionUID = 1L;
-
-        private transient 
org.apache.flink.api.common.state.v2.ValueState<Integer> counter;
-        private transient 
org.apache.flink.api.common.state.v2.ValueState<Integer> sum;
-
-        private final int numberElements;
-
-        public AsyncSubtaskIndexFlatMapper(int numberElements) {
-            this.numberElements = numberElements;
-        }
-
-        @Override
-        public void flatMap(Integer value, Collector<Tuple2<Integer, Integer>> 
out)
-                throws Exception {
-            StateFuture<Integer> counterFuture =
-                    counter.asyncValue()
-                            .thenCompose(
-                                    (Integer c) -> {
-                                        int updated = c == null ? 1 : c + 1;
-                                        return counter.asyncUpdate(updated)
-                                                .thenApply(nothing -> updated);
-                                    });
-            StateFuture<Integer> sumFuture =
-                    sum.asyncValue()
-                            .thenCompose(
-                                    (Integer s) -> {
-                                        int updated = s == null ? value : s + 
value;
-                                        return sum.asyncUpdate(updated)
-                                                .thenApply(nothing -> updated);
-                                    });
-
-            counterFuture.thenCombine(
-                    sumFuture,
-                    (c, s) -> {
-                        if (c == numberElements) {
-                            out.collect(
-                                    Tuple2.of(
-                                            getRuntimeContext()
-                                                    .getTaskInfo()
-                                                    .getIndexOfThisSubtask(),
-                                            s));
-                        }
-                        return null;
-                    });
-        }
-
-        @Override
-        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
-            // all managed, nothing to do.
-        }
-
-        @Override
-        public void initializeState(FunctionInitializationContext context) 
throws Exception {}
-
-        @Override
-        public void open(OpenContext openContext) throws Exception {
-            counter =
-                    ((StreamingRuntimeContext) getRuntimeContext())
-                            .getValueState(
-                                    new 
org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
-                                            "counter", 
BasicTypeInfo.INT_TYPE_INFO));
-            sum =
-                    ((StreamingRuntimeContext) getRuntimeContext())
-                            .getValueState(
-                                    new 
org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
-                                            "sum", 
BasicTypeInfo.INT_TYPE_INFO));
-        }
-    }
-
     private static class CollectionSink<IN> implements Sink<IN> {
 
         private static final ConcurrentHashMap<JobID, CollectionSinkWriter<?>> 
writers =

Reply via email to