This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d4294c59e6f2ec8702f53916ea49cf23f6db8961 Author: Zakelly <zakelly....@gmail.com> AuthorDate: Thu Jul 18 19:13:20 2024 +0800 [FLINK-35858][State/ForSt] Support state namespace --- .../org/apache/flink/state/forst/ContextKey.java | 11 ++++++++--- .../flink/state/forst/ForStDBGetRequest.java | 13 +++++++------ .../flink/state/forst/ForStDBPutRequest.java | 17 ++++++++++------- .../state/forst/ForStGeneralMultiGetOperation.java | 6 +++--- .../apache/flink/state/forst/ForStInnerTable.java | 8 ++++---- .../flink/state/forst/ForStKeyedStateBackend.java | 1 + .../flink/state/forst/ForStStateExecutor.java | 4 ++-- .../state/forst/ForStStateRequestClassifier.java | 8 ++++---- .../apache/flink/state/forst/ForStValueState.java | 22 +++++++++++++--------- .../state/forst/ForStWriteBatchOperation.java | 10 +++++----- .../state/forst/ForStDBOperationTestBase.java | 4 +++- .../forst/ForStGeneralMultiGetOperationTest.java | 4 ++-- .../state/forst/ForStWriteBatchOperationTest.java | 8 ++++---- 13 files changed, 66 insertions(+), 50 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java index 05dd2f58796..9f73cefcc6e 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java @@ -19,6 +19,7 @@ package org.apache.flink.state.forst; import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.runtime.state.v2.InternalPartitionedState; import org.apache.flink.util.function.FunctionWithException; import javax.annotation.concurrent.ThreadSafe; @@ -32,7 +33,7 @@ import java.util.Objects; * @param <K> The type of the raw key. */ @ThreadSafe -public class ContextKey<K> { +public class ContextKey<K, N> { private final RecordContext<K> recordContext; @@ -48,6 +49,10 @@ public class ContextKey<K> { return recordContext.getKeyGroup(); } + public N getNamespace(InternalPartitionedState<N> state) { + return recordContext.getNamespace(state); + } + /** * Get the serialized key. If the cached serialized key within {@code RecordContext#payload} is * null, the provided serialization function will be called, and the serialization result will @@ -57,7 +62,7 @@ public class ContextKey<K> { * @return the serialized bytes. */ public byte[] getOrCreateSerializedKey( - FunctionWithException<ContextKey<K>, byte[], IOException> serializeKeyFunc) + FunctionWithException<ContextKey<K, N>, byte[], IOException> serializeKeyFunc) throws IOException { if (recordContext.getExtra() != null) { return (byte[]) recordContext.getExtra(); @@ -84,7 +89,7 @@ public class ContextKey<K> { if (o == null || getClass() != o.getClass()) { return false; } - ContextKey<?> that = (ContextKey<?>) o; + ContextKey<?, ?> that = (ContextKey<?, ?>) o; return Objects.equals(recordContext, that.recordContext); } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java index 11868defb17..24c699cc018 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java @@ -30,13 +30,14 @@ import java.io.IOException; * @param <K> The type of key in get access request. * @param <V> The type of value returned by get request. */ -public class ForStDBGetRequest<K, V> { +public class ForStDBGetRequest<K, N, V> { - private final K key; - private final ForStInnerTable<K, V> table; + private final ContextKey<K, N> key; + private final ForStInnerTable<K, N, V> table; private final InternalStateFuture<V> future; - private ForStDBGetRequest(K key, ForStInnerTable<K, V> table, InternalStateFuture<V> future) { + private ForStDBGetRequest( + ContextKey<K, N> key, ForStInnerTable<K, N, V> table, InternalStateFuture<V> future) { this.key = key; this.table = table; this.future = future; @@ -63,8 +64,8 @@ public class ForStDBGetRequest<K, V> { future.completeExceptionally(message, ex); } - static <K, V> ForStDBGetRequest<K, V> of( - K key, ForStInnerTable<K, V> table, InternalStateFuture<V> future) { + static <K, N, V> ForStDBGetRequest<K, N, V> of( + ContextKey<K, N> key, ForStInnerTable<K, N, V> table, InternalStateFuture<V> future) { return new ForStDBGetRequest<>(key, table, future); } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java index cc868b8ebb3..bfe1b15d8cc 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java @@ -32,15 +32,18 @@ import java.io.IOException; * @param <K> The type of key in put access request. * @param <V> The type of value in put access request. */ -public class ForStDBPutRequest<K, V> { +public class ForStDBPutRequest<K, N, V> { - private final K key; + private final ContextKey<K, N> key; @Nullable private final V value; - private final ForStInnerTable<K, V> table; + private final ForStInnerTable<K, N, V> table; private final InternalStateFuture<Void> future; private ForStDBPutRequest( - K key, V value, ForStInnerTable<K, V> table, InternalStateFuture<Void> future) { + ContextKey<K, N> key, + V value, + ForStInnerTable<K, N, V> table, + InternalStateFuture<Void> future) { this.key = key; this.value = value; this.table = table; @@ -76,10 +79,10 @@ public class ForStDBPutRequest<K, V> { * If the value of the ForStDBPutRequest is null, then the request will signify the deletion of * the data associated with that key. */ - static <K, V> ForStDBPutRequest<K, V> of( - K key, + static <K, N, V> ForStDBPutRequest<K, N, V> of( + ContextKey<K, N> key, @Nullable V value, - ForStInnerTable<K, V> table, + ForStInnerTable<K, N, V> table, InternalStateFuture<Void> future) { return new ForStDBPutRequest<>(key, value, table, future); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java index 77885d1a55a..ed7abbf7df8 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java @@ -38,12 +38,12 @@ public class ForStGeneralMultiGetOperation implements ForStDBOperation { private final RocksDB db; - private final List<ForStDBGetRequest<?, ?>> batchRequest; + private final List<ForStDBGetRequest<?, ?, ?>> batchRequest; private final Executor executor; ForStGeneralMultiGetOperation( - RocksDB db, List<ForStDBGetRequest<?, ?>> batchRequest, Executor executor) { + RocksDB db, List<ForStDBGetRequest<?, ?, ?>> batchRequest, Executor executor) { this.db = db; this.batchRequest = batchRequest; this.executor = executor; @@ -58,7 +58,7 @@ public class ForStGeneralMultiGetOperation implements ForStDBOperation { AtomicReference<Exception> error = new AtomicReference<>(); AtomicInteger counter = new AtomicInteger(batchRequest.size()); for (int i = 0; i < batchRequest.size(); i++) { - ForStDBGetRequest<?, ?> request = batchRequest.get(i); + ForStDBGetRequest<?, ?, ?> request = batchRequest.get(i); executor.execute( () -> { try { diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java index 5d36b72097b..e8b763ee816 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java @@ -34,7 +34,7 @@ import java.io.IOException; * @param <K> The key type of the table. * @param <V> The value type of the table. */ -public interface ForStInnerTable<K, V> { +public interface ForStInnerTable<K, N, V> { /** Get the columnFamily handle corresponding to table. */ ColumnFamilyHandle getColumnFamilyHandle(); @@ -46,7 +46,7 @@ public interface ForStInnerTable<K, V> { * @return the key bytes * @throws IOException Thrown if the serialization encountered an I/O related error. */ - byte[] serializeKey(K key) throws IOException; + byte[] serializeKey(ContextKey<K, N> key) throws IOException; /** * Serialize the given value to the outputView. @@ -73,7 +73,7 @@ public interface ForStInnerTable<K, V> { * @param stateRequest The given stateRequest. * @return The corresponding ForSt GetRequest. */ - ForStDBGetRequest<K, V> buildDBGetRequest(StateRequest<?, ?, ?> stateRequest); + ForStDBGetRequest<K, N, V> buildDBGetRequest(StateRequest<?, ?, ?> stateRequest); /** * Build a {@link ForStDBPutRequest} that belong to {@code ForStInnerTable} with the given @@ -82,5 +82,5 @@ public interface ForStInnerTable<K, V> { * @param stateRequest The given stateRequest. * @return The corresponding ForSt PutRequest. */ - ForStDBPutRequest<K, V> buildDBPutRequest(StateRequest<?, ?, ?> stateRequest); + ForStDBPutRequest<K, N, V> buildDBPutRequest(StateRequest<?, ?, ?> stateRequest); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java index e80cd18b2d2..10bfb819cf3 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java @@ -154,6 +154,7 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend { columnFamilyHandle, (ValueStateDescriptor<SV>) stateDesc, serializedKeyBuilder, + namespaceSerializer::duplicate, valueSerializerView, valueDeserializerView); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java index e96ee32010d..7973ab05f3b 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java @@ -83,7 +83,7 @@ public class ForStStateExecutor implements StateExecutor { () -> { long startTime = System.currentTimeMillis(); List<CompletableFuture<Void>> futures = new ArrayList<>(2); - List<ForStDBPutRequest<?, ?>> putRequests = + List<ForStDBPutRequest<?, ?, ?>> putRequests = stateRequestClassifier.pollDbPutRequests(); if (!putRequests.isEmpty()) { ForStWriteBatchOperation writeOperations = @@ -92,7 +92,7 @@ public class ForStStateExecutor implements StateExecutor { futures.add(writeOperations.process()); } - List<ForStDBGetRequest<?, ?>> getRequests = + List<ForStDBGetRequest<?, ?, ?>> getRequests = stateRequestClassifier.pollDbGetRequests(); if (!getRequests.isEmpty()) { ForStGeneralMultiGetOperation getOperations = diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java index 42324eccb67..1cfc3ca4c8f 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java @@ -31,9 +31,9 @@ import java.util.List; */ public class ForStStateRequestClassifier implements StateRequestContainer { - private final List<ForStDBGetRequest<?, ?>> dbGetRequests; + private final List<ForStDBGetRequest<?, ?, ?>> dbGetRequests; - private final List<ForStDBPutRequest<?, ?>> dbPutRequests; + private final List<ForStDBPutRequest<?, ?, ?>> dbPutRequests; public ForStStateRequestClassifier() { this.dbGetRequests = new ArrayList<>(); @@ -88,11 +88,11 @@ public class ForStStateRequestClassifier implements StateRequestContainer { } } - public List<ForStDBGetRequest<?, ?>> pollDbGetRequests() { + public List<ForStDBGetRequest<?, ?, ?>> pollDbGetRequests() { return dbGetRequests; } - public List<ForStDBPutRequest<?, ?>> pollDbPutRequests() { + public List<ForStDBPutRequest<?, ?, ?>> pollDbPutRequests() { return dbPutRequests; } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java index 153cdbc0fa1..d6bfb97b184 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java @@ -19,6 +19,7 @@ package org.apache.flink.state.forst; import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.state.InternalStateFuture; @@ -43,7 +44,7 @@ import java.util.function.Supplier; * @param <V> The type of the value. */ public class ForStValueState<K, N, V> extends InternalValueState<K, N, V> - implements ValueState<V>, ForStInnerTable<ContextKey<K>, V> { + implements ValueState<V>, ForStInnerTable<K, N, V> { /** The column family which this internal value state belongs to. */ private final ColumnFamilyHandle columnFamilyHandle; @@ -51,6 +52,8 @@ public class ForStValueState<K, N, V> extends InternalValueState<K, N, V> /** The serialized key builder which should be thread-safe. */ private final ThreadLocal<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder; + private final ThreadLocal<TypeSerializer<N>> namespaceSerializer; + /** The data outputStream used for value serializer, which should be thread-safe. */ private final ThreadLocal<DataOutputSerializer> valueSerializerView; @@ -62,11 +65,13 @@ public class ForStValueState<K, N, V> extends InternalValueState<K, N, V> ColumnFamilyHandle columnFamily, ValueStateDescriptor<V> valueStateDescriptor, Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilderInitializer, + Supplier<TypeSerializer<N>> namespaceSerializerInitializer, Supplier<DataOutputSerializer> valueSerializerViewInitializer, Supplier<DataInputDeserializer> valueDeserializerViewInitializer) { super(stateRequestHandler, valueStateDescriptor); this.columnFamilyHandle = columnFamily; this.serializedKeyBuilder = ThreadLocal.withInitial(serializedKeyBuilderInitializer); + this.namespaceSerializer = ThreadLocal.withInitial(namespaceSerializerInitializer); this.valueSerializerView = ThreadLocal.withInitial(valueSerializerViewInitializer); this.valueDeserializerView = ThreadLocal.withInitial(valueDeserializerViewInitializer); } @@ -77,12 +82,13 @@ public class ForStValueState<K, N, V> extends InternalValueState<K, N, V> } @Override - public byte[] serializeKey(ContextKey<K> contextKey) throws IOException { + public byte[] serializeKey(ContextKey<K, N> contextKey) throws IOException { return contextKey.getOrCreateSerializedKey( ctxKey -> { SerializedCompositeKeyBuilder<K> builder = serializedKeyBuilder.get(); builder.setKeyAndKeyGroup(ctxKey.getRawKey(), ctxKey.getKeyGroup()); - return builder.build(); + return builder.buildCompositeKeyNamespace( + contextKey.getNamespace(this), namespaceSerializer.get()); }); } @@ -103,10 +109,9 @@ public class ForStValueState<K, N, V> extends InternalValueState<K, N, V> @SuppressWarnings("unchecked") @Override - public ForStDBGetRequest<ContextKey<K>, V> buildDBGetRequest( - StateRequest<?, ?, ?> stateRequest) { + public ForStDBGetRequest<K, N, V> buildDBGetRequest(StateRequest<?, ?, ?> stateRequest) { Preconditions.checkArgument(stateRequest.getRequestType() == StateRequestType.VALUE_GET); - ContextKey<K> contextKey = + ContextKey<K, N> contextKey = new ContextKey<>((RecordContext<K>) stateRequest.getRecordContext()); return ForStDBGetRequest.of( contextKey, this, (InternalStateFuture<V>) stateRequest.getFuture()); @@ -114,12 +119,11 @@ public class ForStValueState<K, N, V> extends InternalValueState<K, N, V> @SuppressWarnings("unchecked") @Override - public ForStDBPutRequest<ContextKey<K>, V> buildDBPutRequest( - StateRequest<?, ?, ?> stateRequest) { + public ForStDBPutRequest<K, N, V> buildDBPutRequest(StateRequest<?, ?, ?> stateRequest) { Preconditions.checkArgument( stateRequest.getRequestType() == StateRequestType.VALUE_UPDATE || stateRequest.getRequestType() == StateRequestType.CLEAR); - ContextKey<K> contextKey = + ContextKey<K, N> contextKey = new ContextKey<>((RecordContext<K>) stateRequest.getRecordContext()); V value = (stateRequest.getRequestType() == StateRequestType.CLEAR) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java index 3d4e59e3247..188d5c49d20 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java @@ -34,7 +34,7 @@ public class ForStWriteBatchOperation implements ForStDBOperation { private final RocksDB db; - private final List<ForStDBPutRequest<?, ?>> batchRequest; + private final List<ForStDBPutRequest<?, ?, ?>> batchRequest; private final WriteOptions writeOptions; @@ -42,7 +42,7 @@ public class ForStWriteBatchOperation implements ForStDBOperation { ForStWriteBatchOperation( RocksDB db, - List<ForStDBPutRequest<?, ?>> batchRequest, + List<ForStDBPutRequest<?, ?, ?>> batchRequest, WriteOptions writeOptions, Executor executor) { this.db = db; @@ -57,7 +57,7 @@ public class ForStWriteBatchOperation implements ForStDBOperation { () -> { try (WriteBatch writeBatch = new WriteBatch(batchRequest.size() * PER_RECORD_ESTIMATE_BYTES)) { - for (ForStDBPutRequest<?, ?> request : batchRequest) { + for (ForStDBPutRequest<?, ?, ?> request : batchRequest) { if (request.valueIsNull()) { // put(key, null) == delete(key) writeBatch.delete( @@ -71,12 +71,12 @@ public class ForStWriteBatchOperation implements ForStDBOperation { } } db.write(writeOptions, writeBatch); - for (ForStDBPutRequest<?, ?> request : batchRequest) { + for (ForStDBPutRequest<?, ?, ?> request : batchRequest) { request.completeStateFuture(); } } catch (Exception e) { String msg = "Error while write batch data to ForStDB."; - for (ForStDBPutRequest<?, ?> request : batchRequest) { + for (ForStDBPutRequest<?, ?, ?> request : batchRequest) { // fail every state request in this batch request.completeStateFutureExceptionally(msg, e); } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java index fa09bfc46d7..3f31a110dab 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; @@ -95,7 +96,7 @@ public class ForStDBOperationTestBase { }; } - protected ContextKey<Integer> buildContextKey(int i) { + protected ContextKey<Integer, Void> buildContextKey(int i) { int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(i, 128); RecordContext<Integer> recordContext = new RecordContext<>(i, i, t -> {}, keyGroup, new Epoch(0)); @@ -117,6 +118,7 @@ public class ForStDBOperationTestBase { cf, valueStateDescriptor, serializedKeyBuilder, + () -> VoidSerializer.INSTANCE, valueSerializerView, valueDeserializerView); } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java index 2b24a65d650..910fab50643 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java @@ -38,7 +38,7 @@ public class ForStGeneralMultiGetOperationTest extends ForStDBOperationTestBase buildForStValueState("test-multiGet-1"); ForStValueState<Integer, Void, String> valueState2 = buildForStValueState("test-multiGet-2"); - List<ForStDBGetRequest<?, ?>> batchGetRequest = new ArrayList<>(); + List<ForStDBGetRequest<?, ?, ?>> batchGetRequest = new ArrayList<>(); List<Tuple2<String, TestStateFuture<String>>> resultCheckList = new ArrayList<>(); int keyNum = 1000; @@ -46,7 +46,7 @@ public class ForStGeneralMultiGetOperationTest extends ForStDBOperationTestBase TestStateFuture<String> future = new TestStateFuture<>(); ForStValueState<Integer, Void, String> table = ((i % 2 == 0) ? valueState1 : valueState2); - ForStDBGetRequest<ContextKey<Integer>, String> request = + ForStDBGetRequest<Integer, Void, String> request = ForStDBGetRequest.of(buildContextKey(i), table, future); batchGetRequest.add(request); diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java index dce11d12d77..9e377debd88 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java @@ -38,7 +38,7 @@ public class ForStWriteBatchOperationTest extends ForStDBOperationTestBase { buildForStValueState("test-write-batch-1"); ForStValueState<Integer, Void, String> valueState2 = buildForStValueState("test-write-batch-2"); - List<ForStDBPutRequest<?, ?>> batchPutRequest = new ArrayList<>(); + List<ForStDBPutRequest<?, ?, ?>> batchPutRequest = new ArrayList<>(); int keyNum = 100; for (int i = 0; i < keyNum; i++) { batchPutRequest.add( @@ -55,7 +55,7 @@ public class ForStWriteBatchOperationTest extends ForStDBOperationTestBase { writeBatchOperation.process().get(); // check data correctness - for (ForStDBPutRequest<?, ?> request : batchPutRequest) { + for (ForStDBPutRequest<?, ?, ?> request : batchPutRequest) { byte[] keyBytes = request.buildSerializedKey(); byte[] valueBytes = db.get(request.getColumnFamilyHandle(), keyBytes); assertArrayEquals(valueBytes, request.buildSerializedValue()); @@ -66,7 +66,7 @@ public class ForStWriteBatchOperationTest extends ForStDBOperationTestBase { public void testWriteBatchWithNullValue() throws Exception { ForStValueState<Integer, Void, String> valueState = buildForStValueState("test-write-batch"); - List<ForStDBPutRequest<?, ?>> batchPutRequest = new ArrayList<>(); + List<ForStDBPutRequest<?, ?, ?>> batchPutRequest = new ArrayList<>(); // 1. write some data without null value int keyNum = 100; for (int i = 0; i < keyNum; i++) { @@ -103,7 +103,7 @@ public class ForStWriteBatchOperationTest extends ForStDBOperationTestBase { writeBatchOperation2.process().get(); // 3. check data correctness - for (ForStDBPutRequest<?, ?> request : batchPutRequest) { + for (ForStDBPutRequest<?, ?, ?> request : batchPutRequest) { byte[] keyBytes = request.buildSerializedKey(); byte[] valueBytes = db.get(request.getColumnFamilyHandle(), keyBytes); if (valueBytes == null) {