This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b07b0b4d994d961c9f3d5d2def1372cbbbc85cf3
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Wed Jul 17 17:42:44 2024 +0800

    [FLINK-35858][Runtime/State] Add namespace in internal async states
---
 .../asyncprocessing/AsyncExecutionController.java  |   8 ++
 .../runtime/asyncprocessing/RecordContext.java     |  18 +++
 .../asyncprocessing/StateRequestHandler.java       |   8 ++
 .../runtime/state/v2/InternalAggregatingState.java |   2 +-
 .../flink/runtime/state/v2/InternalKeyedState.java |   7 +-
 .../flink/runtime/state/v2/InternalListState.java  |   3 +-
 .../flink/runtime/state/v2/InternalMapState.java   |   2 +-
 .../v2/InternalPartitionedState.java}              |  20 +---
 .../runtime/state/v2/InternalReducingState.java    |   2 +-
 .../flink/runtime/state/v2/InternalValueState.java |   3 +-
 .../AsyncExecutionControllerTest.java              | 128 +++++++++++++++++++--
 .../state/v2/InternalAggregatingStateTest.java     |   2 +-
 .../runtime/state/v2/InternalListStateTest.java    |   3 +-
 .../runtime/state/v2/InternalMapStateTest.java     |   2 +-
 .../state/v2/InternalReducingStateTest.java        |   2 +-
 .../runtime/state/v2/InternalValueStateTest.java   |   3 +-
 .../state/forst/ForStStateRequestClassifier.java   |  12 +-
 .../apache/flink/state/forst/ForStValueState.java  |   2 +-
 .../state/forst/ForStDBOperationTestBase.java      |  10 +-
 .../forst/ForStGeneralMultiGetOperationTest.java   |   9 +-
 .../flink/state/forst/ForStStateExecutorTest.java  |  20 ++--
 .../state/forst/ForStWriteBatchOperationTest.java  |   9 +-
 22 files changed, 219 insertions(+), 56 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index d06888ab6c4..d48208e452f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -25,12 +25,14 @@ import org.apache.flink.core.state.InternalStateFuture;
 import 
org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
 import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.v2.InternalPartitionedState;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Optional;
@@ -255,6 +257,12 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler {
         return stateFuture;
     }
 
+    @Override
+    public <N> void setCurrentNamespaceForState(
+            @Nonnull InternalPartitionedState<N> state, N namespace) {
+        currentContext.setNamespace(state, namespace);
+    }
+
     <IN, OUT> void insertActiveBuffer(StateRequest<K, IN, OUT> request) {
         stateRequestsBuffer.enqueueToActive(request);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
index 86352ce1cf9..b9f48125c29 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
@@ -19,9 +19,12 @@
 package org.apache.flink.runtime.asyncprocessing;
 
 import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch;
+import org.apache.flink.runtime.state.v2.InternalPartitionedState;
 
 import javax.annotation.Nullable;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.function.Consumer;
 
@@ -56,6 +59,9 @@ public class RecordContext<K> extends 
ReferenceCounted<RecordContext.DisposerRun
     /** The keyGroup to which key belongs. */
     private final int keyGroup;
 
+    /** The current namespace of each state. Lazily initialized to save memory. */
+    private Map<InternalPartitionedState<?>, Object> namespaces = null;
+
     /**
      * The extra context info which is used to hold customized data defined by 
state backend. The
      * state backend can use this field to cache some data that can be used 
multiple times in
@@ -111,6 +117,18 @@ public class RecordContext<K> extends 
ReferenceCounted<RecordContext.DisposerRun
         return keyGroup;
     }
 
+    @SuppressWarnings("unchecked")
+    public <N> N getNamespace(InternalPartitionedState<N> state) {
+        return namespaces == null ? null : (N) namespaces.get(state);
+    }
+
+    public <N> void setNamespace(InternalPartitionedState<N> state, N 
namespace) {
+        if (namespaces == null) {
+            namespaces = new HashMap<>();
+        }
+        namespaces.put(state, namespace);
+    }
+
     public void setExtra(Object extra) {
         this.extra = extra;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java
index 1288ad8a01b..fc81f87f9fb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.asyncprocessing;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.core.state.InternalStateFuture;
+import org.apache.flink.runtime.state.v2.InternalPartitionedState;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 /** The handler which can process {@link StateRequest}. */
@@ -39,4 +41,10 @@ public interface StateRequestHandler {
      */
     <IN, OUT> InternalStateFuture<OUT> handleRequest(
             @Nullable State state, StateRequestType type, @Nullable IN 
payload);
+
+    /**
+     * Set current namespace for a state. See {@link
+     * InternalPartitionedState#setCurrentNamespace(Object)}.
+     */
+    <N> void setCurrentNamespaceForState(@Nonnull InternalPartitionedState<N> 
state, N namespace);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalAggregatingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalAggregatingState.java
index 7f05cdbd4d2..971af6f7f7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalAggregatingState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalAggregatingState.java
@@ -33,7 +33,7 @@ import 
org.apache.flink.runtime.asyncprocessing.StateRequestType;
  * @param <ACC> TThe type of the accumulator (intermediate aggregation state).
  * @param <OUT> The type of the values that are returned from the state.
  */
-public class InternalAggregatingState<K, IN, ACC, OUT> extends 
InternalKeyedState<K, ACC>
+public class InternalAggregatingState<K, N, IN, ACC, OUT> extends 
InternalKeyedState<K, N, ACC>
         implements AggregatingState<IN, OUT> {
 
     protected final AggregateFunction<IN, ACC, OUT> aggregateFunction;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java
index 2b1be2a6764..408ca80f6c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java
@@ -37,7 +37,7 @@ import 
org.apache.flink.runtime.asyncprocessing.StateRequestType;
  * @param <V> The type of values kept internally in state.
  */
 @Internal
-public abstract class InternalKeyedState<K, V> implements State {
+public abstract class InternalKeyedState<K, N, V> implements 
InternalPartitionedState<N> {
 
     private final StateRequestHandler stateRequestHandler;
 
@@ -64,6 +64,11 @@ public abstract class InternalKeyedState<K, V> implements 
State {
         return stateRequestHandler.handleRequest(this, stateRequestType, 
payload);
     }
 
+    @Override
+    public void setCurrentNamespace(N namespace) {
+        stateRequestHandler.setCurrentNamespaceForState(this, namespace);
+    }
+
     @Override
     public final StateFuture<Void> asyncClear() {
         return handleRequest(StateRequestType.CLEAR, null);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalListState.java
index 7bcc652c2b5..ac1c43005c2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalListState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalListState.java
@@ -32,7 +32,8 @@ import java.util.List;
  * @param <K> The type of key the state is associated to.
  * @param <V> The type of values kept internally in state.
  */
-public class InternalListState<K, V> extends InternalKeyedState<K, V> 
implements ListState<V> {
+public class InternalListState<K, N, V> extends InternalKeyedState<K, N, V>
+        implements ListState<V> {
 
     public InternalListState(
             StateRequestHandler stateRequestHandler, ListStateDescriptor<V> 
stateDescriptor) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalMapState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalMapState.java
index b718a3eb687..d028f9d611c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalMapState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalMapState.java
@@ -34,7 +34,7 @@ import java.util.Map;
  * @param <UK> The type of user key of this state.
  * @param <V> The type of values kept internally in state.
  */
-public class InternalMapState<K, UK, V> extends InternalKeyedState<K, V>
+public class InternalMapState<K, N, UK, V> extends InternalKeyedState<K, N, V>
         implements MapState<UK, V> {
 
     public InternalMapState(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalPartitionedState.java
similarity index 57%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalPartitionedState.java
index 1288ad8a01b..46e35b89838 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalPartitionedState.java
@@ -16,27 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.asyncprocessing;
+package org.apache.flink.runtime.state.v2;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.v2.State;
-import org.apache.flink.core.state.InternalStateFuture;
 
-import javax.annotation.Nullable;
-
-/** The handler which can process {@link StateRequest}. */
+/** A state that is partitioned into different namespaces. */
 @Internal
-public interface StateRequestHandler {
+public interface InternalPartitionedState<N> extends State {
 
     /**
-     * Submit a {@link StateRequest} to this StateRequestHandler.
+     * Set the current namespace; subsequent state accesses use the specified namespace.
      *
-     * @param state the state to request. Could be {@code null} if the type is 
{@link
-     *     StateRequestType#SYNC_POINT}.
-     * @param type the type of this request.
-     * @param payload the payload input for this request.
-     * @return the state future.
+     * @param namespace the specified namespace
      */
-    <IN, OUT> InternalStateFuture<OUT> handleRequest(
-            @Nullable State state, StateRequestType type, @Nullable IN 
payload);
+    void setCurrentNamespace(N namespace);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalReducingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalReducingState.java
index eed6eae0dec..64d538d6bb1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalReducingState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalReducingState.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.runtime.asyncprocessing.StateRequestType;
  * @param <K> The type of key the state is associated to.
  * @param <V> The type of values kept internally in state.
  */
-public class InternalReducingState<K, V> extends InternalKeyedState<K, V>
+public class InternalReducingState<K, N, V> extends InternalKeyedState<K, N, V>
         implements ReducingState<V> {
 
     protected final ReduceFunction<V> reduceFunction;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java
index 2a9c643d17a..8c87685a62b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java
@@ -30,7 +30,8 @@ import 
org.apache.flink.runtime.asyncprocessing.StateRequestType;
  * @param <K> The type of key the state is associated to.
  * @param <V> The type of values kept internally in state.
  */
-public class InternalValueState<K, V> extends InternalKeyedState<K, V> 
implements ValueState<V> {
+public class InternalValueState<K, N, V> extends InternalKeyedState<K, N, V>
+        implements ValueState<V> {
 
     public InternalValueState(
             StateRequestHandler stateRequestHandler, ValueStateDescriptor<V> 
valueStateDescriptor) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
index e98a66b82a0..53337d71740 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.CloseableRegistry;
 import 
org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
 import org.apache.flink.core.state.StateFutureUtils;
@@ -45,6 +46,7 @@ import java.util.HashMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -232,6 +234,115 @@ class AsyncExecutionControllerTest {
         resourceRegistry.close();
     }
 
+    @Test
+    void testNamespace() throws IOException {
+        final Consumer<String> userCode =
+                (r) -> {
+                    valueState.setCurrentNamespace(r);
+                    valueState
+                            .asyncValue()
+                            .thenCompose(
+                                    val -> {
+                                        int updated = (val == null ? 1 : (val 
+ 1));
+                                        return valueState
+                                                .asyncUpdate(updated)
+                                                .thenCompose(
+                                                        o ->
+                                                                
StateFutureUtils.completedFuture(
+                                                                        
updated));
+                                    })
+                            .thenAccept(val -> output.set(val));
+                };
+        CloseableRegistry resourceRegistry = new CloseableRegistry();
+        setup(
+                100,
+                10000L,
+                1000,
+                new SyncMailboxExecutor(),
+                new TestAsyncFrameworkExceptionHandler(),
+                resourceRegistry);
+        // ============================ element1 ============================
+        String record1 = "key1-r1";
+        String key1 = "key1";
+        // Simulate the wrapping in {@link 
RecordProcessorUtils#getRecordProcessor()}, wrapping the
+        // record and key with RecordContext.
+        RecordContext<String> recordContext1 = aec.buildContext(record1, key1);
+        aec.setCurrentContext(recordContext1);
+        // execute user code
+        userCode.accept(record1);
+
+        // Single-step run.
+        // Firstly, the user code generates value get in active buffer.
+        assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        assertThat(aec.inFlightRecordNum.get()).isEqualTo(1);
+        aec.triggerIfNeeded(true);
+        // After running, the value update is in active buffer.
+        assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        aec.triggerIfNeeded(true);
+        // Value update finishes.
+        assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
+        assertThat(output.get()).isEqualTo(1);
+        assertThat(recordContext1.getReferenceCount()).isEqualTo(0);
+        assertThat(aec.inFlightRecordNum.get()).isEqualTo(0);
+
+        // ============================ element 2 & 3(1) 
============================
+        String record2 = "key1-r2";
+        String key2 = "key1";
+        RecordContext<String> recordContext2 = aec.buildContext(record2, key2);
+        aec.setCurrentContext(recordContext2);
+        // execute user code
+        userCode.accept(record2);
+
+        String record3 = "key1-r1";
+        String key3 = "key1";
+        RecordContext<String> recordContext3 = aec.buildContext(record3, key3);
+        aec.setCurrentContext(recordContext3);
+        // execute user code
+        userCode.accept(record3);
+
+        // Single-step run.
+        // Firstly, the user code for record2 generates value get in active 
buffer,
+        // while user code for record3 generates value get in blocking buffer.
+        assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
+        assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        assertThat(aec.inFlightRecordNum.get()).isEqualTo(2);
+        aec.triggerIfNeeded(true);
+        // After running, the value update for record2 is in active buffer.
+        assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
+        assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        assertThat(aec.inFlightRecordNum.get()).isEqualTo(2);
+        aec.triggerIfNeeded(true);
+        // Value update for record2 finishes. The value get for record3 is migrated from the
+        // blocking buffer to the active buffer.
+        assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        assertThat(aec.inFlightRecordNum.get()).isEqualTo(1);
+        assertThat(output.get()).isEqualTo(1);
+        assertThat(recordContext2.getReferenceCount()).isEqualTo(0);
+        assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
+
+        // Let the value get for record3 run.
+        aec.triggerIfNeeded(true);
+        // After running, the value update for record3 is in active buffer.
+        assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        assertThat(aec.inFlightRecordNum.get()).isEqualTo(1);
+        aec.triggerIfNeeded(true);
+        // Value update for record3 finishes.
+        assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
+        assertThat(aec.inFlightRecordNum.get()).isEqualTo(0);
+        assertThat(output.get()).isEqualTo(2);
+        assertThat(recordContext3.getReferenceCount()).isEqualTo(0);
+
+        resourceRegistry.close();
+    }
+
     @Test
     void testRecordsRunInOrder() throws IOException {
         CloseableRegistry resourceRegistry = new CloseableRegistry();
@@ -727,22 +838,22 @@ class AsyncExecutionControllerTest {
     /** Simulate the underlying state that is actually used to execute the 
request. */
     static class TestUnderlyingState {
 
-        private final HashMap<String, Integer> hashMap;
+        private final HashMap<Tuple2<String, String>, Integer> hashMap;
 
         public TestUnderlyingState() {
             this.hashMap = new HashMap<>();
         }
 
-        public Integer get(String key) {
-            return hashMap.get(key);
+        public Integer get(String key, String namespace) {
+            return hashMap.get(Tuple2.of(key, namespace));
         }
 
-        public void update(String key, Integer val) {
-            hashMap.put(key, val);
+        public void update(String key, String namespace, Integer val) {
+            hashMap.put(Tuple2.of(key, namespace), val);
         }
     }
 
-    static class TestValueState extends InternalValueState<String, Integer> {
+    static class TestValueState extends InternalValueState<String, String, 
Integer> {
 
         private final TestUnderlyingState underlyingState;
 
@@ -776,7 +887,9 @@ class AsyncExecutionControllerTest {
                     Preconditions.checkState(request.getState() != null);
                     TestValueState state = (TestValueState) request.getState();
                     Integer val =
-                            state.underlyingState.get((String) 
request.getRecordContext().getKey());
+                            state.underlyingState.get(
+                                    (String) 
request.getRecordContext().getKey(),
+                                    (String) 
request.getRecordContext().getNamespace(state));
                     request.getFuture().complete(val);
                 } else if (request.getRequestType() == 
StateRequestType.VALUE_UPDATE) {
                     Preconditions.checkState(request.getState() != null);
@@ -784,6 +897,7 @@ class AsyncExecutionControllerTest {
 
                     state.underlyingState.update(
                             (String) request.getRecordContext().getKey(),
+                            (String) 
request.getRecordContext().getNamespace(state),
                             (Integer) request.getPayload());
                     request.getFuture().complete(null);
                 } else {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalAggregatingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalAggregatingStateTest.java
index 6e9724a6155..97410748189 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalAggregatingStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalAggregatingStateTest.java
@@ -55,7 +55,7 @@ class InternalAggregatingStateTest extends 
InternalKeyedStateTestBase {
         AggregatingStateDescriptor<Integer, Integer, Integer> descriptor =
                 new AggregatingStateDescriptor<>(
                         "testAggState", aggregator, 
BasicTypeInfo.INT_TYPE_INFO);
-        InternalAggregatingState<String, Integer, Integer, Integer> state =
+        InternalAggregatingState<String, Void, Integer, Integer, Integer> 
state =
                 new InternalAggregatingState<>(aec, descriptor);
 
         aec.setCurrentContext(aec.buildContext("test", "test"));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalListStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalListStateTest.java
index 941092fcdb3..ccb4c977233 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalListStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalListStateTest.java
@@ -34,7 +34,8 @@ public class InternalListStateTest extends 
InternalKeyedStateTestBase {
     public void testEachOperation() {
         ListStateDescriptor<Integer> descriptor =
                 new ListStateDescriptor<>("testState", 
BasicTypeInfo.INT_TYPE_INFO);
-        InternalListState<String, Integer> listState = new 
InternalListState<>(aec, descriptor);
+        InternalListState<String, Void, Integer> listState =
+                new InternalListState<>(aec, descriptor);
         aec.setCurrentContext(aec.buildContext("test", "test"));
 
         listState.asyncClear();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalMapStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalMapStateTest.java
index 3eb7c652cce..3cfd214c0ae 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalMapStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalMapStateTest.java
@@ -36,7 +36,7 @@ public class InternalMapStateTest extends 
InternalKeyedStateTestBase {
         MapStateDescriptor<String, Integer> descriptor =
                 new MapStateDescriptor<>(
                         "testState", BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO);
-        InternalMapState<String, String, Integer> mapState =
+        InternalMapState<String, Void, String, Integer> mapState =
                 new InternalMapState<>(aec, descriptor);
         aec.setCurrentContext(aec.buildContext("test", "test"));
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalReducingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalReducingStateTest.java
index 80ccc1ede9a..6340952aad3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalReducingStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalReducingStateTest.java
@@ -33,7 +33,7 @@ public class InternalReducingStateTest extends 
InternalKeyedStateTestBase {
         ReduceFunction<Integer> reducer = Integer::sum;
         ReducingStateDescriptor<Integer> descriptor =
                 new ReducingStateDescriptor<>("testState", reducer, 
BasicTypeInfo.INT_TYPE_INFO);
-        InternalReducingState<String, Integer> reducingState =
+        InternalReducingState<String, Void, Integer> reducingState =
                 new InternalReducingState<>(aec, descriptor);
         aec.setCurrentContext(aec.buildContext("test", "test"));
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalValueStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalValueStateTest.java
index 801f264573e..d4754ce7f55 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalValueStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalValueStateTest.java
@@ -31,7 +31,8 @@ public class InternalValueStateTest extends 
InternalKeyedStateTestBase {
     public void testEachOperation() {
         ValueStateDescriptor<Integer> descriptor =
                 new ValueStateDescriptor<>("testState", 
BasicTypeInfo.INT_TYPE_INFO);
-        InternalValueState<String, Integer> valueState = new 
InternalValueState<>(aec, descriptor);
+        InternalValueState<String, Void, Integer> valueState =
+                new InternalValueState<>(aec, descriptor);
         aec.setCurrentContext(aec.buildContext("test", "test"));
 
         valueState.asyncClear();
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
index f32b3b665af..42324eccb67 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
@@ -56,23 +56,23 @@ public class ForStStateRequestClassifier implements 
StateRequestContainer {
         switch (stateRequestType) {
             case VALUE_GET:
                 {
-                    ForStValueState<?, ?> forStValueState =
-                            (ForStValueState<?, ?>) stateRequest.getState();
+                    ForStValueState<?, ?, ?> forStValueState =
+                            (ForStValueState<?, ?, ?>) stateRequest.getState();
                     
dbGetRequests.add(forStValueState.buildDBGetRequest(stateRequest));
                     return;
                 }
             case VALUE_UPDATE:
                 {
-                    ForStValueState<?, ?> forStValueState =
-                            (ForStValueState<?, ?>) stateRequest.getState();
+                    ForStValueState<?, ?, ?> forStValueState =
+                            (ForStValueState<?, ?, ?>) stateRequest.getState();
                     
dbPutRequests.add(forStValueState.buildDBPutRequest(stateRequest));
                     return;
                 }
             case CLEAR:
                 {
                     if (stateRequest.getState() instanceof ForStValueState) {
-                        ForStValueState<?, ?> forStValueState =
-                                (ForStValueState<?, ?>) 
stateRequest.getState();
+                        ForStValueState<?, ?, ?> forStValueState =
+                                (ForStValueState<?, ?, ?>) 
stateRequest.getState();
                         
dbPutRequests.add(forStValueState.buildDBPutRequest(stateRequest));
                         return;
                     } else {
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
index 52f2e6d848c..153cdbc0fa1 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
@@ -42,7 +42,7 @@ import java.util.function.Supplier;
  * @param <K> The type of the key.
  * @param <V> The type of the value.
  */
-public class ForStValueState<K, V> extends InternalValueState<K, V>
+public class ForStValueState<K, N, V> extends InternalValueState<K, N, V>
         implements ValueState<V>, ForStInnerTable<ContextKey<K>, V> {
 
     /** The column family which this internal value state belongs to. */
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
index 73b914ebe52..fa09bfc46d7 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.v2.InternalPartitionedState;
 import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
 import org.apache.flink.util.function.BiFunctionWithException;
 import org.apache.flink.util.function.FunctionWithException;
@@ -45,6 +46,7 @@ import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.RocksDB;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.nio.file.Path;
@@ -84,6 +86,12 @@ public class ForStDBOperationTestBase {
                     @Nullable State state, StateRequestType type, @Nullable IN payload) {
                 throw new UnsupportedOperationException();
             }
+
+            @Override
+            public <N> void setCurrentNamespaceForState(
+                    @Nonnull InternalPartitionedState<N> state, N namespace) {
+                throw new UnsupportedOperationException();
+            }
         };
     }
 
@@ -94,7 +102,7 @@ public class ForStDBOperationTestBase {
         return new ContextKey<>(recordContext);
     }
 
-    protected ForStValueState<Integer, String> buildForStValueState(String stateName)
+    protected ForStValueState<Integer, Void, String> buildForStValueState(String stateName)
             throws Exception {
         ColumnFamilyHandle cf = createColumnFamilyHandle(stateName);
         ValueStateDescriptor<String> valueStateDescriptor =
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java
index 5b8263ec25e..2b24a65d650 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java
@@ -34,15 +34,18 @@ public class ForStGeneralMultiGetOperationTest extends ForStDBOperationTestBase
 
     @Test
     public void testValueStateMultiGet() throws Exception {
-        ForStValueState<Integer, String> valueState1 = buildForStValueState("test-multiGet-1");
-        ForStValueState<Integer, String> valueState2 = buildForStValueState("test-multiGet-2");
+        ForStValueState<Integer, Void, String> valueState1 =
+                buildForStValueState("test-multiGet-1");
+        ForStValueState<Integer, Void, String> valueState2 =
+                buildForStValueState("test-multiGet-2");
         List<ForStDBGetRequest<?, ?>> batchGetRequest = new ArrayList<>();
         List<Tuple2<String, TestStateFuture<String>>> resultCheckList = new ArrayList<>();
 
         int keyNum = 1000;
         for (int i = 0; i < keyNum; i++) {
             TestStateFuture<String> future = new TestStateFuture<>();
-            ForStValueState<Integer, String> table = ((i % 2 == 0) ? valueState1 : valueState2);
+            ForStValueState<Integer, Void, String> table =
+                    ((i % 2 == 0) ? valueState1 : valueState2);
             ForStDBGetRequest<ContextKey<Integer>, String> request =
                     ForStDBGetRequest.of(buildContextKey(i), table, future);
             batchGetRequest.add(request);
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
index e8717750329..95ea527fa2a 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
@@ -42,8 +42,8 @@ public class ForStStateExecutorTest extends ForStDBOperationTestBase {
     @SuppressWarnings("unchecked")
     public void testExecuteValueStateRequest() throws Exception {
         ForStStateExecutor forStStateExecutor = new ForStStateExecutor(4, db, new WriteOptions());
-        ForStValueState<Integer, String> state1 = buildForStValueState("value-state-1");
-        ForStValueState<Integer, String> state2 = buildForStValueState("value-state-2");
+        ForStValueState<Integer, Void, String> state1 = buildForStValueState("value-state-1");
+        ForStValueState<Integer, Void, String> state2 = buildForStValueState("value-state-2");
 
         StateRequestContainer stateRequestContainer =
                 forStStateExecutor.createStateRequestContainer();
@@ -52,7 +52,7 @@ public class ForStStateExecutorTest extends ForStDBOperationTestBase {
         // 1. Update value state: keyRange [0, keyNum)
         int keyNum = 1000;
         for (int i = 0; i < keyNum; i++) {
-            ForStValueState<Integer, String> state = (i % 2 == 0 ? state1 : state2);
+            ForStValueState<Integer, Void, String> state = (i % 2 == 0 ? state1 : state2);
             stateRequestContainer.offer(
                     buildStateRequest(state, StateRequestType.VALUE_UPDATE, i, "test-" + i, i * 2));
         }
@@ -64,14 +64,14 @@ public class ForStStateExecutorTest extends ForStDBOperationTestBase {
         // 2. Get value state: keyRange [0, keyNum)
         //    Update value state: keyRange [keyNum, keyNum + 100]
         for (int i = 0; i < keyNum; i++) {
-            ForStValueState<Integer, String> state = (i % 2 == 0 ? state1 : state2);
+            ForStValueState<Integer, Void, String> state = (i % 2 == 0 ? state1 : state2);
             StateRequest<?, ?, ?> getRequest =
                     buildStateRequest(state, StateRequestType.VALUE_GET, i, null, i * 2);
             stateRequestContainer.offer(getRequest);
             checkList.add(getRequest);
         }
         for (int i = keyNum; i < keyNum + 100; i++) {
-            ForStValueState<Integer, String> state = (i % 2 == 0 ? state1 : state2);
+            ForStValueState<Integer, Void, String> state = (i % 2 == 0 ? state1 : state2);
             stateRequestContainer.offer(
                     buildStateRequest(state, StateRequestType.VALUE_UPDATE, i, "test-" + i, i * 2));
         }
@@ -90,12 +90,12 @@ public class ForStStateExecutorTest extends ForStDBOperationTestBase {
         //    Update state with null-value : keyRange [keyNum, keyNum + 100]
         stateRequestContainer = forStStateExecutor.createStateRequestContainer();
         for (int i = keyNum - 100; i < keyNum; i++) {
-            ForStValueState<Integer, String> state = (i % 2 == 0 ? state1 : state2);
+            ForStValueState<Integer, Void, String> state = (i % 2 == 0 ? state1 : state2);
             stateRequestContainer.offer(
                     buildStateRequest(state, StateRequestType.CLEAR, i, null, i * 2));
         }
         for (int i = keyNum; i < keyNum + 100; i++) {
-            ForStValueState<Integer, String> state = (i % 2 == 0 ? state1 : state2);
+            ForStValueState<Integer, Void, String> state = (i % 2 == 0 ? state1 : state2);
             stateRequestContainer.offer(
                     buildStateRequest(state, StateRequestType.VALUE_UPDATE, i, null, i * 2));
         }
@@ -105,7 +105,7 @@ public class ForStStateExecutorTest extends ForStDBOperationTestBase {
         stateRequestContainer = forStStateExecutor.createStateRequestContainer();
         checkList.clear();
         for (int i = keyNum - 100; i < keyNum + 100; i++) {
-            ForStValueState<Integer, String> state = (i % 2 == 0 ? state1 : state2);
+            ForStValueState<Integer, Void, String> state = (i % 2 == 0 ? state1 : state2);
             StateRequest<?, ?, ?> getRequest =
                     buildStateRequest(state, StateRequestType.VALUE_GET, i, null, i * 2);
             stateRequestContainer.offer(getRequest);
@@ -121,8 +121,8 @@ public class ForStStateExecutorTest extends ForStDBOperationTestBase {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    private <K, V, R> StateRequest<?, ?, ?> buildStateRequest(
-            InternalKeyedState<K, V> innerTable,
+    private <K, N, V, R> StateRequest<?, ?, ?> buildStateRequest(
+            InternalKeyedState<K, N, V> innerTable,
             StateRequestType requestType,
             K key,
             V value,
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java
index 19e11684fbf..dce11d12d77 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java
@@ -34,8 +34,10 @@ public class ForStWriteBatchOperationTest extends ForStDBOperationTestBase {
 
     @Test
     public void testValueStateWriteBatch() throws Exception {
-        ForStValueState<Integer, String> valueState1 = buildForStValueState("test-write-batch-1");
-        ForStValueState<Integer, String> valueState2 = buildForStValueState("test-write-batch-2");
+        ForStValueState<Integer, Void, String> valueState1 =
+                buildForStValueState("test-write-batch-1");
+        ForStValueState<Integer, Void, String> valueState2 =
+                buildForStValueState("test-write-batch-2");
         List<ForStDBPutRequest<?, ?>> batchPutRequest = new ArrayList<>();
         int keyNum = 100;
         for (int i = 0; i < keyNum; i++) {
@@ -62,7 +64,8 @@ public class ForStWriteBatchOperationTest extends ForStDBOperationTestBase {
 
     @Test
     public void testWriteBatchWithNullValue() throws Exception {
-        ForStValueState<Integer, String> valueState = buildForStValueState("test-write-batch");
+        ForStValueState<Integer, Void, String> valueState =
+                buildForStValueState("test-write-batch");
         List<ForStDBPutRequest<?, ?>> batchPutRequest = new ArrayList<>();
         // 1. write some data without null value
         int keyNum = 100;


Reply via email to