This is an automated email from the ASF dual-hosted git repository.
leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 795fcbe79a7 [FLINK-37554][state/forst] Add UT for state compatibility
between ForStKeyeStateBackend and ForStSyncKeyeStateBackend (#26440)
795fcbe79a7 is described below
commit 795fcbe79a76effd9fb310671bf868c27161341c
Author: mayuehappy <[email protected]>
AuthorDate: Mon Apr 14 19:45:40 2025 +0800
[FLINK-37554][state/forst] Add UT for state compatibility between
ForStKeyeStateBackend and ForStSyncKeyeStateBackend (#26440)
---
.../forst/ForStAsyncAndSyncCompatibilityTest.java | 232 +++++++++++++++++++++
.../flink/state/forst/ForStStateMigrationTest.java | 2 +-
.../apache/flink/state/forst/ForStTestUtils.java | 26 +++
3 files changed, 259 insertions(+), 1 deletion(-)
diff --git
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStAsyncAndSyncCompatibilityTest.java
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStAsyncAndSyncCompatibilityTest.java
new file mode 100644
index 00000000000..749f4ef621c
--- /dev/null
+++
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStAsyncAndSyncCompatibilityTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.v2.StateDescriptorUtils;
+import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
+import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
+import org.apache.flink.util.IOUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.RunnableFuture;
+
+import static
org.apache.flink.state.forst.ForStStateTestBase.getMockEnvironment;
+import static
org.apache.flink.state.forst.ForStTestUtils.createKeyedStateBackend;
+import static
org.apache.flink.state.forst.ForStTestUtils.createSyncKeyedStateBackend;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+/** Compatibility test for {@link ForStKeyedStateBackend} and {@link
ForStSyncKeyedStateBackend}. */
+class ForStAsyncAndSyncCompatibilityTest {
+ protected ForStStateBackend forStStateBackend;
+ protected AsyncExecutionController<String> aec;
+ protected MailboxExecutor mailboxExecutor;
+
+ protected RecordContext<String> context;
+
+ protected MockEnvironment env;
+
+ @BeforeEach
+ public void setup(@TempDir File temporaryFolder) throws IOException {
+ FileSystem.initialize(new Configuration(), null);
+ Configuration configuration = new Configuration();
+ configuration.set(ForStOptions.PRIMARY_DIRECTORY,
temporaryFolder.toURI().toString());
+ forStStateBackend = new ForStStateBackend().configure(configuration,
null);
+
+ env = getMockEnvironment(temporaryFolder);
+
+ mailboxExecutor =
+ new MailboxExecutorImpl(
+ new TaskMailboxImpl(), 0,
StreamTaskActionExecutor.IMMEDIATE);
+ }
+
+ @Test
+ void testForStTransFromAsyncToSync() throws Exception {
+ ForStKeyedStateBackend<String> keyedBackend =
+ setUpAsyncKeyedStateBackend(Collections.emptyList());
+ MapStateDescriptor<Integer, String> descriptor =
+ new MapStateDescriptor<>(
+ "testState", IntSerializer.INSTANCE,
StringSerializer.INSTANCE);
+
+ MapState<Integer, String> asyncMapState =
+ keyedBackend.createState(1, IntSerializer.INSTANCE,
descriptor);
+
+ context = aec.buildContext("testRecord", "testKey");
+ context.retain();
+ aec.setCurrentContext(context);
+ asyncMapState.asyncPut(1, "1");
+ context.release();
+ aec.drainInflightRecords(0);
+
+ RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot =
+ keyedBackend.snapshot(
+ 1L,
+ System.currentTimeMillis(),
+ env.getCheckpointStorageAccess()
+ .resolveCheckpointStorageLocation(
+ 1L,
CheckpointStorageLocationReference.getDefault()),
+ CheckpointOptions.forCheckpointWithDefaultLocation());
+
+ if (!snapshot.isDone()) {
+ snapshot.run();
+ }
+ SnapshotResult<KeyedStateHandle> snapshotResult = snapshot.get();
+ KeyedStateHandle stateHandle =
snapshotResult.getJobManagerOwnedSnapshot();
+ IOUtils.closeQuietly(keyedBackend);
+ ForStSyncKeyedStateBackend<String> syncKeyedStateBackend =
+ createSyncKeyedStateBackend(
+ forStStateBackend,
+ env,
+ StringSerializer.INSTANCE,
+ Collections.singletonList(stateHandle));
+
+ try {
+ org.apache.flink.api.common.state.MapState<Integer, String>
syncMapState =
+ syncKeyedStateBackend.getOrCreateKeyedState(
+ IntSerializer.INSTANCE,
+
StateDescriptorUtils.transformFromV2ToV1(descriptor));
+ fail();
+
+ syncKeyedStateBackend.setCurrentKey("testKey");
+ ((InternalKvState) syncKeyedStateBackend).setCurrentNamespace(1);
+ assertThat(syncMapState.get(1)).isEqualTo("1");
+ } catch (Exception e) {
+ // Currently, ForStStateBackend does not support switching from
Async to Sync, so this
+ // exception will be caught here
+ assertThat(e).isInstanceOf(ClassCastException.class);
+ assertThat(e.getMessage())
+ .contains(
+
"org.apache.flink.runtime.state.v2.RegisteredKeyAndUserKeyValueStateBackendMetaInfo
cannot be cast to class
org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo");
+
+ } finally {
+ IOUtils.closeQuietly(syncKeyedStateBackend);
+ }
+ }
+
+ @Test
+ void testForStTransFromSyncToAsync() throws Exception {
+ ForStSyncKeyedStateBackend<String> keyedBackend =
+ createSyncKeyedStateBackend(
+ forStStateBackend, env, StringSerializer.INSTANCE,
Collections.emptyList());
+ org.apache.flink.api.common.state.MapStateDescriptor<Integer, String>
descriptor =
+ new org.apache.flink.api.common.state.MapStateDescriptor<>(
+ "testState", IntSerializer.INSTANCE,
StringSerializer.INSTANCE);
+ org.apache.flink.api.common.state.MapState<Integer, String> mapState =
+ keyedBackend.getOrCreateKeyedState(IntSerializer.INSTANCE,
descriptor);
+ keyedBackend.setCurrentKey("testKey");
+ ((InternalKvState) mapState).setCurrentNamespace(1);
+ mapState.put(1, "1");
+
+ RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot =
+ keyedBackend.snapshot(
+ 1L,
+ System.currentTimeMillis(),
+ env.getCheckpointStorageAccess()
+ .resolveCheckpointStorageLocation(
+ 1L,
CheckpointStorageLocationReference.getDefault()),
+ CheckpointOptions.forCheckpointWithDefaultLocation());
+
+ if (!snapshot.isDone()) {
+ snapshot.run();
+ }
+ SnapshotResult<KeyedStateHandle> snapshotResult = snapshot.get();
+ KeyedStateHandle stateHandle =
snapshotResult.getJobManagerOwnedSnapshot();
+ IOUtils.closeQuietly(keyedBackend);
+
+ ForStKeyedStateBackend<String> asyncKeyedStateBackend =
+
setUpAsyncKeyedStateBackend(Collections.singletonList(stateHandle));
+
+ MapStateDescriptor<Integer, String> newStateDescriptor =
+ new MapStateDescriptor<>(
+ "testState", IntSerializer.INSTANCE,
StringSerializer.INSTANCE);
+ try {
+ MapState<Integer, String> asyncMapState =
+ asyncKeyedStateBackend.createState(
+ 1, IntSerializer.INSTANCE, newStateDescriptor);
+ fail();
+
+ context = aec.buildContext("testRecord", "testKey");
+ context.retain();
+ aec.setCurrentContext(context);
+ asyncMapState
+ .asyncGet(1)
+ .thenAccept(
+ value -> {
+ assertThat(value).isEqualTo("1");
+ });
+ context.release();
+ aec.drainInflightRecords(0);
+ } catch (Exception e) {
+ // Currently, ForStStateBackend does not support switching from
Sync to Async, so this
+ // exception will be caught here
+ assertThat(e).isInstanceOf(ClassCastException.class);
+ assertThat(e.getMessage())
+ .contains(
+
"org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo cannot
be cast to class
org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo");
+ } finally {
+ IOUtils.closeQuietly(asyncKeyedStateBackend);
+ }
+ }
+
+ private ForStKeyedStateBackend setUpAsyncKeyedStateBackend(
+ Collection<KeyedStateHandle> stateHandles) throws IOException {
+ ForStKeyedStateBackend<String> keyedStateBackend =
+ createKeyedStateBackend(
+ forStStateBackend, env, StringSerializer.INSTANCE,
stateHandles);
+ aec =
+ new AsyncExecutionController<>(
+ mailboxExecutor,
+ (a, b) -> {},
+ keyedStateBackend.createStateExecutor(),
+ new DeclarationManager(),
+ 1,
+ 100,
+ 0,
+ 1,
+ null,
+ null);
+ keyedStateBackend.setup(aec);
+ return keyedStateBackend;
+ }
+}
diff --git
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateMigrationTest.java
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateMigrationTest.java
index 32841170a12..aef6116a804 100644
---
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateMigrationTest.java
+++
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateMigrationTest.java
@@ -51,7 +51,7 @@ import static
org.apache.flink.state.forst.ForStTestUtils.createKeyedStateBacken
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;
-/** Tests for {@link ForStListState}. */
+/** Tests for the State Migration of {@link ForStKeyedStateBackend}. */
public class ForStStateMigrationTest extends ForStStateTestBase {
@Test
diff --git
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStTestUtils.java
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStTestUtils.java
index 62cbca93015..89e31baee08 100644
---
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStTestUtils.java
+++
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStTestUtils.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend;
import java.io.IOException;
import java.util.Collection;
@@ -58,6 +59,31 @@ public final class ForStTestUtils {
1.0));
}
+ public static <K> ForStSyncKeyedStateBackend<K>
createSyncKeyedStateBackend(
+ ForStStateBackend forStStateBackend,
+ Environment env,
+ TypeSerializer<K> keySerializer,
+ Collection<KeyedStateHandle> stateHandles)
+ throws IOException {
+
+ return (ForStSyncKeyedStateBackend<K>)
+ forStStateBackend.createKeyedStateBackend(
+ new KeyedStateBackendParametersImpl<>(
+ env,
+ env.getJobID(),
+ "test_op",
+ keySerializer,
+ 1,
+ new KeyGroupRange(0, 0),
+ env.getTaskKvStateRegistry(),
+ TtlTimeProvider.DEFAULT,
+ new UnregisteredMetricsGroup(),
+ (name, value) -> {},
+ stateHandles,
+ new CloseableRegistry(),
+ 1.0));
+ }
+
public static <K> ForStKeyedStateBackend<K> createKeyedStateBackend(
ForStStateBackend forStStateBackend, Environment env,
TypeSerializer<K> keySerializer)
throws IOException {