This is an automated email from the ASF dual-hosted git repository.
leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6099697b501 [FLINK-37158][tests] Introduce ForSt to existing ITCases
(#26000)
6099697b501 is described below
commit 6099697b501d58d08deb9154f77cb81e46117fd6
Author: Yanfei Lei <[email protected]>
AuthorDate: Wed Mar 12 13:29:24 2025 +0800
[FLINK-37158][tests] Introduce ForSt to existing ITCases (#26000)
---
.../forst/sync/ForStSyncKeyedStateBackend.java | 3 +-
.../test/checkpointing/AutoRescalingITCase.java | 4 +-
.../EventTimeWindowCheckpointingITCase.java | 34 ++++++
.../KeyedStateCheckpointingITCase.java | 18 ++++
.../RescaleCheckpointManuallyITCase.java | 118 +++++++++++++++++++--
5 files changed, 169 insertions(+), 8 deletions(-)
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
index 63bdb9eda78..da26ae0d937 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
@@ -608,7 +608,8 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
@Nonnull
@Override
public SavepointResources<K> savepoint() throws Exception {
- throw new UnsupportedOperationException("This method is not
supported.");
+ throw new UnsupportedOperationException(
+ "Canonical savepoints are not supported by ForSt State
Backend.");
}
@Override
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
index 374e53963c9..42eb9992f89 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
@@ -116,7 +116,9 @@ public class AutoRescalingITCase extends TestLogger {
new Object[][] {
{"rocksdb", false},
{"rocksdb", true},
- {"hashmap", false}
+ {"hashmap", false},
+ {"forst", false},
+ {"forst", true}
});
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index d3369ff7598..ca07e3a6ab2 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -35,6 +35,8 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
+import org.apache.flink.state.forst.ForStOptions;
+import org.apache.flink.state.forst.ForStStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.RocksDBOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -110,6 +112,7 @@ public class EventTimeWindowCheckpointingITCase extends
TestLogger {
ROCKSDB_FULL,
ROCKSDB_INCREMENTAL,
ROCKSDB_INCREMENTAL_ZK,
+ FORST_INCREMENTAL
}
@Parameterized.Parameters(name = "statebackend type ={0}")
@@ -191,6 +194,14 @@ public class EventTimeWindowCheckpointingITCase extends
TestLogger {
setupRocksDB(config, 16, true);
break;
}
+ case FORST_INCREMENTAL:
+ {
+ config.set(
+ ForStOptions.TIMER_SERVICE_FACTORY,
+ ForStStateBackend.PriorityQueueStateType.ForStDB);
+ setupForSt(config, 16);
+ break;
+ }
default:
throw new IllegalStateException("No backend selected.");
}
@@ -229,6 +240,29 @@ public class EventTimeWindowCheckpointingITCase extends
TestLogger {
config.set(RocksDBOptions.LOCAL_DIRECTORIES, rocksDb);
}
+ private void setupForSt(Configuration config, int fileSizeThreshold)
throws IOException {
+ // Configure the managed memory size as 64MB per slot for the ForSt
state backend.
+ config.set(
+ TaskManagerOptions.MANAGED_MEMORY_SIZE,
+ MemorySize.ofMebiBytes(PARALLELISM / NUM_OF_TASK_MANAGERS *
64));
+
+ final String forstdb = tempFolder.newFolder().getAbsolutePath(); // local working directory for ForSt
+ final File backups = tempFolder.newFolder().getAbsoluteFile(); // checkpoint target directory
+ // checkpoints go to the local file system with a small threshold to test the
behaviour with file
+ // references rather than self-contained byte handles
+ config.set(StateBackendOptions.STATE_BACKEND, "forst");
+ config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
+ config.set(
+ CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+ Path.fromLocalFile(backups).toUri().toString());
+ if (fileSizeThreshold != -1) { // -1 means: do not override the small-file threshold
+ config.set(
+ CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
+ MemorySize.parse(fileSizeThreshold + "b"));
+ }
+ config.set(ForStOptions.LOCAL_DIRECTORIES, forstdb);
+ }
+
protected Configuration createClusterConfig() throws IOException {
TemporaryFolder temporaryFolder = new TemporaryFolder();
temporaryFolder.create();
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
index 150096ab43a..be9ba1eeac8 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
@@ -25,10 +25,13 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.state.forst.ForStOptions;
import org.apache.flink.state.rocksdb.RocksDBOptions;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -143,6 +146,21 @@ public class KeyedStateCheckpointingITCase extends
TestLogger {
testProgramWithBackend(env);
}
+ @Test
+ public void testWithForStBackendIncremental() throws Exception { // runs the shared test program on ForSt
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.configure( // select ForSt, enable incremental checkpoints, use a fresh local directory
+ new Configuration()
+ .set(StateBackendOptions.STATE_BACKEND, "forst")
+ .set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS,
true)
+ .set(
+ ForStOptions.LOCAL_DIRECTORIES,
+ tmpFolder.newFolder().getAbsolutePath()));
+ CheckpointStorageUtils.configureFileSystemCheckpointStorage( // checkpoints are written to a second temporary folder
+ env, tmpFolder.newFolder().toURI().toString());
+ testProgramWithBackend(env);
+ }
+
// ------------------------------------------------------------------------
protected void testProgramWithBackend(StreamExecutionEnvironment env)
throws Exception {
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
index a4f05362348..f811c267404 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
@@ -19,9 +19,12 @@
package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
@@ -40,8 +43,10 @@ import
org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
@@ -56,8 +61,12 @@ import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
@@ -74,6 +83,7 @@ import static org.junit.Assert.assertNotNull;
* NotifyingDefiniteKeySource, SubtaskIndexFlatMapper and CollectionSink refer
to RescalingITCase,
* because the static fields in these classes can not be shared.
*/
+@RunWith(Parameterized.class)
public class RescaleCheckpointManuallyITCase extends TestLogger {
private static final int NUM_TASK_MANAGERS = 2;
@@ -84,10 +94,24 @@ public class RescaleCheckpointManuallyITCase extends
TestLogger {
@ClassRule public static TemporaryFolder temporaryFolder = new
TemporaryFolder();
+ @Parameterized.Parameter(0)
+ public String statebackendType;
+
+ @Parameterized.Parameter(1)
+ public boolean enableAsyncState;
+
+ @Parameterized.Parameters(name = "statebackend type ={0},
enableAsyncState={1}")
+ public static Collection<Object[]> parameter() { // each row is {statebackendType, enableAsyncState}
+ return Arrays.asList(
+ new Object[][] {
+ {"forst", true}, {"forst", false}, {"rocksdb", true},
{"rocksdb", false}
+ });
+ }
+
@Before
public void setup() throws Exception {
Configuration config = new Configuration();
- config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+ config.set(StateBackendOptions.STATE_BACKEND, statebackendType);
config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
cluster =
@@ -263,7 +287,7 @@ public class RescaleCheckpointManuallyITCase extends
TestLogger {
SharedReference<JobID> jobID = sharedObjects.add(new JobID());
SharedReference<MiniCluster> miniClusterRef =
sharedObjects.add(miniCluster);
- DataStream<Integer> input =
+ KeyedStream<Integer, Integer> input =
env.addSource(
new NotifyingDefiniteKeySource(
numberKeys, numberElements,
failAfterEmission) {
@@ -300,10 +324,18 @@ public class RescaleCheckpointManuallyITCase extends
TestLogger {
return value;
}
});
- DataStream<Tuple2<Integer, Integer>> result =
- input.flatMap(new
SubtaskIndexFlatMapper(numberElementsExpect));
+ if (enableAsyncState) {
+ input.enableAsyncState();
+ DataStream<Tuple2<Integer, Integer>> result =
+ input.flatMap(new
AsyncSubtaskIndexFlatMapper(numberElementsExpect));
- result.sinkTo(new CollectionSink<>());
+ result.sinkTo(new CollectionSink<>());
+ } else {
+ DataStream<Tuple2<Integer, Integer>> result =
+ input.flatMap(new
SubtaskIndexFlatMapper(numberElementsExpect));
+
+ result.sinkTo(new CollectionSink<>());
+ }
return
env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), jobID.get());
}
@@ -349,8 +381,9 @@ public class RescaleCheckpointManuallyITCase extends
TestLogger {
} else {
boolean newCheckpoint = false;
long waited = 0L;
+ running = false;
// maximum wait 5min
- while (!newCheckpoint && waited < 30000L) {
+ while (!newCheckpoint && waited < 300000L) {
synchronized (ctx.getCheckpointLock()) {
newCheckpoint = waitCheckpointCompleted();
}
@@ -423,6 +456,79 @@ public class RescaleCheckpointManuallyITCase extends
TestLogger {
}
}
+ private static class AsyncSubtaskIndexFlatMapper // keeps a counter and a sum in v2 (async) ValueState
+ extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>>
+ implements CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient
org.apache.flink.api.common.state.v2.ValueState<Integer> counter;
+ private transient
org.apache.flink.api.common.state.v2.ValueState<Integer> sum;
+
+ private final int numberElements; // counter value at which flatMap emits a result
+
+ public AsyncSubtaskIndexFlatMapper(int numberElements) {
+ this.numberElements = numberElements;
+ }
+
+ @Override
+ public void flatMap(Integer value, Collector<Tuple2<Integer, Integer>>
out)
+ throws Exception {
+ StateFuture<Integer> counterFuture = // read, increment and write back the counter; completes with the new count
+ counter.asyncValue()
+ .thenCompose(
+ (Integer c) -> {
+ int updated = c == null ? 1 : c + 1;
+ return counter.asyncUpdate(updated)
+ .thenApply(nothing -> updated);
+ });
+ StateFuture<Integer> sumFuture = // read, add the element and write back the sum; completes with the new sum
+ sum.asyncValue()
+ .thenCompose(
+ (Integer s) -> {
+ int updated = s == null ? value : s +
value;
+ return sum.asyncUpdate(updated)
+ .thenApply(nothing -> updated);
+ });
+
+ counterFuture.thenCombine( // after both updates: emit (subtask index, sum) when the count equals numberElements
+ sumFuture,
+ (c, s) -> {
+ if (c == numberElements) {
+ out.collect(
+ Tuple2.of(
+ getRuntimeContext()
+ .getTaskInfo()
+ .getIndexOfThisSubtask(),
+ s));
+ }
+ return null; // the combined value is not used
+ });
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws
Exception {
+ // both states are obtained from the runtime context in open(); nothing to do here.
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context)
throws Exception {} // no-op
+
+ @Override
+ public void open(OpenContext openContext) throws Exception { // obtains the counter and sum states
+ counter =
+ ((StreamingRuntimeContext) getRuntimeContext())
+ .getValueState(
+ new
org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
+ "counter",
BasicTypeInfo.INT_TYPE_INFO));
+ sum =
+ ((StreamingRuntimeContext) getRuntimeContext())
+ .getValueState(
+ new
org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
+ "sum",
BasicTypeInfo.INT_TYPE_INFO));
+ }
+ }
+
private static class CollectionSink<IN> implements Sink<IN> {
private static final ConcurrentHashMap<JobID, CollectionSinkWriter<?>>
writers =