This is an automated email from the ASF dual-hosted git repository.
leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0d0c185c7c2 Revert "[FLINK-37158][tests] Introduce ForSt to existing
ITCases (#26000)" (#26291)
0d0c185c7c2 is described below
commit 0d0c185c7c29bef472c3755a4e395748dec5eaec
Author: Yanfei Lei <[email protected]>
AuthorDate: Thu Mar 13 16:00:40 2025 +0800
Revert "[FLINK-37158][tests] Introduce ForSt to existing ITCases (#26000)"
(#26291)
This reverts commit 6099697b501d58d08deb9154f77cb81e46117fd6.
---
.../forst/sync/ForStSyncKeyedStateBackend.java | 3 +-
.../test/checkpointing/AutoRescalingITCase.java | 4 +-
.../EventTimeWindowCheckpointingITCase.java | 34 ------
.../KeyedStateCheckpointingITCase.java | 18 ----
.../RescaleCheckpointManuallyITCase.java | 118 ++-------------------
5 files changed, 8 insertions(+), 169 deletions(-)
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
index da26ae0d937..63bdb9eda78 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
@@ -608,8 +608,7 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
@Nonnull
@Override
public SavepointResources<K> savepoint() throws Exception {
- throw new UnsupportedOperationException(
- "Canonical savepoints are not supported by ForSt State
Backend.");
+ throw new UnsupportedOperationException("This method is not
supported.");
}
@Override
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
index 42eb9992f89..374e53963c9 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
@@ -116,9 +116,7 @@ public class AutoRescalingITCase extends TestLogger {
new Object[][] {
{"rocksdb", false},
{"rocksdb", true},
- {"hashmap", false},
- {"forst", false},
- {"forst", true}
+ {"hashmap", false}
});
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index ca07e3a6ab2..d3369ff7598 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -35,8 +35,6 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.state.forst.ForStOptions;
-import org.apache.flink.state.forst.ForStStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.RocksDBOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -112,7 +110,6 @@ public class EventTimeWindowCheckpointingITCase extends
TestLogger {
ROCKSDB_FULL,
ROCKSDB_INCREMENTAL,
ROCKSDB_INCREMENTAL_ZK,
- FORST_INCREMENTAL
}
@Parameterized.Parameters(name = "statebackend type ={0}")
@@ -194,14 +191,6 @@ public class EventTimeWindowCheckpointingITCase extends
TestLogger {
setupRocksDB(config, 16, true);
break;
}
- case FORST_INCREMENTAL:
- {
- config.set(
- ForStOptions.TIMER_SERVICE_FACTORY,
- ForStStateBackend.PriorityQueueStateType.ForStDB);
- setupForSt(config, 16);
- break;
- }
default:
throw new IllegalStateException("No backend selected.");
}
@@ -240,29 +229,6 @@ public class EventTimeWindowCheckpointingITCase extends
TestLogger {
config.set(RocksDBOptions.LOCAL_DIRECTORIES, rocksDb);
}
- private void setupForSt(Configuration config, int fileSizeThreshold)
throws IOException {
- // Configure the managed memory size as 64MB per slot for rocksDB
state backend.
- config.set(
- TaskManagerOptions.MANAGED_MEMORY_SIZE,
- MemorySize.ofMebiBytes(PARALLELISM / NUM_OF_TASK_MANAGERS *
64));
-
- final String forstdb = tempFolder.newFolder().getAbsolutePath();
- final File backups = tempFolder.newFolder().getAbsoluteFile();
- // we use the fs backend with small threshold here to test the
behaviour with file
- // references, not self contained byte handles
- config.set(StateBackendOptions.STATE_BACKEND, "forst");
- config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
- config.set(
- CheckpointingOptions.CHECKPOINTS_DIRECTORY,
- Path.fromLocalFile(backups).toUri().toString());
- if (fileSizeThreshold != -1) {
- config.set(
- CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
- MemorySize.parse(fileSizeThreshold + "b"));
- }
- config.set(ForStOptions.LOCAL_DIRECTORIES, forstdb);
- }
-
protected Configuration createClusterConfig() throws IOException {
TemporaryFolder temporaryFolder = new TemporaryFolder();
temporaryFolder.create();
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
index be9ba1eeac8..150096ab43a 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
@@ -25,13 +25,10 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.state.forst.ForStOptions;
import org.apache.flink.state.rocksdb.RocksDBOptions;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -146,21 +143,6 @@ public class KeyedStateCheckpointingITCase extends
TestLogger {
testProgramWithBackend(env);
}
- @Test
- public void testWithForStBackendIncremental() throws Exception {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.configure(
- new Configuration()
- .set(StateBackendOptions.STATE_BACKEND, "forst")
- .set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS,
true)
- .set(
- ForStOptions.LOCAL_DIRECTORIES,
- tmpFolder.newFolder().getAbsolutePath()));
- CheckpointStorageUtils.configureFileSystemCheckpointStorage(
- env, tmpFolder.newFolder().toURI().toString());
- testProgramWithBackend(env);
- }
-
// ------------------------------------------------------------------------
protected void testProgramWithBackend(StreamExecutionEnvironment env)
throws Exception {
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
index f811c267404..a4f05362348 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
@@ -19,12 +19,9 @@
package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.state.v2.StateFuture;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
@@ -43,10 +40,8 @@ import
org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
@@ -61,12 +56,8 @@ import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
@@ -83,7 +74,6 @@ import static org.junit.Assert.assertNotNull;
* NotifyingDefiniteKeySource, SubtaskIndexFlatMapper and CollectionSink refer
to RescalingITCase,
* because the static fields in these classes can not be shared.
*/
-@RunWith(Parameterized.class)
public class RescaleCheckpointManuallyITCase extends TestLogger {
private static final int NUM_TASK_MANAGERS = 2;
@@ -94,24 +84,10 @@ public class RescaleCheckpointManuallyITCase extends
TestLogger {
@ClassRule public static TemporaryFolder temporaryFolder = new
TemporaryFolder();
- @Parameterized.Parameter(0)
- public String statebackendType;
-
- @Parameterized.Parameter(1)
- public boolean enableAsyncState;
-
- @Parameterized.Parameters(name = "statebackend type ={0},
enableAsyncState={1}")
- public static Collection<Object[]> parameter() {
- return Arrays.asList(
- new Object[][] {
- {"forst", true}, {"forst", false}, {"rocksdb", true},
{"rocksdb", false}
- });
- }
-
@Before
public void setup() throws Exception {
Configuration config = new Configuration();
- config.set(StateBackendOptions.STATE_BACKEND, statebackendType);
+ config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
cluster =
@@ -287,7 +263,7 @@ public class RescaleCheckpointManuallyITCase extends
TestLogger {
SharedReference<JobID> jobID = sharedObjects.add(new JobID());
SharedReference<MiniCluster> miniClusterRef =
sharedObjects.add(miniCluster);
- KeyedStream<Integer, Integer> input =
+ DataStream<Integer> input =
env.addSource(
new NotifyingDefiniteKeySource(
numberKeys, numberElements,
failAfterEmission) {
@@ -324,18 +300,10 @@ public class RescaleCheckpointManuallyITCase extends
TestLogger {
return value;
}
});
- if (enableAsyncState) {
- input.enableAsyncState();
- DataStream<Tuple2<Integer, Integer>> result =
- input.flatMap(new
AsyncSubtaskIndexFlatMapper(numberElementsExpect));
+ DataStream<Tuple2<Integer, Integer>> result =
+ input.flatMap(new
SubtaskIndexFlatMapper(numberElementsExpect));
- result.sinkTo(new CollectionSink<>());
- } else {
- DataStream<Tuple2<Integer, Integer>> result =
- input.flatMap(new
SubtaskIndexFlatMapper(numberElementsExpect));
-
- result.sinkTo(new CollectionSink<>());
- }
+ result.sinkTo(new CollectionSink<>());
return
env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), jobID.get());
}
@@ -381,9 +349,8 @@ public class RescaleCheckpointManuallyITCase extends
TestLogger {
} else {
boolean newCheckpoint = false;
long waited = 0L;
- running = false;
// maximum wait 5min
- while (!newCheckpoint && waited < 300000L) {
+ while (!newCheckpoint && waited < 30000L) {
synchronized (ctx.getCheckpointLock()) {
newCheckpoint = waitCheckpointCompleted();
}
@@ -456,79 +423,6 @@ public class RescaleCheckpointManuallyITCase extends
TestLogger {
}
}
- private static class AsyncSubtaskIndexFlatMapper
- extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>>
- implements CheckpointedFunction {
-
- private static final long serialVersionUID = 1L;
-
- private transient
org.apache.flink.api.common.state.v2.ValueState<Integer> counter;
- private transient
org.apache.flink.api.common.state.v2.ValueState<Integer> sum;
-
- private final int numberElements;
-
- public AsyncSubtaskIndexFlatMapper(int numberElements) {
- this.numberElements = numberElements;
- }
-
- @Override
- public void flatMap(Integer value, Collector<Tuple2<Integer, Integer>>
out)
- throws Exception {
- StateFuture<Integer> counterFuture =
- counter.asyncValue()
- .thenCompose(
- (Integer c) -> {
- int updated = c == null ? 1 : c + 1;
- return counter.asyncUpdate(updated)
- .thenApply(nothing -> updated);
- });
- StateFuture<Integer> sumFuture =
- sum.asyncValue()
- .thenCompose(
- (Integer s) -> {
- int updated = s == null ? value : s +
value;
- return sum.asyncUpdate(updated)
- .thenApply(nothing -> updated);
- });
-
- counterFuture.thenCombine(
- sumFuture,
- (c, s) -> {
- if (c == numberElements) {
- out.collect(
- Tuple2.of(
- getRuntimeContext()
- .getTaskInfo()
- .getIndexOfThisSubtask(),
- s));
- }
- return null;
- });
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws
Exception {
- // all managed, nothing to do.
- }
-
- @Override
- public void initializeState(FunctionInitializationContext context)
throws Exception {}
-
- @Override
- public void open(OpenContext openContext) throws Exception {
- counter =
- ((StreamingRuntimeContext) getRuntimeContext())
- .getValueState(
- new
org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
- "counter",
BasicTypeInfo.INT_TYPE_INFO));
- sum =
- ((StreamingRuntimeContext) getRuntimeContext())
- .getValueState(
- new
org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
- "sum",
BasicTypeInfo.INT_TYPE_INFO));
- }
- }
-
private static class CollectionSink<IN> implements Sink<IN> {
private static final ConcurrentHashMap<JobID, CollectionSinkWriter<?>>
writers =