This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d4294c59e6f2ec8702f53916ea49cf23f6db8961
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Thu Jul 18 19:13:20 2024 +0800

    [FLINK-35858][State/ForSt] Support state namespace
---
 .../org/apache/flink/state/forst/ContextKey.java   | 11 ++++++++---
 .../flink/state/forst/ForStDBGetRequest.java       | 13 +++++++------
 .../flink/state/forst/ForStDBPutRequest.java       | 17 ++++++++++-------
 .../state/forst/ForStGeneralMultiGetOperation.java |  6 +++---
 .../apache/flink/state/forst/ForStInnerTable.java  |  8 ++++----
 .../flink/state/forst/ForStKeyedStateBackend.java  |  1 +
 .../flink/state/forst/ForStStateExecutor.java      |  4 ++--
 .../state/forst/ForStStateRequestClassifier.java   |  8 ++++----
 .../apache/flink/state/forst/ForStValueState.java  | 22 +++++++++++++---------
 .../state/forst/ForStWriteBatchOperation.java      | 10 +++++-----
 .../state/forst/ForStDBOperationTestBase.java      |  4 +++-
 .../forst/ForStGeneralMultiGetOperationTest.java   |  4 ++--
 .../state/forst/ForStWriteBatchOperationTest.java  |  8 ++++----
 13 files changed, 66 insertions(+), 50 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java
index 05dd2f58796..9f73cefcc6e 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java
@@ -19,6 +19,7 @@
 package org.apache.flink.state.forst;
 
 import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.runtime.state.v2.InternalPartitionedState;
 import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.concurrent.ThreadSafe;
@@ -32,7 +33,7 @@ import java.util.Objects;
  * @param <K> The type of the raw key.
  */
 @ThreadSafe
-public class ContextKey<K> {
+public class ContextKey<K, N> {
 
     private final RecordContext<K> recordContext;
 
@@ -48,6 +49,10 @@ public class ContextKey<K> {
         return recordContext.getKeyGroup();
     }
 
+    public N getNamespace(InternalPartitionedState<N> state) {
+        return recordContext.getNamespace(state);
+    }
+
     /**
      * Get the serialized key. If the cached serialized key within {@code 
RecordContext#payload} is
      * null, the provided serialization function will be called, and the 
serialization result will
@@ -57,7 +62,7 @@ public class ContextKey<K> {
      * @return the serialized bytes.
      */
     public byte[] getOrCreateSerializedKey(
-            FunctionWithException<ContextKey<K>, byte[], IOException> 
serializeKeyFunc)
+            FunctionWithException<ContextKey<K, N>, byte[], IOException> 
serializeKeyFunc)
             throws IOException {
         if (recordContext.getExtra() != null) {
             return (byte[]) recordContext.getExtra();
@@ -84,7 +89,7 @@ public class ContextKey<K> {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        ContextKey<?> that = (ContextKey<?>) o;
+        ContextKey<?, ?> that = (ContextKey<?, ?>) o;
         return Objects.equals(recordContext, that.recordContext);
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java
index 11868defb17..24c699cc018 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java
@@ -30,13 +30,14 @@ import java.io.IOException;
  * @param <K> The type of key in get access request.
  * @param <V> The type of value returned by get request.
  */
-public class ForStDBGetRequest<K, V> {
+public class ForStDBGetRequest<K, N, V> {
 
-    private final K key;
-    private final ForStInnerTable<K, V> table;
+    private final ContextKey<K, N> key;
+    private final ForStInnerTable<K, N, V> table;
     private final InternalStateFuture<V> future;
 
-    private ForStDBGetRequest(K key, ForStInnerTable<K, V> table, 
InternalStateFuture<V> future) {
+    private ForStDBGetRequest(
+            ContextKey<K, N> key, ForStInnerTable<K, N, V> table, 
InternalStateFuture<V> future) {
         this.key = key;
         this.table = table;
         this.future = future;
@@ -63,8 +64,8 @@ public class ForStDBGetRequest<K, V> {
         future.completeExceptionally(message, ex);
     }
 
-    static <K, V> ForStDBGetRequest<K, V> of(
-            K key, ForStInnerTable<K, V> table, InternalStateFuture<V> future) 
{
+    static <K, N, V> ForStDBGetRequest<K, N, V> of(
+            ContextKey<K, N> key, ForStInnerTable<K, N, V> table, 
InternalStateFuture<V> future) {
         return new ForStDBGetRequest<>(key, table, future);
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java
index cc868b8ebb3..bfe1b15d8cc 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java
@@ -32,15 +32,18 @@ import java.io.IOException;
  * @param <K> The type of key in put access request.
  * @param <V> The type of value in put access request.
  */
-public class ForStDBPutRequest<K, V> {
+public class ForStDBPutRequest<K, N, V> {
 
-    private final K key;
+    private final ContextKey<K, N> key;
     @Nullable private final V value;
-    private final ForStInnerTable<K, V> table;
+    private final ForStInnerTable<K, N, V> table;
     private final InternalStateFuture<Void> future;
 
     private ForStDBPutRequest(
-            K key, V value, ForStInnerTable<K, V> table, 
InternalStateFuture<Void> future) {
+            ContextKey<K, N> key,
+            V value,
+            ForStInnerTable<K, N, V> table,
+            InternalStateFuture<Void> future) {
         this.key = key;
         this.value = value;
         this.table = table;
@@ -76,10 +79,10 @@ public class ForStDBPutRequest<K, V> {
      * If the value of the ForStDBPutRequest is null, then the request will 
signify the deletion of
      * the data associated with that key.
      */
-    static <K, V> ForStDBPutRequest<K, V> of(
-            K key,
+    static <K, N, V> ForStDBPutRequest<K, N, V> of(
+            ContextKey<K, N> key,
             @Nullable V value,
-            ForStInnerTable<K, V> table,
+            ForStInnerTable<K, N, V> table,
             InternalStateFuture<Void> future) {
         return new ForStDBPutRequest<>(key, value, table, future);
     }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java
index 77885d1a55a..ed7abbf7df8 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java
@@ -38,12 +38,12 @@ public class ForStGeneralMultiGetOperation implements 
ForStDBOperation {
 
     private final RocksDB db;
 
-    private final List<ForStDBGetRequest<?, ?>> batchRequest;
+    private final List<ForStDBGetRequest<?, ?, ?>> batchRequest;
 
     private final Executor executor;
 
     ForStGeneralMultiGetOperation(
-            RocksDB db, List<ForStDBGetRequest<?, ?>> batchRequest, Executor 
executor) {
+            RocksDB db, List<ForStDBGetRequest<?, ?, ?>> batchRequest, 
Executor executor) {
         this.db = db;
         this.batchRequest = batchRequest;
         this.executor = executor;
@@ -58,7 +58,7 @@ public class ForStGeneralMultiGetOperation implements 
ForStDBOperation {
         AtomicReference<Exception> error = new AtomicReference<>();
         AtomicInteger counter = new AtomicInteger(batchRequest.size());
         for (int i = 0; i < batchRequest.size(); i++) {
-            ForStDBGetRequest<?, ?> request = batchRequest.get(i);
+            ForStDBGetRequest<?, ?, ?> request = batchRequest.get(i);
             executor.execute(
                     () -> {
                         try {
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java
index 5d36b72097b..e8b763ee816 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java
@@ -34,7 +34,7 @@ import java.io.IOException;
  * @param <K> The key type of the table.
  * @param <V> The value type of the table.
  */
-public interface ForStInnerTable<K, V> {
+public interface ForStInnerTable<K, N, V> {
 
     /** Get the columnFamily handle corresponding to table. */
     ColumnFamilyHandle getColumnFamilyHandle();
@@ -46,7 +46,7 @@ public interface ForStInnerTable<K, V> {
      * @return the key bytes
      * @throws IOException Thrown if the serialization encountered an I/O 
related error.
      */
-    byte[] serializeKey(K key) throws IOException;
+    byte[] serializeKey(ContextKey<K, N> key) throws IOException;
 
     /**
      * Serialize the given value to the outputView.
@@ -73,7 +73,7 @@ public interface ForStInnerTable<K, V> {
      * @param stateRequest The given stateRequest.
      * @return The corresponding ForSt GetRequest.
      */
-    ForStDBGetRequest<K, V> buildDBGetRequest(StateRequest<?, ?, ?> 
stateRequest);
+    ForStDBGetRequest<K, N, V> buildDBGetRequest(StateRequest<?, ?, ?> 
stateRequest);
 
     /**
      * Build a {@link ForStDBPutRequest} that belong to {@code 
ForStInnerTable} with the given
@@ -82,5 +82,5 @@ public interface ForStInnerTable<K, V> {
      * @param stateRequest The given stateRequest.
      * @return The corresponding ForSt PutRequest.
      */
-    ForStDBPutRequest<K, V> buildDBPutRequest(StateRequest<?, ?, ?> 
stateRequest);
+    ForStDBPutRequest<K, N, V> buildDBPutRequest(StateRequest<?, ?, ?> 
stateRequest);
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
index e80cd18b2d2..10bfb819cf3 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
@@ -154,6 +154,7 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend {
                             columnFamilyHandle,
                             (ValueStateDescriptor<SV>) stateDesc,
                             serializedKeyBuilder,
+                            namespaceSerializer::duplicate,
                             valueSerializerView,
                             valueDeserializerView);
         }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
index e96ee32010d..7973ab05f3b 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
@@ -83,7 +83,7 @@ public class ForStStateExecutor implements StateExecutor {
                 () -> {
                     long startTime = System.currentTimeMillis();
                     List<CompletableFuture<Void>> futures = new ArrayList<>(2);
-                    List<ForStDBPutRequest<?, ?>> putRequests =
+                    List<ForStDBPutRequest<?, ?, ?>> putRequests =
                             stateRequestClassifier.pollDbPutRequests();
                     if (!putRequests.isEmpty()) {
                         ForStWriteBatchOperation writeOperations =
@@ -92,7 +92,7 @@ public class ForStStateExecutor implements StateExecutor {
                         futures.add(writeOperations.process());
                     }
 
-                    List<ForStDBGetRequest<?, ?>> getRequests =
+                    List<ForStDBGetRequest<?, ?, ?>> getRequests =
                             stateRequestClassifier.pollDbGetRequests();
                     if (!getRequests.isEmpty()) {
                         ForStGeneralMultiGetOperation getOperations =
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
index 42324eccb67..1cfc3ca4c8f 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
@@ -31,9 +31,9 @@ import java.util.List;
  */
 public class ForStStateRequestClassifier implements StateRequestContainer {
 
-    private final List<ForStDBGetRequest<?, ?>> dbGetRequests;
+    private final List<ForStDBGetRequest<?, ?, ?>> dbGetRequests;
 
-    private final List<ForStDBPutRequest<?, ?>> dbPutRequests;
+    private final List<ForStDBPutRequest<?, ?, ?>> dbPutRequests;
 
     public ForStStateRequestClassifier() {
         this.dbGetRequests = new ArrayList<>();
@@ -88,11 +88,11 @@ public class ForStStateRequestClassifier implements 
StateRequestContainer {
         }
     }
 
-    public List<ForStDBGetRequest<?, ?>> pollDbGetRequests() {
+    public List<ForStDBGetRequest<?, ?, ?>> pollDbGetRequests() {
         return dbGetRequests;
     }
 
-    public List<ForStDBPutRequest<?, ?>> pollDbPutRequests() {
+    public List<ForStDBPutRequest<?, ?, ?>> pollDbPutRequests() {
         return dbPutRequests;
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
index 153cdbc0fa1..d6bfb97b184 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
@@ -19,6 +19,7 @@
 package org.apache.flink.state.forst;
 
 import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.state.InternalStateFuture;
@@ -43,7 +44,7 @@ import java.util.function.Supplier;
  * @param <V> The type of the value.
  */
 public class ForStValueState<K, N, V> extends InternalValueState<K, N, V>
-        implements ValueState<V>, ForStInnerTable<ContextKey<K>, V> {
+        implements ValueState<V>, ForStInnerTable<K, N, V> {
 
     /** The column family which this internal value state belongs to. */
     private final ColumnFamilyHandle columnFamilyHandle;
@@ -51,6 +52,8 @@ public class ForStValueState<K, N, V> extends 
InternalValueState<K, N, V>
     /** The serialized key builder which should be thread-safe. */
     private final ThreadLocal<SerializedCompositeKeyBuilder<K>> 
serializedKeyBuilder;
 
+    private final ThreadLocal<TypeSerializer<N>> namespaceSerializer;
+
     /** The data outputStream used for value serializer, which should be 
thread-safe. */
     private final ThreadLocal<DataOutputSerializer> valueSerializerView;
 
@@ -62,11 +65,13 @@ public class ForStValueState<K, N, V> extends 
InternalValueState<K, N, V>
             ColumnFamilyHandle columnFamily,
             ValueStateDescriptor<V> valueStateDescriptor,
             Supplier<SerializedCompositeKeyBuilder<K>> 
serializedKeyBuilderInitializer,
+            Supplier<TypeSerializer<N>> namespaceSerializerInitializer,
             Supplier<DataOutputSerializer> valueSerializerViewInitializer,
             Supplier<DataInputDeserializer> valueDeserializerViewInitializer) {
         super(stateRequestHandler, valueStateDescriptor);
         this.columnFamilyHandle = columnFamily;
         this.serializedKeyBuilder = 
ThreadLocal.withInitial(serializedKeyBuilderInitializer);
+        this.namespaceSerializer = 
ThreadLocal.withInitial(namespaceSerializerInitializer);
         this.valueSerializerView = 
ThreadLocal.withInitial(valueSerializerViewInitializer);
         this.valueDeserializerView = 
ThreadLocal.withInitial(valueDeserializerViewInitializer);
     }
@@ -77,12 +82,13 @@ public class ForStValueState<K, N, V> extends 
InternalValueState<K, N, V>
     }
 
     @Override
-    public byte[] serializeKey(ContextKey<K> contextKey) throws IOException {
+    public byte[] serializeKey(ContextKey<K, N> contextKey) throws IOException 
{
         return contextKey.getOrCreateSerializedKey(
                 ctxKey -> {
                     SerializedCompositeKeyBuilder<K> builder = 
serializedKeyBuilder.get();
                     builder.setKeyAndKeyGroup(ctxKey.getRawKey(), 
ctxKey.getKeyGroup());
-                    return builder.build();
+                    return builder.buildCompositeKeyNamespace(
+                            contextKey.getNamespace(this), 
namespaceSerializer.get());
                 });
     }
 
@@ -103,10 +109,9 @@ public class ForStValueState<K, N, V> extends 
InternalValueState<K, N, V>
 
     @SuppressWarnings("unchecked")
     @Override
-    public ForStDBGetRequest<ContextKey<K>, V> buildDBGetRequest(
-            StateRequest<?, ?, ?> stateRequest) {
+    public ForStDBGetRequest<K, N, V> buildDBGetRequest(StateRequest<?, ?, ?> 
stateRequest) {
         Preconditions.checkArgument(stateRequest.getRequestType() == 
StateRequestType.VALUE_GET);
-        ContextKey<K> contextKey =
+        ContextKey<K, N> contextKey =
                 new ContextKey<>((RecordContext<K>) 
stateRequest.getRecordContext());
         return ForStDBGetRequest.of(
                 contextKey, this, (InternalStateFuture<V>) 
stateRequest.getFuture());
@@ -114,12 +119,11 @@ public class ForStValueState<K, N, V> extends 
InternalValueState<K, N, V>
 
     @SuppressWarnings("unchecked")
     @Override
-    public ForStDBPutRequest<ContextKey<K>, V> buildDBPutRequest(
-            StateRequest<?, ?, ?> stateRequest) {
+    public ForStDBPutRequest<K, N, V> buildDBPutRequest(StateRequest<?, ?, ?> 
stateRequest) {
         Preconditions.checkArgument(
                 stateRequest.getRequestType() == StateRequestType.VALUE_UPDATE
                         || stateRequest.getRequestType() == 
StateRequestType.CLEAR);
-        ContextKey<K> contextKey =
+        ContextKey<K, N> contextKey =
                 new ContextKey<>((RecordContext<K>) 
stateRequest.getRecordContext());
         V value =
                 (stateRequest.getRequestType() == StateRequestType.CLEAR)
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java
index 3d4e59e3247..188d5c49d20 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java
@@ -34,7 +34,7 @@ public class ForStWriteBatchOperation implements 
ForStDBOperation {
 
     private final RocksDB db;
 
-    private final List<ForStDBPutRequest<?, ?>> batchRequest;
+    private final List<ForStDBPutRequest<?, ?, ?>> batchRequest;
 
     private final WriteOptions writeOptions;
 
@@ -42,7 +42,7 @@ public class ForStWriteBatchOperation implements 
ForStDBOperation {
 
     ForStWriteBatchOperation(
             RocksDB db,
-            List<ForStDBPutRequest<?, ?>> batchRequest,
+            List<ForStDBPutRequest<?, ?, ?>> batchRequest,
             WriteOptions writeOptions,
             Executor executor) {
         this.db = db;
@@ -57,7 +57,7 @@ public class ForStWriteBatchOperation implements 
ForStDBOperation {
                 () -> {
                     try (WriteBatch writeBatch =
                             new WriteBatch(batchRequest.size() * 
PER_RECORD_ESTIMATE_BYTES)) {
-                        for (ForStDBPutRequest<?, ?> request : batchRequest) {
+                        for (ForStDBPutRequest<?, ?, ?> request : 
batchRequest) {
                             if (request.valueIsNull()) {
                                 // put(key, null) == delete(key)
                                 writeBatch.delete(
@@ -71,12 +71,12 @@ public class ForStWriteBatchOperation implements 
ForStDBOperation {
                             }
                         }
                         db.write(writeOptions, writeBatch);
-                        for (ForStDBPutRequest<?, ?> request : batchRequest) {
+                        for (ForStDBPutRequest<?, ?, ?> request : 
batchRequest) {
                             request.completeStateFuture();
                         }
                     } catch (Exception e) {
                         String msg = "Error while write batch data to 
ForStDB.";
-                        for (ForStDBPutRequest<?, ?> request : batchRequest) {
+                        for (ForStDBPutRequest<?, ?, ?> request : 
batchRequest) {
                             // fail every state request in this batch
                             request.completeStateFutureExceptionally(msg, e);
                         }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
index fa09bfc46d7..3f31a110dab 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
@@ -95,7 +96,7 @@ public class ForStDBOperationTestBase {
         };
     }
 
-    protected ContextKey<Integer> buildContextKey(int i) {
+    protected ContextKey<Integer, Void> buildContextKey(int i) {
         int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(i, 128);
         RecordContext<Integer> recordContext =
                 new RecordContext<>(i, i, t -> {}, keyGroup, new Epoch(0));
@@ -117,6 +118,7 @@ public class ForStDBOperationTestBase {
                 cf,
                 valueStateDescriptor,
                 serializedKeyBuilder,
+                () -> VoidSerializer.INSTANCE,
                 valueSerializerView,
                 valueDeserializerView);
     }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java
index 2b24a65d650..910fab50643 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java
@@ -38,7 +38,7 @@ public class ForStGeneralMultiGetOperationTest extends 
ForStDBOperationTestBase
                 buildForStValueState("test-multiGet-1");
         ForStValueState<Integer, Void, String> valueState2 =
                 buildForStValueState("test-multiGet-2");
-        List<ForStDBGetRequest<?, ?>> batchGetRequest = new ArrayList<>();
+        List<ForStDBGetRequest<?, ?, ?>> batchGetRequest = new ArrayList<>();
         List<Tuple2<String, TestStateFuture<String>>> resultCheckList = new 
ArrayList<>();
 
         int keyNum = 1000;
@@ -46,7 +46,7 @@ public class ForStGeneralMultiGetOperationTest extends 
ForStDBOperationTestBase
             TestStateFuture<String> future = new TestStateFuture<>();
             ForStValueState<Integer, Void, String> table =
                     ((i % 2 == 0) ? valueState1 : valueState2);
-            ForStDBGetRequest<ContextKey<Integer>, String> request =
+            ForStDBGetRequest<Integer, Void, String> request =
                     ForStDBGetRequest.of(buildContextKey(i), table, future);
             batchGetRequest.add(request);
 
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java
index dce11d12d77..9e377debd88 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java
@@ -38,7 +38,7 @@ public class ForStWriteBatchOperationTest extends 
ForStDBOperationTestBase {
                 buildForStValueState("test-write-batch-1");
         ForStValueState<Integer, Void, String> valueState2 =
                 buildForStValueState("test-write-batch-2");
-        List<ForStDBPutRequest<?, ?>> batchPutRequest = new ArrayList<>();
+        List<ForStDBPutRequest<?, ?, ?>> batchPutRequest = new ArrayList<>();
         int keyNum = 100;
         for (int i = 0; i < keyNum; i++) {
             batchPutRequest.add(
@@ -55,7 +55,7 @@ public class ForStWriteBatchOperationTest extends 
ForStDBOperationTestBase {
         writeBatchOperation.process().get();
 
         // check data correctness
-        for (ForStDBPutRequest<?, ?> request : batchPutRequest) {
+        for (ForStDBPutRequest<?, ?, ?> request : batchPutRequest) {
             byte[] keyBytes = request.buildSerializedKey();
             byte[] valueBytes = db.get(request.getColumnFamilyHandle(), 
keyBytes);
             assertArrayEquals(valueBytes, request.buildSerializedValue());
@@ -66,7 +66,7 @@ public class ForStWriteBatchOperationTest extends 
ForStDBOperationTestBase {
     public void testWriteBatchWithNullValue() throws Exception {
         ForStValueState<Integer, Void, String> valueState =
                 buildForStValueState("test-write-batch");
-        List<ForStDBPutRequest<?, ?>> batchPutRequest = new ArrayList<>();
+        List<ForStDBPutRequest<?, ?, ?>> batchPutRequest = new ArrayList<>();
         // 1. write some data without null value
         int keyNum = 100;
         for (int i = 0; i < keyNum; i++) {
@@ -103,7 +103,7 @@ public class ForStWriteBatchOperationTest extends 
ForStDBOperationTestBase {
         writeBatchOperation2.process().get();
 
         // 3.  check data correctness
-        for (ForStDBPutRequest<?, ?> request : batchPutRequest) {
+        for (ForStDBPutRequest<?, ?, ?> request : batchPutRequest) {
             byte[] keyBytes = request.buildSerializedKey();
             byte[] valueBytes = db.get(request.getColumnFamilyHandle(), 
keyBytes);
             if (valueBytes == null) {

Reply via email to