This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b07b0b4d994d961c9f3d5d2def1372cbbbc85cf3 Author: Zakelly <zakelly....@gmail.com> AuthorDate: Wed Jul 17 17:42:44 2024 +0800 [FLINK-35858][Runtime/State] Add namespace in internal async states --- .../asyncprocessing/AsyncExecutionController.java | 8 ++ .../runtime/asyncprocessing/RecordContext.java | 18 +++ .../asyncprocessing/StateRequestHandler.java | 8 ++ .../runtime/state/v2/InternalAggregatingState.java | 2 +- .../flink/runtime/state/v2/InternalKeyedState.java | 7 +- .../flink/runtime/state/v2/InternalListState.java | 3 +- .../flink/runtime/state/v2/InternalMapState.java | 2 +- .../v2/InternalPartitionedState.java} | 20 +--- .../runtime/state/v2/InternalReducingState.java | 2 +- .../flink/runtime/state/v2/InternalValueState.java | 3 +- .../AsyncExecutionControllerTest.java | 128 +++++++++++++++++++-- .../state/v2/InternalAggregatingStateTest.java | 2 +- .../runtime/state/v2/InternalListStateTest.java | 3 +- .../runtime/state/v2/InternalMapStateTest.java | 2 +- .../state/v2/InternalReducingStateTest.java | 2 +- .../runtime/state/v2/InternalValueStateTest.java | 3 +- .../state/forst/ForStStateRequestClassifier.java | 12 +- .../apache/flink/state/forst/ForStValueState.java | 2 +- .../state/forst/ForStDBOperationTestBase.java | 10 +- .../forst/ForStGeneralMultiGetOperationTest.java | 9 +- .../flink/state/forst/ForStStateExecutorTest.java | 20 ++-- .../state/forst/ForStWriteBatchOperationTest.java | 9 +- 22 files changed, 219 insertions(+), 56 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index d06888ab6c4..d48208e452f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -25,12 +25,14 @@ import org.apache.flink.core.state.InternalStateFuture; import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler; import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.v2.InternalPartitionedState; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.ThrowingRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Optional; @@ -255,6 +257,12 @@ public class AsyncExecutionController<K> implements StateRequestHandler { return stateFuture; } + @Override + public <N> void setCurrentNamespaceForState( + @Nonnull InternalPartitionedState<N> state, N namespace) { + currentContext.setNamespace(state, namespace); + } + <IN, OUT> void insertActiveBuffer(StateRequest<K, IN, OUT> request) { stateRequestsBuffer.enqueueToActive(request); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java index 86352ce1cf9..b9f48125c29 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java @@ -19,9 +19,12 @@ package org.apache.flink.runtime.asyncprocessing; import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch; +import org.apache.flink.runtime.state.v2.InternalPartitionedState; import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.function.Consumer; @@ -56,6 +59,9 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun /** The keyGroup to which key belongs. */ private final int keyGroup; + /** The namespaces of states. Lazy initialization for saving memory. */ + private Map<InternalPartitionedState<?>, Object> namespaces = null; + /** * The extra context info which is used to hold customized data defined by state backend. The * state backend can use this field to cache some data that can be used multiple times in @@ -111,6 +117,18 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun return keyGroup; } + @SuppressWarnings("unchecked") + public <N> N getNamespace(InternalPartitionedState<N> state) { + return namespaces == null ? null : (N) namespaces.get(state); + } + + public <N> void setNamespace(InternalPartitionedState<N> state, N namespace) { + if (namespaces == null) { + namespaces = new HashMap<>(); + } + namespaces.put(state, namespace); + } + public void setExtra(Object extra) { this.extra = extra; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java index 1288ad8a01b..fc81f87f9fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java @@ -21,7 +21,9 @@ package org.apache.flink.runtime.asyncprocessing; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.runtime.state.v2.InternalPartitionedState; +import javax.annotation.Nonnull; import javax.annotation.Nullable; /** The handler which can process {@link StateRequest}. */ @@ -39,4 +41,10 @@ public interface StateRequestHandler { */ <IN, OUT> InternalStateFuture<OUT> handleRequest( @Nullable State state, StateRequestType type, @Nullable IN payload); + + /** + * Set current namespace for a state. See {@link + * InternalPartitionedState#setCurrentNamespace(Object)}. + */ + <N> void setCurrentNamespaceForState(@Nonnull InternalPartitionedState<N> state, N namespace); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalAggregatingState.java index 7f05cdbd4d2..971af6f7f7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalAggregatingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalAggregatingState.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.asyncprocessing.StateRequestType; * @param <ACC> TThe type of the accumulator (intermediate aggregation state). * @param <OUT> The type of the values that are returned from the state. */ -public class InternalAggregatingState<K, IN, ACC, OUT> extends InternalKeyedState<K, ACC> +public class InternalAggregatingState<K, N, IN, ACC, OUT> extends InternalKeyedState<K, N, ACC> implements AggregatingState<IN, OUT> { protected final AggregateFunction<IN, ACC, OUT> aggregateFunction; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java index 2b1be2a6764..408ca80f6c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java @@ -37,7 +37,7 @@ import org.apache.flink.runtime.asyncprocessing.StateRequestType; * @param <V> The type of values kept internally in state. */ @Internal -public abstract class InternalKeyedState<K, V> implements State { +public abstract class InternalKeyedState<K, N, V> implements InternalPartitionedState<N> { private final StateRequestHandler stateRequestHandler; @@ -64,6 +64,11 @@ public abstract class InternalKeyedState<K, V> implements State { return stateRequestHandler.handleRequest(this, stateRequestType, payload); } + @Override + public void setCurrentNamespace(N namespace) { + stateRequestHandler.setCurrentNamespaceForState(this, namespace); + } + @Override public final StateFuture<Void> asyncClear() { return handleRequest(StateRequestType.CLEAR, null); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalListState.java index 7bcc652c2b5..ac1c43005c2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalListState.java @@ -32,7 +32,8 @@ import java.util.List; * @param <K> The type of key the state is associated to. * @param <V> The type of values kept internally in state. */ -public class InternalListState<K, V> extends InternalKeyedState<K, V> implements ListState<V> { +public class InternalListState<K, N, V> extends InternalKeyedState<K, N, V> + implements ListState<V> { public InternalListState( StateRequestHandler stateRequestHandler, ListStateDescriptor<V> stateDescriptor) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalMapState.java index b718a3eb687..d028f9d611c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalMapState.java @@ -34,7 +34,7 @@ import java.util.Map; * @param <UK> The type of user key of this state. * @param <V> The type of values kept internally in state. */ -public class InternalMapState<K, UK, V> extends InternalKeyedState<K, V> +public class InternalMapState<K, N, UK, V> extends InternalKeyedState<K, N, V> implements MapState<UK, V> { public InternalMapState( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalPartitionedState.java similarity index 57% copy from flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalPartitionedState.java index 1288ad8a01b..46e35b89838 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalPartitionedState.java @@ -16,27 +16,19 @@ * limitations under the License. */ -package org.apache.flink.runtime.asyncprocessing; +package org.apache.flink.runtime.state.v2; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.v2.State; -import org.apache.flink.core.state.InternalStateFuture; -import javax.annotation.Nullable; - -/** The handler which can process {@link StateRequest}. */ +/** A state that is partitioned into different namespaces. */ @Internal -public interface StateRequestHandler { +public interface InternalPartitionedState<N> extends State { /** - * Submit a {@link StateRequest} to this StateRequestHandler. + * Set current namespace and access state under specified namespace afterward. * - * @param state the state to request. Could be {@code null} if the type is {@link - * StateRequestType#SYNC_POINT}. - * @param type the type of this request. - * @param payload the payload input for this request. - * @return the state future. + * @param namespace the specified namespace */ - <IN, OUT> InternalStateFuture<OUT> handleRequest( - @Nullable State state, StateRequestType type, @Nullable IN payload); + void setCurrentNamespace(N namespace); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalReducingState.java index eed6eae0dec..64d538d6bb1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalReducingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalReducingState.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.asyncprocessing.StateRequestType; * @param <K> The type of key the state is associated to. * @param <V> The type of values kept internally in state. */ -public class InternalReducingState<K, V> extends InternalKeyedState<K, V> +public class InternalReducingState<K, N, V> extends InternalKeyedState<K, N, V> implements ReducingState<V> { protected final ReduceFunction<V> reduceFunction; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java index 2a9c643d17a..8c87685a62b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalValueState.java @@ -30,7 +30,8 @@ import org.apache.flink.runtime.asyncprocessing.StateRequestType; * @param <K> The type of key the state is associated to. * @param <V> The type of values kept internally in state. */ -public class InternalValueState<K, V> extends InternalKeyedState<K, V> implements ValueState<V> { +public class InternalValueState<K, N, V> extends InternalKeyedState<K, N, V> + implements ValueState<V> { public InternalValueState( StateRequestHandler stateRequestHandler, ValueStateDescriptor<V> valueStateDescriptor) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index e98a66b82a0..53337d71740 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler; import org.apache.flink.core.state.StateFutureUtils; @@ -45,6 +46,7 @@ import java.util.HashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.function.Supplier; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -232,6 +234,115 @@ class AsyncExecutionControllerTest { resourceRegistry.close(); } + @Test + void testNamespace() throws IOException { + final Consumer<String> userCode = + (r) -> { + valueState.setCurrentNamespace(r); + valueState + .asyncValue() + .thenCompose( + val -> { + int updated = (val == null ? 1 : (val + 1)); + return valueState + .asyncUpdate(updated) + .thenCompose( + o -> + StateFutureUtils.completedFuture( + updated)); + }) + .thenAccept(val -> output.set(val)); + }; + CloseableRegistry resourceRegistry = new CloseableRegistry(); + setup( + 100, + 10000L, + 1000, + new SyncMailboxExecutor(), + new TestAsyncFrameworkExceptionHandler(), + resourceRegistry); + // ============================ element1 ============================ + String record1 = "key1-r1"; + String key1 = "key1"; + // Simulate the wrapping in {@link RecordProcessorUtils#getRecordProcessor()}, wrapping the + // record and key with RecordContext. + RecordContext<String> recordContext1 = aec.buildContext(record1, key1); + aec.setCurrentContext(recordContext1); + // execute user code + userCode.accept(record1); + + // Single-step run. + // Firstly, the user code generates value get in active buffer. + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + assertThat(aec.inFlightRecordNum.get()).isEqualTo(1); + aec.triggerIfNeeded(true); + // After running, the value update is in active buffer. + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // Value update finishes. + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0); + assertThat(output.get()).isEqualTo(1); + assertThat(recordContext1.getReferenceCount()).isEqualTo(0); + assertThat(aec.inFlightRecordNum.get()).isEqualTo(0); + + // ============================ element 2 & 3(1) ============================ + String record2 = "key1-r2"; + String key2 = "key1"; + RecordContext<String> recordContext2 = aec.buildContext(record2, key2); + aec.setCurrentContext(recordContext2); + // execute user code + userCode.accept(record2); + + String record3 = "key1-r1"; + String key3 = "key1"; + RecordContext<String> recordContext3 = aec.buildContext(record3, key3); + aec.setCurrentContext(recordContext3); + // execute user code + userCode.accept(record3); + + // Single-step run. + // Firstly, the user code for record2 generates value get in active buffer, + // while user code for record3 generates value get in blocking buffer. + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1); + assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + assertThat(aec.inFlightRecordNum.get()).isEqualTo(2); + aec.triggerIfNeeded(true); + // After running, the value update for record2 is in active buffer. + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1); + assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + assertThat(aec.inFlightRecordNum.get()).isEqualTo(2); + aec.triggerIfNeeded(true); + // Value update for record2 finishes. The value get for record3 is migrated from blocking + // buffer to active buffer actively. + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + assertThat(aec.inFlightRecordNum.get()).isEqualTo(1); + assertThat(output.get()).isEqualTo(1); + assertThat(recordContext2.getReferenceCount()).isEqualTo(0); + assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0); + + // Let value get for record3 to run. + aec.triggerIfNeeded(true); + // After running, the value update for record3 is in active buffer. + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + assertThat(aec.inFlightRecordNum.get()).isEqualTo(1); + aec.triggerIfNeeded(true); + // Value update for record3 finishes. + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0); + assertThat(aec.inFlightRecordNum.get()).isEqualTo(0); + assertThat(output.get()).isEqualTo(2); + assertThat(recordContext3.getReferenceCount()).isEqualTo(0); + + resourceRegistry.close(); + } + @Test void testRecordsRunInOrder() throws IOException { CloseableRegistry resourceRegistry = new CloseableRegistry(); @@ -727,22 +838,22 @@ class AsyncExecutionControllerTest { /** Simulate the underlying state that is actually used to execute the request. */ static class TestUnderlyingState { - private final HashMap<String, Integer> hashMap; + private final HashMap<Tuple2<String, String>, Integer> hashMap; public TestUnderlyingState() { this.hashMap = new HashMap<>(); } - public Integer get(String key) { - return hashMap.get(key); + public Integer get(String key, String namespace) { + return hashMap.get(Tuple2.of(key, namespace)); } - public void update(String key, Integer val) { - hashMap.put(key, val); + public void update(String key, String namespace, Integer val) { + hashMap.put(Tuple2.of(key, namespace), val); } } - static class TestValueState extends InternalValueState<String, Integer> { + static class TestValueState extends InternalValueState<String, String, Integer> { private final TestUnderlyingState underlyingState; @@ -776,7 +887,9 @@ class AsyncExecutionControllerTest { Preconditions.checkState(request.getState() != null); TestValueState state = (TestValueState) request.getState(); Integer val = - state.underlyingState.get((String) request.getRecordContext().getKey()); + state.underlyingState.get( + (String) request.getRecordContext().getKey(), + (String) request.getRecordContext().getNamespace(state)); request.getFuture().complete(val); } else if (request.getRequestType() == StateRequestType.VALUE_UPDATE) { Preconditions.checkState(request.getState() != null); @@ -784,6 +897,7 @@ class AsyncExecutionControllerTest { state.underlyingState.update( (String) request.getRecordContext().getKey(), + (String) request.getRecordContext().getNamespace(state), (Integer) request.getPayload()); request.getFuture().complete(null); } else { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalAggregatingStateTest.java index 6e9724a6155..97410748189 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalAggregatingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalAggregatingStateTest.java @@ -55,7 +55,7 @@ class InternalAggregatingStateTest extends InternalKeyedStateTestBase { AggregatingStateDescriptor<Integer, Integer, Integer> descriptor = new AggregatingStateDescriptor<>( "testAggState", aggregator, BasicTypeInfo.INT_TYPE_INFO); - InternalAggregatingState<String, Integer, Integer, Integer> state = + InternalAggregatingState<String, Void, Integer, Integer, Integer> state = new InternalAggregatingState<>(aec, descriptor); aec.setCurrentContext(aec.buildContext("test", "test")); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalListStateTest.java index 941092fcdb3..ccb4c977233 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalListStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalListStateTest.java @@ -34,7 +34,8 @@ public class InternalListStateTest extends InternalKeyedStateTestBase { public void testEachOperation() { ListStateDescriptor<Integer> descriptor = new ListStateDescriptor<>("testState", BasicTypeInfo.INT_TYPE_INFO); - InternalListState<String, Integer> listState = new InternalListState<>(aec, descriptor); + InternalListState<String, Void, Integer> listState = + new InternalListState<>(aec, descriptor); aec.setCurrentContext(aec.buildContext("test", "test")); listState.asyncClear(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalMapStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalMapStateTest.java index 3eb7c652cce..3cfd214c0ae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalMapStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalMapStateTest.java @@ -36,7 +36,7 @@ public class InternalMapStateTest extends InternalKeyedStateTestBase { MapStateDescriptor<String, Integer> descriptor = new MapStateDescriptor<>( "testState", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO); - InternalMapState<String, String, Integer> mapState = + InternalMapState<String, Void, String, Integer> mapState = new InternalMapState<>(aec, descriptor); aec.setCurrentContext(aec.buildContext("test", "test")); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalReducingStateTest.java index 80ccc1ede9a..6340952aad3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalReducingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalReducingStateTest.java @@ -33,7 +33,7 @@ public class InternalReducingStateTest extends InternalKeyedStateTestBase { ReduceFunction<Integer> reducer = Integer::sum; ReducingStateDescriptor<Integer> descriptor = new ReducingStateDescriptor<>("testState", reducer, BasicTypeInfo.INT_TYPE_INFO); - InternalReducingState<String, Integer> reducingState = + InternalReducingState<String, Void, Integer> reducingState = new InternalReducingState<>(aec, descriptor); aec.setCurrentContext(aec.buildContext("test", "test")); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalValueStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalValueStateTest.java index 801f264573e..d4754ce7f55 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalValueStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalValueStateTest.java @@ -31,7 +31,8 @@ public class InternalValueStateTest extends InternalKeyedStateTestBase { public void testEachOperation() { ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("testState", BasicTypeInfo.INT_TYPE_INFO); - InternalValueState<String, Integer> valueState = new InternalValueState<>(aec, descriptor); + InternalValueState<String, Void, Integer> valueState = + new InternalValueState<>(aec, descriptor); aec.setCurrentContext(aec.buildContext("test", "test")); valueState.asyncClear(); diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java index f32b3b665af..42324eccb67 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java @@ -56,23 +56,23 @@ public class ForStStateRequestClassifier implements StateRequestContainer { switch (stateRequestType) { case VALUE_GET: { - ForStValueState<?, ?> forStValueState = - (ForStValueState<?, ?>) stateRequest.getState(); + ForStValueState<?, ?, ?> forStValueState = + (ForStValueState<?, ?, ?>) stateRequest.getState(); dbGetRequests.add(forStValueState.buildDBGetRequest(stateRequest)); return; } case VALUE_UPDATE: { - ForStValueState<?, ?> forStValueState = - (ForStValueState<?, ?>) stateRequest.getState(); + ForStValueState<?, ?, ?> forStValueState = + (ForStValueState<?, ?, ?>) stateRequest.getState(); dbPutRequests.add(forStValueState.buildDBPutRequest(stateRequest)); return; } case CLEAR: { if (stateRequest.getState() instanceof ForStValueState) { - ForStValueState<?, ?> forStValueState = - (ForStValueState<?, ?>) stateRequest.getState(); + ForStValueState<?, ?, ?> forStValueState = + (ForStValueState<?, ?, ?>) stateRequest.getState(); dbPutRequests.add(forStValueState.buildDBPutRequest(stateRequest)); return; } else { diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java index 52f2e6d848c..153cdbc0fa1 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java @@ -42,7 +42,7 @@ import java.util.function.Supplier; * @param <K> The type of the key. * @param <V> The type of the value. */ -public class ForStValueState<K, V> extends InternalValueState<K, V> +public class ForStValueState<K, N, V> extends InternalValueState<K, N, V> implements ValueState<V>, ForStInnerTable<ContextKey<K>, V> { /** The column family which this internal value state belongs to. */ diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java index 73b914ebe52..fa09bfc46d7 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; import org.apache.flink.runtime.asyncprocessing.StateRequestType; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; +import org.apache.flink.runtime.state.v2.InternalPartitionedState; import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.util.function.BiFunctionWithException; import org.apache.flink.util.function.FunctionWithException; @@ -45,6 +46,7 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.RocksDB; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.nio.file.Path; @@ -84,6 +86,12 @@ public class ForStDBOperationTestBase { @Nullable State state, StateRequestType type, @Nullable IN payload) { throw new UnsupportedOperationException(); } + + @Override + public <N> void setCurrentNamespaceForState( + @Nonnull InternalPartitionedState<N> state, N namespace) { + throw new UnsupportedOperationException(); + } }; } @@ -94,7 +102,7 @@ public class ForStDBOperationTestBase { return new ContextKey<>(recordContext); } - protected ForStValueState<Integer, String> buildForStValueState(String stateName) + protected ForStValueState<Integer, Void, String> buildForStValueState(String stateName) throws Exception { ColumnFamilyHandle cf = createColumnFamilyHandle(stateName); ValueStateDescriptor<String> valueStateDescriptor = diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java index 5b8263ec25e..2b24a65d650 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java @@ -34,15 +34,18 @@ public class ForStGeneralMultiGetOperationTest extends ForStDBOperationTestBase @Test public void testValueStateMultiGet() throws Exception { - ForStValueState<Integer, String> valueState1 = buildForStValueState("test-multiGet-1"); - ForStValueState<Integer, String> valueState2 = buildForStValueState("test-multiGet-2"); + ForStValueState<Integer, Void, String> valueState1 = + buildForStValueState("test-multiGet-1"); + ForStValueState<Integer, Void, String> valueState2 = + buildForStValueState("test-multiGet-2"); List<ForStDBGetRequest<?, ?>> batchGetRequest = new ArrayList<>(); List<Tuple2<String, TestStateFuture<String>>> resultCheckList = new ArrayList<>(); int keyNum = 1000; for (int i = 0; i < keyNum; i++) { TestStateFuture<String> future = new TestStateFuture<>(); - ForStValueState<Integer, String> table = ((i % 2 == 0) ? valueState1 : valueState2); + ForStValueState<Integer, Void, String> table = + ((i % 2 == 0) ? valueState1 : valueState2); ForStDBGetRequest<ContextKey<Integer>, String> request = ForStDBGetRequest.of(buildContextKey(i), table, future); batchGetRequest.add(request); diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java index e8717750329..95ea527fa2a 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java @@ -42,8 +42,8 @@ public class ForStStateExecutorTest extends ForStDBOperationTestBase { @SuppressWarnings("unchecked") public void testExecuteValueStateRequest() throws Exception { ForStStateExecutor forStStateExecutor = new ForStStateExecutor(4, db, new WriteOptions()); - ForStValueState<Integer, String> state1 = buildForStValueState("value-state-1"); - ForStValueState<Integer, String> state2 = buildForStValueState("value-state-2"); + ForStValueState<Integer, Void, String> state1 = buildForStValueState("value-state-1"); + ForStValueState<Integer, Void, String> state2 = buildForStValueState("value-state-2"); StateRequestContainer stateRequestContainer = forStStateExecutor.createStateRequestContainer(); @@ -52,7 +52,7 @@ public class ForStStateExecutorTest extends ForStDBOperationTestBase { // 1. Update value state: keyRange [0, keyNum) int keyNum = 1000; for (int i = 0; i < keyNum; i++) { - ForStValueState<Integer, String> state = (i % 2 == 0 ? state1 : state2); + ForStValueState<Integer, Void, String> state = (i % 2 == 0 ? state1 : state2); stateRequestContainer.offer( buildStateRequest(state, StateRequestType.VALUE_UPDATE, i, "test-" + i, i * 2)); } @@ -64,14 +64,14 @@ public class ForStStateExecutorTest extends ForStDBOperationTestBase { // 2. Get value state: keyRange [0, keyNum) // Update value state: keyRange [keyNum, keyNum + 100] for (int i = 0; i < keyNum; i++) { - ForStValueState<Integer, String> state = (i % 2 == 0 ? state1 : state2); + ForStValueState<Integer, Void, String> state = (i % 2 == 0 ? state1 : state2); StateRequest<?, ?, ?> getRequest = buildStateRequest(state, StateRequestType.VALUE_GET, i, null, i * 2); stateRequestContainer.offer(getRequest); checkList.add(getRequest); } for (int i = keyNum; i < keyNum + 100; i++) { - ForStValueState<Integer, String> state = (i % 2 == 0 ? state1 : state2); + ForStValueState<Integer, Void, String> state = (i % 2 == 0 ? state1 : state2); stateRequestContainer.offer( buildStateRequest(state, StateRequestType.VALUE_UPDATE, i, "test-" + i, i * 2)); } @@ -90,12 +90,12 @@ public class ForStStateExecutorTest extends ForStDBOperationTestBase { // Update state with null-value : keyRange [keyNum, keyNum + 100] stateRequestContainer = forStStateExecutor.createStateRequestContainer(); for (int i = keyNum - 100; i < keyNum; i++) { - ForStValueState<Integer, String> state = (i % 2 == 0 ? state1 : state2); + ForStValueState<Integer, Void, String> state = (i % 2 == 0 ? state1 : state2); stateRequestContainer.offer( buildStateRequest(state, StateRequestType.CLEAR, i, null, i * 2)); } for (int i = keyNum; i < keyNum + 100; i++) { - ForStValueState<Integer, String> state = (i % 2 == 0 ? state1 : state2); + ForStValueState<Integer, Void, String> state = (i % 2 == 0 ? state1 : state2); stateRequestContainer.offer( buildStateRequest(state, StateRequestType.VALUE_UPDATE, i, null, i * 2)); } @@ -105,7 +105,7 @@ public class ForStStateExecutorTest extends ForStDBOperationTestBase { stateRequestContainer = forStStateExecutor.createStateRequestContainer(); checkList.clear(); for (int i = keyNum - 100; i < keyNum + 100; i++) { - ForStValueState<Integer, String> state = (i % 2 == 0 ? state1 : state2); + ForStValueState<Integer, Void, String> state = (i % 2 == 0 ? state1 : state2); StateRequest<?, ?, ?> getRequest = buildStateRequest(state, StateRequestType.VALUE_GET, i, null, i * 2); stateRequestContainer.offer(getRequest); @@ -121,8 +121,8 @@ public class ForStStateExecutorTest extends ForStDBOperationTestBase { } @SuppressWarnings({"rawtypes", "unchecked"}) - private <K, V, R> StateRequest<?, ?, ?> buildStateRequest( - InternalKeyedState<K, V> innerTable, + private <K, N, V, R> StateRequest<?, ?, ?> buildStateRequest( + InternalKeyedState<K, N, V> innerTable, StateRequestType requestType, K key, V value, diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java index 19e11684fbf..dce11d12d77 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java @@ -34,8 +34,10 @@ public class ForStWriteBatchOperationTest extends ForStDBOperationTestBase { @Test public void testValueStateWriteBatch() throws Exception { - ForStValueState<Integer, String> valueState1 = buildForStValueState("test-write-batch-1"); - ForStValueState<Integer, String> valueState2 = buildForStValueState("test-write-batch-2"); + ForStValueState<Integer, Void, String> valueState1 = + buildForStValueState("test-write-batch-1"); + ForStValueState<Integer, Void, String> valueState2 = + buildForStValueState("test-write-batch-2"); List<ForStDBPutRequest<?, ?>> batchPutRequest = new ArrayList<>(); int keyNum = 100; for (int i = 0; i < keyNum; i++) { @@ -62,7 +64,8 @@ public class ForStWriteBatchOperationTest extends ForStDBOperationTestBase { @Test public void testWriteBatchWithNullValue() throws Exception { - ForStValueState<Integer, String> valueState = buildForStValueState("test-write-batch"); + ForStValueState<Integer, Void, String> valueState = + buildForStValueState("test-write-batch"); List<ForStDBPutRequest<?, ?>> batchPutRequest = new ArrayList<>(); // 1. write some data without null value int keyNum = 100;