This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3e6842261b05b934544cb8d2e61c756cf2562ba2 Author: Hangxiang Yu <master...@gmail.com> AuthorDate: Mon Apr 29 15:54:44 2024 +0800 [FLINK-35262][state] Bridge between AsyncKeyedStateBackend and AsyncExecutionController --- .../asyncprocessing/AsyncExecutionController.java | 4 +- .../runtime/state/AsyncKeyedStateBackend.java | 31 ++++- .../runtime/state/v2/DefaultKeyedStateStoreV2.java | 45 ++++++++ .../flink/runtime/state/v2/KeyedStateStoreV2.java | 46 ++++++++ .../AsyncExecutionControllerTest.java | 84 +++++--------- .../asyncprocessing/MockStateExecutor.java | 6 +- .../flink/runtime/state/StateBackendTestUtils.java | 108 ++++++++++++++++++ .../api/operators/AbstractStreamOperator.java | 4 +- .../api/operators/AbstractStreamOperatorV2.java | 2 +- .../api/operators/StreamOperatorStateContext.java | 7 ++ .../api/operators/StreamOperatorStateHandler.java | 28 +++++ .../operators/StreamTaskStateInitializerImpl.java | 127 ++++++++++++++------- .../api/operators/StreamingRuntimeContext.java | 25 ++++ .../AbstractAsyncStateStreamOperator.java | 63 +++++++--- .../AbstractAsyncStateStreamOperatorV2.java | 26 +++-- .../InternalTimerServiceAsyncImplTest.java | 2 +- .../api/operators/StreamingRuntimeContextTest.java | 55 +++++++-- .../AbstractAsyncStateStreamOperatorTest.java | 28 +++-- .../AbstractAsyncStateStreamOperatorV2Test.java | 26 +++-- .../streaming/runtime/tasks/StreamTaskTest.java | 6 + .../util/AbstractStreamOperatorTestHarness.java | 11 ++ 21 files changed, 579 insertions(+), 155 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index 83c18a70be0..fefc310d1f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -320,8 +320,8 @@ public class AsyncExecutionController<K> implements StateRequestHandler { } @VisibleForTesting - public void setStateExecutor(StateExecutor stateExecutor) { - this.stateExecutor = stateExecutor; + public StateExecutor getStateExecutor() { + return stateExecutor; } @VisibleForTesting diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java index 99c48f4abad..70cdfbef767 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java @@ -19,15 +19,40 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.State; import org.apache.flink.runtime.asyncprocessing.StateExecutor; +import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; +import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.util.Disposable; +import javax.annotation.Nonnull; + +import java.io.Closeable; + /** * An async keyed state backend provides methods supporting to access keyed state asynchronously and * in batch. */ @Internal -public interface AsyncKeyedStateBackend extends Disposable { +public interface AsyncKeyedStateBackend extends Disposable, Closeable { + + /** + * Initializes with some contexts. + * + * @param stateRequestHandler which handles state request. + */ + void setup(@Nonnull StateRequestHandler stateRequestHandler); + + /** + * Creates and returns a new state. + * + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * @param <SV> The type of the stored state value. + * @param <S> The type of the public API state. + * @throws Exception Exceptions may occur during initialization of the state. + */ + @Nonnull + <SV, S extends State> S createState(@Nonnull StateDescriptor<SV> stateDesc) throws Exception; /** * Creates a {@code StateExecutor} which supports to execute a batch of state requests @@ -36,5 +61,9 @@ public interface AsyncKeyedStateBackend extends Disposable { * @return a {@code StateExecutor} which supports to execute a batch of state requests * asynchronously. */ + @Nonnull StateExecutor createStateExecutor(); + + @Override + void dispose(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStoreV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStoreV2.java new file mode 100644 index 00000000000..7bc2b58f2f1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStoreV2.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2; + +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +/** Default implementation of KeyedStateStoreV2. */ +public class DefaultKeyedStateStoreV2 implements KeyedStateStoreV2 { + + private final AsyncKeyedStateBackend asyncKeyedStateBackend; + + public DefaultKeyedStateStoreV2(@Nonnull AsyncKeyedStateBackend asyncKeyedStateBackend) { + this.asyncKeyedStateBackend = Preconditions.checkNotNull(asyncKeyedStateBackend); + } + + @Override + public <T> ValueState<T> getValueState(@Nonnull ValueStateDescriptor<T> stateProperties) { + Preconditions.checkNotNull(stateProperties, "The state properties must not be null"); + try { + return asyncKeyedStateBackend.createState(stateProperties); + } catch (Exception e) { + throw new RuntimeException("Error while getting state", e); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStoreV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStoreV2.java new file mode 100644 index 00000000000..0c7c27b429f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStoreV2.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.ValueState; + +import javax.annotation.Nonnull; + +/** This interface contains methods for registering {@link State}. */ +@Internal +public interface KeyedStateStoreV2 { + + /** + * Gets a handle to the system's {@link ValueState}. The key/value state is only accessible if + * the function is executed on a KeyedStream. On each access, the state exposes the value for + * the key of the element currently processed by the function. Each function may have multiple + * partitioned states, addressed with different names. + * + * <p>Because the scope of each value is the key of the currently processed element, and the + * elements are distributed by the Flink runtime, the system can transparently scale out and + * redistribute the state and KeyedStream. + * + * @param stateProperties The descriptor defining the properties of the state. + * @param <T> The type of value stored in the state. + * @return The partitioned state object. + */ + <T> ValueState<T> getValueState(@Nonnull ValueStateDescriptor<T> stateProperties); +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index ee83d5f7c79..09fcf2636e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -18,15 +18,15 @@ package org.apache.flink.runtime.asyncprocessing; +import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.core.state.StateFutureUtils; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; -import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; -import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateBackendTestUtils; import org.apache.flink.runtime.state.v2.InternalValueState; import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.util.Preconditions; @@ -37,13 +37,13 @@ import java.util.HashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; /** Test for {@link AsyncExecutionController}. */ class AsyncExecutionControllerTest { AsyncExecutionController aec; - TestUnderlyingState underlyingState; AtomicInteger output; TestValueState valueState; @@ -63,16 +63,36 @@ class AsyncExecutionControllerTest { }; void setup(int batchSize, long timeout, int maxInFlight) { + StateExecutor stateExecutor = new TestStateExecutor(); + ValueStateDescriptor<Integer> stateDescriptor = + new ValueStateDescriptor<>("test-value-state", BasicTypeInfo.INT_TYPE_INFO); + Supplier<State> stateSupplier = + () -> new TestValueState(aec, new TestUnderlyingState(), stateDescriptor); + StateBackend testAsyncStateBackend = + StateBackendTestUtils.buildAsyncStateBackend(stateSupplier, stateExecutor); + assertThat(testAsyncStateBackend.supportsAsyncKeyedStateBackend()).isTrue(); + AsyncKeyedStateBackend asyncKeyedStateBackend; + try { + asyncKeyedStateBackend = testAsyncStateBackend.createAsyncKeyedStateBackend(null); + } catch (Exception e) { + throw new RuntimeException(e); + } aec = new AsyncExecutionController<>( new SyncMailboxExecutor(), - createStateExecutor(), + stateExecutor, 128, batchSize, timeout, maxInFlight); - underlyingState = new TestUnderlyingState(); - valueState = new TestValueState(aec, underlyingState); + asyncKeyedStateBackend.setup(aec); + + try { + valueState = asyncKeyedStateBackend.createState(stateDescriptor); + } catch (Exception e) { + throw new RuntimeException(e); + } + output = new AtomicInteger(); } @@ -466,12 +486,6 @@ class AsyncExecutionControllerTest { assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isTrue(); } - private StateExecutor createStateExecutor() { - TestAsyncStateBackend testAsyncStateBackend = new TestAsyncStateBackend(); - assertThat(testAsyncStateBackend.supportsAsyncKeyedStateBackend()).isTrue(); - return testAsyncStateBackend.createAsyncKeyedStateBackend(null).createStateExecutor(); - } - /** Simulate the underlying state that is actually used to execute the request. */ static class TestUnderlyingState { @@ -495,53 +509,15 @@ class AsyncExecutionControllerTest { private final TestUnderlyingState underlyingState; public TestValueState( - AsyncExecutionController<String> aec, TestUnderlyingState underlyingState) { - super(aec, new ValueStateDescriptor<>("test-value-state", BasicTypeInfo.INT_TYPE_INFO)); + StateRequestHandler stateRequestHandler, + TestUnderlyingState underlyingState, + ValueStateDescriptor<Integer> stateDescriptor) { + super(stateRequestHandler, stateDescriptor); this.underlyingState = underlyingState; assertThat(this.getValueSerializer()).isEqualTo(IntSerializer.INSTANCE); } } - /** - * A brief implementation of {@link StateBackend} which illustrates the interaction between AEC - * and StateBackend. - */ - static class TestAsyncStateBackend implements StateBackend { - - @Override - public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend( - KeyedStateBackendParameters<K> parameters) throws Exception { - throw new UnsupportedOperationException("Don't support createKeyedStateBackend yet"); - } - - @Override - public OperatorStateBackend createOperatorStateBackend( - OperatorStateBackendParameters parameters) throws Exception { - throw new UnsupportedOperationException("Don't support createOperatorStateBackend yet"); - } - - @Override - public boolean supportsAsyncKeyedStateBackend() { - return true; - } - - @Override - public <K> AsyncKeyedStateBackend createAsyncKeyedStateBackend( - KeyedStateBackendParameters<K> parameters) { - return new AsyncKeyedStateBackend() { - @Override - public StateExecutor createStateExecutor() { - return new TestStateExecutor(); - } - - @Override - public void dispose() { - // do nothing - } - }; - } - } - /** * A brief implementation of {@link StateExecutor}, to illustrate the interaction between AEC * and StateExecutor. diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/MockStateExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java similarity index 83% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/MockStateExecutor.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java index b052717ca2f..b36c1da8426 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/MockStateExecutor.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java @@ -16,12 +16,8 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.asyncprocessing; +package org.apache.flink.runtime.asyncprocessing; -import org.apache.flink.runtime.asyncprocessing.MockStateRequestContainer; -import org.apache.flink.runtime.asyncprocessing.StateExecutor; -import org.apache.flink.runtime.asyncprocessing.StateRequest; -import org.apache.flink.runtime.asyncprocessing.StateRequestContainer; import org.apache.flink.util.Preconditions; import java.util.concurrent.CompletableFuture; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java index 836b4ab03c0..e1e0e5cb49c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java @@ -22,7 +22,11 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.asyncprocessing.MockStateExecutor; +import org.apache.flink.runtime.asyncprocessing.StateExecutor; +import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; import org.apache.flink.util.function.FunctionWithException; @@ -31,6 +35,7 @@ import javax.annotation.Nonnull; import java.io.IOException; import java.io.Serializable; import java.util.concurrent.RunnableFuture; +import java.util.function.Supplier; import java.util.stream.Stream; /** This class contains test utils of {@link StateBackend} */ @@ -44,6 +49,109 @@ public class StateBackendTestUtils { return new ApplyingSnapshotStateBackend(delegatedStataBackend, snapshotResultFunction); } + public static StateBackend buildAsyncStateBackend(StateBackend delegatedSyncStateBackend) { + return new TestAsyncStateBackend(delegatedSyncStateBackend) + .setInnerState(null) + .setStateExecutor(new MockStateExecutor()); + } + + public static StateBackend buildAsyncStateBackend( + Supplier<org.apache.flink.api.common.state.v2.State> innerStateSupplier, + StateExecutor stateExecutor) { + return new TestAsyncStateBackend(new HashMapStateBackend()) + .setInnerState(innerStateSupplier) + .setStateExecutor(stateExecutor); + } + + private static class TestAsyncStateBackend implements StateBackend { + + private final StateBackend delegatedStateBackend; + private Supplier<org.apache.flink.api.common.state.v2.State> innerStateSupplier; + private StateExecutor stateExecutor; + + public TestAsyncStateBackend(StateBackend delegatedStateBackend) { + this.delegatedStateBackend = delegatedStateBackend; + } + + public TestAsyncStateBackend setInnerState( + Supplier<org.apache.flink.api.common.state.v2.State> innerStateSupplier) { + this.innerStateSupplier = innerStateSupplier; + return this; + } + + public TestAsyncStateBackend setStateExecutor(StateExecutor stateExecutor) { + this.stateExecutor = stateExecutor; + return this; + } + + @Override + public boolean supportsAsyncKeyedStateBackend() { + return true; + } + + @Override + public <K> AsyncKeyedStateBackend createAsyncKeyedStateBackend( + KeyedStateBackendParameters<K> parameters) throws Exception { + return delegatedStateBackend.supportsAsyncKeyedStateBackend() + ? delegatedStateBackend.createAsyncKeyedStateBackend(parameters) + : new TestAsyncKeyedStateBackend(innerStateSupplier, stateExecutor); + } + + @Override + public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend( + KeyedStateBackendParameters<K> parameters) throws Exception { + return delegatedStateBackend.createKeyedStateBackend(parameters); + } + + @Override + public OperatorStateBackend createOperatorStateBackend( + OperatorStateBackendParameters parameters) throws Exception { + return delegatedStateBackend.createOperatorStateBackend(parameters); + } + } + + private static class TestAsyncKeyedStateBackend implements AsyncKeyedStateBackend { + + private final Supplier<org.apache.flink.api.common.state.v2.State> innerStateSupplier; + private final StateExecutor stateExecutor; + + public TestAsyncKeyedStateBackend( + Supplier<org.apache.flink.api.common.state.v2.State> innerStateSupplier, + StateExecutor stateExecutor) { + this.innerStateSupplier = innerStateSupplier; + this.stateExecutor = stateExecutor; + } + + @Override + public void setup(@Nonnull StateRequestHandler stateRequestHandler) { + // do nothing + } + + @Nonnull + @Override + @SuppressWarnings("unchecked") + public <SV, S extends org.apache.flink.api.common.state.v2.State> S createState( + @Nonnull org.apache.flink.runtime.state.v2.StateDescriptor<SV> stateDesc) { + return (S) innerStateSupplier.get(); + } + + @Nonnull + @Override + public StateExecutor createStateExecutor() { + return stateExecutor; + } + + @Override + public void dispose() { + // do nothing + } + + @Override + public void close() { + // do nothing + } + } + /** Wrapper of state backend which supports apply the snapshot result. */ private static class ApplyingSnapshotStateBackend extends AbstractStateBackend { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 6b97f02700a..eda7c254fd8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -138,7 +138,7 @@ public abstract class AbstractStreamOperator<OUT> */ protected transient KeySelector<?, ?> stateKeySelector2; - private transient StreamOperatorStateHandler stateHandler; + protected transient StreamOperatorStateHandler stateHandler; protected transient InternalTimeServiceManager<?> timeServiceManager; @@ -253,7 +253,7 @@ public abstract class AbstractStreamOperator<OUT> } @Override - public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) + public void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { final TypeSerializer<?> keySerializer = diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index 25ca861fd94..20441bcffa5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -102,7 +102,7 @@ public abstract class AbstractStreamOperatorV2<OUT> protected final ProcessingTimeService processingTimeService; protected final RecordAttributes[] lastRecordAttributes; - private StreamOperatorStateHandler stateHandler; + protected StreamOperatorStateHandler stateHandler; protected InternalTimeServiceManager<?> timeServiceManager; public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int numberOfInputs) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java index 695129ca852..485f431f3d5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.OperatorStateBackend; @@ -54,6 +55,12 @@ public interface StreamOperatorStateContext { */ CheckpointableKeyedStateBackend<?> keyedStateBackend(); + /** + * Returns the async keyed state backend for the stream operator. This method returns null for + * operators which don't support async keyed state backend. + */ + AsyncKeyedStateBackend asyncKeyedStateBackend(); + /** * Returns the internal timer service manager for the stream operator. This method returns null * for non-keyed operators. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java index eb8a01b91eb..5068170328d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.SavepointType; import org.apache.flink.runtime.checkpoint.SnapshotType; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.DefaultKeyedStateStore; @@ -53,6 +54,8 @@ import org.apache.flink.runtime.state.StateInitializationContextImpl; import org.apache.flink.runtime.state.StatePartitionStreamProvider; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.v2.DefaultKeyedStateStoreV2; +import org.apache.flink.runtime.state.v2.KeyedStateStoreV2; import org.apache.flink.util.CloseableIterable; import org.apache.flink.util.IOUtils; @@ -80,6 +83,10 @@ public class StreamOperatorStateHandler { protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + @Nullable private final AsyncKeyedStateBackend asyncKeyedStateBackend; + + @Nullable private final KeyedStateStoreV2 keyedStateStoreV2; + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ @Nullable private final CheckpointableKeyedStateBackend<?> keyedStateBackend; @@ -112,6 +119,12 @@ public class StreamOperatorStateHandler { } else { keyedStateStore = null; } + + this.asyncKeyedStateBackend = context.asyncKeyedStateBackend(); + this.keyedStateStoreV2 = + asyncKeyedStateBackend != null + ? new DefaultKeyedStateStoreV2(asyncKeyedStateBackend) + : null; } public void initializeOperatorState(CheckpointedStreamOperator streamOperator) @@ -152,12 +165,18 @@ public class StreamOperatorStateHandler { if (closeableRegistry.unregisterCloseable(keyedStateBackend)) { closer.register(keyedStateBackend); } + if (closeableRegistry.unregisterCloseable(asyncKeyedStateBackend)) { + closer.register(asyncKeyedStateBackend); + } if (operatorStateBackend != null) { closer.register(operatorStateBackend::dispose); } if (keyedStateBackend != null) { closer.register(keyedStateBackend::dispose); } + if (asyncKeyedStateBackend != null) { + closer.register(asyncKeyedStateBackend::dispose); + } } } @@ -325,6 +344,11 @@ public class StreamOperatorStateHandler { return (KeyedStateBackend<K>) keyedStateBackend; } + @Nullable + public AsyncKeyedStateBackend getAsyncKeyedStateBackend() { + return asyncKeyedStateBackend; + } + public OperatorStateBackend getOperatorStateBackend() { return operatorStateBackend; } @@ -400,6 +424,10 @@ public class StreamOperatorStateHandler { return Optional.ofNullable(keyedStateStore); } + public Optional<KeyedStateStoreV2> getKeyedStateStoreV2() { + return Optional.ofNullable(keyedStateStoreV2); + } + /** Custom state handling hooks to be invoked by {@link StreamOperatorStateHandler}. */ public interface CheckpointedStreamOperator { void initializeState(StateInitializationContext context) throws Exception; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 3e3a745c12a..65c04aef826 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManager import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; @@ -56,6 +57,7 @@ import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext; import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.Disposable; import org.apache.flink.util.Preconditions; import org.apache.flink.util.clock.SystemClock; @@ -64,6 +66,7 @@ import org.apache.commons.io.IOUtils; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -171,6 +174,7 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize taskStateManager.prioritizedOperatorState(operatorID); CheckpointableKeyedStateBackend<?> keyedStatedBackend = null; + AsyncKeyedStateBackend asyncKeyedStateBackend = null; OperatorStateBackend operatorStateBackend = null; CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null; CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null; @@ -182,15 +186,30 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize try { // -------------- Keyed State Backend -------------- - keyedStatedBackend = - keyedStatedBackend( - keySerializer, - operatorIdentifierText, - prioritizedOperatorSubtaskStates, - streamTaskCloseableRegistry, - metricGroup, - managedMemoryFraction, - statsCollector); + // TODO: Support KeyedStateBackend for AsyncKeyedStateBackend to unify the logic + if (stateBackend.supportsAsyncKeyedStateBackend()) { + asyncKeyedStateBackend = + keyedStatedBackend( + keySerializer, + operatorIdentifierText, + prioritizedOperatorSubtaskStates, + streamTaskCloseableRegistry, + metricGroup, + managedMemoryFraction, + statsCollector, + StateBackend::createAsyncKeyedStateBackend); + } else { + keyedStatedBackend = + keyedStatedBackend( + keySerializer, + operatorIdentifierText, + prioritizedOperatorSubtaskStates, + streamTaskCloseableRegistry, + metricGroup, + managedMemoryFraction, + statsCollector, + StateBackend::createKeyedStateBackend); + } // -------------- Operator State Backend -------------- operatorStateBackend = @@ -244,6 +263,7 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize } else { timeServiceManager = null; } + // TODO: Support Timer for AsyncKeyedStateBackend // Add stats for input channel and result partition state Stream.concat( @@ -269,6 +289,7 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize prioritizedOperatorSubtaskStates.getRestoredCheckpointId(), operatorStateBackend, keyedStatedBackend, + asyncKeyedStateBackend, timeServiceManager, rawOperatorStateInputs, rawKeyedStateInputs); @@ -283,6 +304,14 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize keyedStatedBackend.dispose(); } + if (asyncKeyedStateBackend != null) { + if (streamTaskCloseableRegistry.unregisterCloseable(asyncKeyedStateBackend)) { + IOUtils.closeQuietly(asyncKeyedStateBackend); + } + // release resource (e.g native resource) + asyncKeyedStateBackend.dispose(); + } + if (operatorStateBackend != null) { if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) { IOUtils.closeQuietly(operatorStateBackend); @@ -364,14 +393,15 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize } } - protected <K> CheckpointableKeyedStateBackend<K> keyedStatedBackend( + protected <K, R extends Disposable & Closeable> R keyedStatedBackend( TypeSerializer<K> keySerializer, String operatorIdentifierText, PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates, CloseableRegistry backendCloseableRegistry, MetricGroup metricGroup, double managedMemoryFraction, - StateObject.StateObjectSizeStatsCollector statsCollector) + StateObject.StateObjectSizeStatsCollector statsCollector, + KeyedStateBackendCreator<K, R> keyedStateBackendCreator) throws Exception { if (keySerializer == null) { @@ -395,35 +425,33 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize // input stream opened for serDe during restore. CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry(); backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore); - BackendRestorerProcedure<CheckpointableKeyedStateBackend<K>, KeyedStateHandle> - backendRestorer = - new BackendRestorerProcedure<>( - (stateHandles) -> { - KeyedStateBackendParametersImpl<K> parameters = - new KeyedStateBackendParametersImpl<>( - environment, - environment.getJobID(), - operatorIdentifierText, - keySerializer, - taskInfo.getMaxNumberOfParallelSubtasks(), - keyGroupRange, - environment.getTaskKvStateRegistry(), - ttlTimeProvider, - metricGroup, - initializationMetrics::addDurationMetric, - stateHandles, - cancelStreamRegistryForRestore, - managedMemoryFraction); - return loadStateBackendFromKeyedStateHandles( - stateBackend, - environment - .getUserCodeClassLoader() - .asClassLoader(), - stateHandles) - .createKeyedStateBackend(parameters); - }, - backendCloseableRegistry, - logDescription); + BackendRestorerProcedure<R, KeyedStateHandle> backendRestorer = + new BackendRestorerProcedure<>( + (stateHandles) -> { + KeyedStateBackendParametersImpl<K> parameters = + new KeyedStateBackendParametersImpl<>( + environment, + environment.getJobID(), + operatorIdentifierText, + keySerializer, + taskInfo.getMaxNumberOfParallelSubtasks(), + keyGroupRange, + environment.getTaskKvStateRegistry(), + ttlTimeProvider, + metricGroup, + initializationMetrics::addDurationMetric, + stateHandles, + cancelStreamRegistryForRestore, + managedMemoryFraction); + return keyedStateBackendCreator.create( + loadStateBackendFromKeyedStateHandles( + stateBackend, + environment.getUserCodeClassLoader().asClassLoader(), + stateHandles), + parameters); + }, + backendCloseableRegistry, + logDescription); try { return backendRestorer.createAndRestore( @@ -436,6 +464,17 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize } } + /** Functional interface to create the keyed state backend. */ + @FunctionalInterface + protected interface KeyedStateBackendCreator<K, R extends Disposable & Closeable> { + + /** Create the keyed state backend. */ + R create( + StateBackend stateBackend, + StateBackend.KeyedStateBackendParameters<K> keyedStateBackendParameters) + throws Exception; + } + protected CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs( @Nonnull Iterator<StateObjectCollection<OperatorStateHandle>> restoreStateAlternatives, @Nonnull StateObject.StateObjectSizeStatsCollector statsCollector) { @@ -733,6 +772,7 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize private final OperatorStateBackend operatorStateBackend; private final CheckpointableKeyedStateBackend<?> keyedStateBackend; + private final AsyncKeyedStateBackend asyncKeyedStateBackend; private final InternalTimeServiceManager<?> internalTimeServiceManager; private final CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs; @@ -742,6 +782,7 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize @Nullable Long restoredCheckpointId, OperatorStateBackend operatorStateBackend, CheckpointableKeyedStateBackend<?> keyedStateBackend, + AsyncKeyedStateBackend asyncKeyedStateBackend, InternalTimeServiceManager<?> internalTimeServiceManager, CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs, CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs) { @@ -749,6 +790,7 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize this.restoredCheckpointId = restoredCheckpointId; this.operatorStateBackend = operatorStateBackend; this.keyedStateBackend = keyedStateBackend; + this.asyncKeyedStateBackend = asyncKeyedStateBackend; this.internalTimeServiceManager = internalTimeServiceManager; this.rawOperatorStateInputs = rawOperatorStateInputs; this.rawKeyedStateInputs = rawKeyedStateInputs; @@ -766,6 +808,11 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize return keyedStateBackend; } + @Override + public AsyncKeyedStateBackend asyncKeyedStateBackend() { + return asyncKeyedStateBackend; + } + @Override public OperatorStateBackend operatorStateBackend() { return operatorStateBackend; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index 75bed241a4c..2e82d1c2b08 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.state.v2.KeyedStateStoreV2; import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -69,6 +70,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { private final String operatorUniqueID; private final ProcessingTimeService processingTimeService; private @Nullable KeyedStateStore keyedStateStore; + private @Nullable KeyedStateStoreV2 keyedStateStoreV2; private final ExternalResourceInfoProvider externalResourceInfoProvider; @VisibleForTesting @@ -114,6 +116,10 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { this.keyedStateStore = keyedStateStore; } + public void setKeyedStateStoreV2(@Nullable KeyedStateStoreV2 keyedStateStoreV2) { + this.keyedStateStoreV2 = keyedStateStoreV2; + } + // ------------------------------------------------------------------------ /** @@ -242,6 +248,25 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { return keyedStateStore; } + // TODO: Reconstruct this after StateManager is ready in FLIP-410. + public <T> org.apache.flink.api.common.state.v2.ValueState<T> getValueState( + org.apache.flink.runtime.state.v2.ValueStateDescriptor<T> stateProperties) { + KeyedStateStoreV2 keyedStateStoreV2 = + checkPreconditionsAndGetKeyedStateStoreV2(stateProperties); + return keyedStateStoreV2.getValueState(stateProperties); + } + + private KeyedStateStoreV2 checkPreconditionsAndGetKeyedStateStoreV2( + org.apache.flink.runtime.state.v2.StateDescriptor<?> stateDescriptor) { + checkNotNull(stateDescriptor, "The state properties must not be null"); + checkNotNull( + keyedStateStoreV2, + String.format( + "Keyed state '%s' with type %s can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.", + stateDescriptor.getStateId(), stateDescriptor.getType())); + return keyedStateStoreV2; + } + // ------------------ expose (read only) relevant information from the stream config -------- // /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java index b23aa74c7f8..99439de600b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java @@ -27,15 +27,15 @@ import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.RecordContext; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyedStateBackend; -import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; -import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -43,6 +43,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.util.function.ThrowingConsumer; import org.apache.flink.util.function.ThrowingRunnable; +import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; /** @@ -61,11 +62,11 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre /** Initialize necessary state components for {@link AbstractStreamOperator}. */ @Override - public void setup( - StreamTask<?, ?> containingTask, - StreamConfig config, - Output<StreamRecord<OUT>> output) { - super.setup(containingTask, config, output); + public void initializeState(StreamTaskStateInitializer streamTaskStateManager) + throws Exception { + super.initializeState(streamTaskStateManager); + getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null)); + final StreamTask<?, ?> containingTask = checkNotNull(getContainingTask()); final Environment environment = containingTask.getEnvironment(); final MailboxExecutor mailboxExecutor = environment.getMainMailboxExecutor(); final int maxParallelism = environment.getTaskInfo().getMaxNumberOfParallelSubtasks(); @@ -74,15 +75,22 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre final int asyncBufferSize = environment.getExecutionConfig().getAsyncStateBufferSize(); final long asyncBufferTimeout = environment.getExecutionConfig().getAsyncStateBufferTimeout(); - // TODO: initial state executor and set state executor for aec - this.asyncExecutionController = - new AsyncExecutionController( - mailboxExecutor, - null, - maxParallelism, - asyncBufferSize, - asyncBufferTimeout, - inFlightRecordsLimit); + + AsyncKeyedStateBackend asyncKeyedStateBackend = stateHandler.getAsyncKeyedStateBackend(); + if (asyncKeyedStateBackend != null) { + this.asyncExecutionController = + new AsyncExecutionController( + mailboxExecutor, + asyncKeyedStateBackend.createStateExecutor(), + maxParallelism, + asyncBufferSize, + asyncBufferTimeout, + inFlightRecordsLimit); + asyncKeyedStateBackend.setup(asyncExecutionController); + } else if (stateHandler.getKeyedStateBackend() != null) { + throw new UnsupportedOperationException( + "Current State Backend doesn't support async access, AsyncExecutionController could not work"); + } } @Override @@ -220,6 +228,29 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre (AsyncExecutionController<K>) asyncExecutionController); } + @Override + @SuppressWarnings("unchecked") + public void setKeyContextElement1(StreamRecord record) throws Exception { + super.setKeyContextElement1(record); + if (stateKeySelector1 != null) { + setAsyncKeyedContextElement(record, stateKeySelector1); + } + } + + @Override + @SuppressWarnings("unchecked") + public void setKeyContextElement2(StreamRecord record) throws Exception { + super.setKeyContextElement2(record); + if (stateKeySelector2 != null) { + setAsyncKeyedContextElement(record, stateKeySelector2); + } + } + + @Override + public Object getCurrentKey() { + return currentProcessingContext.getKey(); + } + @VisibleForTesting AsyncExecutionController<?> getAsyncExecutionController() { return asyncExecutionController; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java index 2c769af2918..98b542a2964 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.RecordContext; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; @@ -70,19 +71,28 @@ public abstract class AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { super.initializeState(streamTaskStateManager); + getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null)); final int inFlightRecordsLimit = getExecutionConfig().getAsyncInflightRecordsLimit(); final int asyncBufferSize = getExecutionConfig().getAsyncStateBufferSize(); final long asyncBufferTimeout = getExecutionConfig().getAsyncStateBufferTimeout(); int maxParallelism = getExecutionConfig().getMaxParallelism(); - this.asyncExecutionController = - new AsyncExecutionController( - mailboxExecutor, - null, - maxParallelism, - asyncBufferSize, - asyncBufferTimeout, - inFlightRecordsLimit); + + AsyncKeyedStateBackend asyncKeyedStateBackend = stateHandler.getAsyncKeyedStateBackend(); + if (asyncKeyedStateBackend != null) { + this.asyncExecutionController = + new AsyncExecutionController( + mailboxExecutor, + asyncKeyedStateBackend.createStateExecutor(), + maxParallelism, + asyncBufferSize, + asyncBufferTimeout, + inFlightRecordsLimit); + asyncKeyedStateBackend.setup(asyncExecutionController); + } else if (stateHandler.getKeyedStateBackend() != null) { + throw new UnsupportedOperationException( + "Current State Backend doesn't support async access, AsyncExecutionController could not work"); + } } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java index 26e23fb5db3..985b4a2b9fd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.MockStateExecutor; import org.apache.flink.runtime.asyncprocessing.RecordContext; import org.apache.flink.runtime.asyncprocessing.StateRequestType; import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; @@ -30,7 +31,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.PriorityQueueSetFactory; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; -import org.apache.flink.streaming.runtime.operators.asyncprocessing.MockStateExecutor; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index 24e4bf5d209..58f68674390 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.SerializerFactory; +import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -51,6 +52,7 @@ import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.DefaultKeyedStateStore; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateBackend; @@ -59,6 +61,7 @@ import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.state.v2.DefaultKeyedStateStoreV2; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -245,6 +248,31 @@ class StreamingRuntimeContextTest { assertThat(value.iterator()).isExhausted(); } + @Test + void testAsyncValueStateInstantiation() throws Exception { + + final ExecutionConfig config = new ExecutionConfig(); + SerializerConfig serializerConfig = config.getSerializerConfig(); + serializerConfig.registerKryoType(Path.class); + + final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); + + StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config); + org.apache.flink.runtime.state.v2.ValueStateDescriptor<TaskInfo> descr = + new org.apache.flink.runtime.state.v2.ValueStateDescriptor<>( + "name", TypeInformation.of(TaskInfo.class), serializerConfig); + context.getValueState(descr); + + org.apache.flink.runtime.state.v2.ValueStateDescriptor<?> descrIntercepted = + (org.apache.flink.runtime.state.v2.ValueStateDescriptor<?>) descriptorCapture.get(); + TypeSerializer<?> serializer = descrIntercepted.getSerializer(); + + // check that the Path class is really registered, i.e., the execution config was applied + assertThat(serializer).isInstanceOf(KryoSerializer.class); + assertThat(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId()) + .isPositive(); + } + // ------------------------------------------------------------------------ // // ------------------------------------------------------------------------ @@ -308,6 +336,8 @@ class StreamingRuntimeContextTest { KeyedStateBackend keyedStateBackend = mock(KeyedStateBackend.class); + AsyncKeyedStateBackend asyncKeyedStateBackend = mock(AsyncKeyedStateBackend.class); + DefaultKeyedStateStore keyedStateStore = new DefaultKeyedStateStore( keyedStateBackend, @@ -321,21 +351,28 @@ class StreamingRuntimeContextTest { }); doAnswer( - new Answer<Object>() { - - @Override - public Object answer(InvocationOnMock invocationOnMock) - throws Throwable { - ref.set(invocationOnMock.getArguments()[2]); - return null; - } - }) + (Answer<Object>) + invocationOnMock -> { + ref.set(invocationOnMock.getArguments()[2]); + return null; + }) .when(keyedStateBackend) .getPartitionedState( Matchers.any(), any(TypeSerializer.class), any(StateDescriptor.class)); + doAnswer( + (Answer<Object>) + invocationOnMock -> { + ref.set(invocationOnMock.getArguments()[0]); + return null; + }) + .when(asyncKeyedStateBackend) + .createState(any(org.apache.flink.runtime.state.v2.StateDescriptor.class)); + operator.initializeState(streamTaskStateManager); operator.getRuntimeContext().setKeyedStateStore(keyedStateStore); + operator.getRuntimeContext() + .setKeyedStateStoreV2(new DefaultKeyedStateStoreV2(asyncKeyedStateBackend)); return operator; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java index 17e36fb3af3..e4946b8de00 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl; @@ -39,12 +40,14 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.util.function.ThrowingConsumer; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.flink.runtime.state.StateBackendTestUtils.buildAsyncStateBackend; import static org.assertj.core.api.Assertions.assertThat; /** Basic tests for {@link AbstractAsyncStateStreamOperator}. */ @@ -55,13 +58,17 @@ public class AbstractAsyncStateStreamOperatorTest { int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder) throws Exception { TestOperator testOperator = new TestOperator(elementOrder); - return new KeyedOneInputStreamOperatorTestHarness<>( - testOperator, - new TestKeySelector(), - BasicTypeInfo.INT_TYPE_INFO, - maxParalelism, - numSubtasks, - subtaskIndex); + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> + testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + testOperator, + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + maxParalelism, + numSubtasks, + subtaskIndex); + testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend())); + return testHarness; } @Test @@ -75,6 +82,11 @@ public class AbstractAsyncStateStreamOperatorTest { ((AbstractAsyncStateStreamOperator) testHarness.getOperator()) .getAsyncExecutionController()) .isNotNull(); + assertThat( + ((AbstractAsyncStateStreamOperator) testHarness.getOperator()) + .getAsyncExecutionController() + .getStateExecutor()) + .isNotNull(); } } @@ -150,7 +162,6 @@ public class AbstractAsyncStateStreamOperatorTest { AsyncExecutionController asyncExecutionController = ((AbstractAsyncStateStreamOperator) testHarness.getOperator()) .getAsyncExecutionController(); - asyncExecutionController.setStateExecutor(new MockStateExecutor()); ((AbstractAsyncStateStreamOperator<String>) testHarness.getOperator()) .setAsyncKeyedContextElement( new StreamRecord<>(Tuple2.of(5, "5")), new TestKeySelector()); @@ -171,6 +182,7 @@ public class AbstractAsyncStateStreamOperatorTest { } } + @Disabled("Support Timer for AsyncKeyedStateBackend") @Test void testTimerServiceIsAsync() throws Exception { try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java index be004e711ff..863c09fd88d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; import org.apache.flink.streaming.api.operators.AbstractInput; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; @@ -55,6 +56,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.flink.runtime.state.StateBackendTestUtils.buildAsyncStateBackend; import static org.assertj.core.api.Assertions.assertThat; /** Basic tests for {@link AbstractAsyncStateStreamOperatorV2}. */ @@ -64,13 +66,17 @@ public class AbstractAsyncStateStreamOperatorV2Test { createTestHarness( int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder) throws Exception { - return new KeyedOneInputStreamOperatorV2TestHarness<>( - new TestOperatorFactory(elementOrder), - new AbstractAsyncStateStreamOperatorTest.TestKeySelector(), - BasicTypeInfo.INT_TYPE_INFO, - maxParalelism, - numSubtasks, - subtaskIndex); + KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> + testHarness = + new KeyedOneInputStreamOperatorV2TestHarness<>( + new TestOperatorFactory(elementOrder), + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + maxParalelism, + numSubtasks, + subtaskIndex); + testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend())); + return testHarness; } @Test @@ -84,6 +90,11 @@ public class AbstractAsyncStateStreamOperatorV2Test { ((AbstractAsyncStateStreamOperatorV2) testHarness.getBaseOperator()) .getAsyncExecutionController()) .isNotNull(); + assertThat( + ((AbstractAsyncStateStreamOperatorV2) testHarness.getBaseOperator()) + .getAsyncExecutionController() + .getStateExecutor()) + .isNotNull(); } } @@ -162,7 +173,6 @@ public class AbstractAsyncStateStreamOperatorV2Test { CheckpointStorageLocationReference.getDefault(); AsyncExecutionController asyncExecutionController = testOperator.getAsyncExecutionController(); - asyncExecutionController.setStateExecutor(new MockStateExecutor()); testOperator.setAsyncKeyedContextElement( new StreamRecord<>(Tuple2.of(5, "5")), new TestKeySelector()); asyncExecutionController.handleRequest(null, StateRequestType.VALUE_GET, null); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index ccf6ab98311..70f8d46fa96 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -76,6 +76,7 @@ import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.DoneFuture; @@ -2353,6 +2354,11 @@ public class StreamTaskTest { return controller.keyedStateBackend(); } + @Override + public AsyncKeyedStateBackend asyncKeyedStateBackend() { + return controller.asyncKeyedStateBackend(); + } + @Override public InternalTimeServiceManager<?> internalTimerServiceManager() { InternalTimeServiceManager<?> timeServiceManager = diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index f31dee84ad8..b2e7a6b169e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -50,7 +50,9 @@ import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateBackendTestUtils; import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.ttl.MockTtlTimeProvider; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; @@ -73,6 +75,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -303,6 +306,14 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { ttlTimeProvider = new MockTtlTimeProvider(); ttlTimeProvider.setCurrentTimestamp(0); + if (operator instanceof AsyncStateProcessing + || (factory instanceof SimpleOperatorFactory + && ((SimpleOperatorFactory<OUT>) factory).getOperator() + instanceof AsyncStateProcessing)) { + setStateBackend( + StateBackendTestUtils.buildAsyncStateBackend(new HashMapStateBackend())); + } + this.streamTaskStateInitializer = createStreamTaskStateManager( environment, stateBackend, ttlTimeProvider, timeServiceManagerProvider);