This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3e6842261b05b934544cb8d2e61c756cf2562ba2
Author: Hangxiang Yu <master...@gmail.com>
AuthorDate: Mon Apr 29 15:54:44 2024 +0800

    [FLINK-35262][state] Bridge between AsyncKeyedStateBackend and 
AsyncExecutionController
---
 .../asyncprocessing/AsyncExecutionController.java  |   4 +-
 .../runtime/state/AsyncKeyedStateBackend.java      |  31 ++++-
 .../runtime/state/v2/DefaultKeyedStateStoreV2.java |  45 ++++++++
 .../flink/runtime/state/v2/KeyedStateStoreV2.java  |  46 ++++++++
 .../AsyncExecutionControllerTest.java              |  84 +++++---------
 .../asyncprocessing/MockStateExecutor.java         |   6 +-
 .../flink/runtime/state/StateBackendTestUtils.java | 108 ++++++++++++++++++
 .../api/operators/AbstractStreamOperator.java      |   4 +-
 .../api/operators/AbstractStreamOperatorV2.java    |   2 +-
 .../api/operators/StreamOperatorStateContext.java  |   7 ++
 .../api/operators/StreamOperatorStateHandler.java  |  28 +++++
 .../operators/StreamTaskStateInitializerImpl.java  | 127 ++++++++++++++-------
 .../api/operators/StreamingRuntimeContext.java     |  25 ++++
 .../AbstractAsyncStateStreamOperator.java          |  63 +++++++---
 .../AbstractAsyncStateStreamOperatorV2.java        |  26 +++--
 .../InternalTimerServiceAsyncImplTest.java         |   2 +-
 .../api/operators/StreamingRuntimeContextTest.java |  55 +++++++--
 .../AbstractAsyncStateStreamOperatorTest.java      |  28 +++--
 .../AbstractAsyncStateStreamOperatorV2Test.java    |  26 +++--
 .../streaming/runtime/tasks/StreamTaskTest.java    |   6 +
 .../util/AbstractStreamOperatorTestHarness.java    |  11 ++
 21 files changed, 579 insertions(+), 155 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index 83c18a70be0..fefc310d1f9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -320,8 +320,8 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler {
     }
 
     @VisibleForTesting
-    public void setStateExecutor(StateExecutor stateExecutor) {
-        this.stateExecutor = stateExecutor;
+    public StateExecutor getStateExecutor() {
+        return stateExecutor;
     }
 
     @VisibleForTesting
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
index 99c48f4abad..70cdfbef767 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
@@ -19,15 +19,40 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.runtime.asyncprocessing.StateExecutor;
+import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
+import org.apache.flink.runtime.state.v2.StateDescriptor;
 import org.apache.flink.util.Disposable;
 
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+
 /**
  * An async keyed state backend provides methods supporting to access keyed 
state asynchronously and
  * in batch.
  */
 @Internal
-public interface AsyncKeyedStateBackend extends Disposable {
+public interface AsyncKeyedStateBackend extends Disposable, Closeable {
+
+    /**
+     * Initializes with some contexts.
+     *
+     * @param stateRequestHandler which handles state request.
+     */
+    void setup(@Nonnull StateRequestHandler stateRequestHandler);
+
+    /**
+     * Creates and returns a new state.
+     *
+     * @param stateDesc The {@code StateDescriptor} that contains the name of 
the state.
+     * @param <SV> The type of the stored state value.
+     * @param <S> The type of the public API state.
+     * @throws Exception Exceptions may occur during initialization of the 
state.
+     */
+    @Nonnull
+    <SV, S extends State> S createState(@Nonnull StateDescriptor<SV> 
stateDesc) throws Exception;
 
     /**
      * Creates a {@code StateExecutor} which supports to execute a batch of 
state requests
@@ -36,5 +61,9 @@ public interface AsyncKeyedStateBackend extends Disposable {
      * @return a {@code StateExecutor} which supports to execute a batch of 
state requests
      *     asynchronously.
      */
+    @Nonnull
     StateExecutor createStateExecutor();
+
+    @Override
+    void dispose();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStoreV2.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStoreV2.java
new file mode 100644
index 00000000000..7bc2b58f2f1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStoreV2.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+/** Default implementation of KeyedStateStoreV2. */
+public class DefaultKeyedStateStoreV2 implements KeyedStateStoreV2 {
+
+    private final AsyncKeyedStateBackend asyncKeyedStateBackend;
+
+    public DefaultKeyedStateStoreV2(@Nonnull AsyncKeyedStateBackend 
asyncKeyedStateBackend) {
+        this.asyncKeyedStateBackend = 
Preconditions.checkNotNull(asyncKeyedStateBackend);
+    }
+
+    @Override
+    public <T> ValueState<T> getValueState(@Nonnull ValueStateDescriptor<T> 
stateProperties) {
+        Preconditions.checkNotNull(stateProperties, "The state properties must 
not be null");
+        try {
+            return asyncKeyedStateBackend.createState(stateProperties);
+        } catch (Exception e) {
+            throw new RuntimeException("Error while getting state", e);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStoreV2.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStoreV2.java
new file mode 100644
index 00000000000..0c7c27b429f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStoreV2.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.ValueState;
+
+import javax.annotation.Nonnull;
+
+/** This interface contains methods for registering {@link State}. */
+@Internal
+public interface KeyedStateStoreV2 {
+
+    /**
+     * Gets a handle to the system's {@link ValueState}. The key/value state 
is only accessible if
+     * the function is executed on a KeyedStream. On each access, the state 
exposes the value for
+     * the key of the element currently processed by the function. Each 
function may have multiple
+     * partitioned states, addressed with different names.
+     *
+     * <p>Because the scope of each value is the key of the currently 
processed element, and the
+     * elements are distributed by the Flink runtime, the system can 
transparently scale out and
+     * redistribute the state and KeyedStream.
+     *
+     * @param stateProperties The descriptor defining the properties of the 
state.
+     * @param <T> The type of value stored in the state.
+     * @return The partitioned state object.
+     */
+    <T> ValueState<T> getValueState(@Nonnull ValueStateDescriptor<T> 
stateProperties);
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
index ee83d5f7c79..09fcf2636e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.asyncprocessing;
 
+import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.state.StateFutureUtils;
 import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
-import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
-import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendTestUtils;
 import org.apache.flink.runtime.state.v2.InternalValueState;
 import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
 import org.apache.flink.util.Preconditions;
@@ -37,13 +37,13 @@ import java.util.HashMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 /** Test for {@link AsyncExecutionController}. */
 class AsyncExecutionControllerTest {
     AsyncExecutionController aec;
-    TestUnderlyingState underlyingState;
     AtomicInteger output;
     TestValueState valueState;
 
@@ -63,16 +63,36 @@ class AsyncExecutionControllerTest {
             };
 
     void setup(int batchSize, long timeout, int maxInFlight) {
+        StateExecutor stateExecutor = new TestStateExecutor();
+        ValueStateDescriptor<Integer> stateDescriptor =
+                new ValueStateDescriptor<>("test-value-state", 
BasicTypeInfo.INT_TYPE_INFO);
+        Supplier<State> stateSupplier =
+                () -> new TestValueState(aec, new TestUnderlyingState(), 
stateDescriptor);
+        StateBackend testAsyncStateBackend =
+                StateBackendTestUtils.buildAsyncStateBackend(stateSupplier, 
stateExecutor);
+        
assertThat(testAsyncStateBackend.supportsAsyncKeyedStateBackend()).isTrue();
+        AsyncKeyedStateBackend asyncKeyedStateBackend;
+        try {
+            asyncKeyedStateBackend = 
testAsyncStateBackend.createAsyncKeyedStateBackend(null);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
         aec =
                 new AsyncExecutionController<>(
                         new SyncMailboxExecutor(),
-                        createStateExecutor(),
+                        stateExecutor,
                         128,
                         batchSize,
                         timeout,
                         maxInFlight);
-        underlyingState = new TestUnderlyingState();
-        valueState = new TestValueState(aec, underlyingState);
+        asyncKeyedStateBackend.setup(aec);
+
+        try {
+            valueState = asyncKeyedStateBackend.createState(stateDescriptor);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
         output = new AtomicInteger();
     }
 
@@ -466,12 +486,6 @@ class AsyncExecutionControllerTest {
         
assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isTrue();
     }
 
-    private StateExecutor createStateExecutor() {
-        TestAsyncStateBackend testAsyncStateBackend = new 
TestAsyncStateBackend();
-        
assertThat(testAsyncStateBackend.supportsAsyncKeyedStateBackend()).isTrue();
-        return 
testAsyncStateBackend.createAsyncKeyedStateBackend(null).createStateExecutor();
-    }
-
     /** Simulate the underlying state that is actually used to execute the 
request. */
     static class TestUnderlyingState {
 
@@ -495,53 +509,15 @@ class AsyncExecutionControllerTest {
         private final TestUnderlyingState underlyingState;
 
         public TestValueState(
-                AsyncExecutionController<String> aec, TestUnderlyingState 
underlyingState) {
-            super(aec, new ValueStateDescriptor<>("test-value-state", 
BasicTypeInfo.INT_TYPE_INFO));
+                StateRequestHandler stateRequestHandler,
+                TestUnderlyingState underlyingState,
+                ValueStateDescriptor<Integer> stateDescriptor) {
+            super(stateRequestHandler, stateDescriptor);
             this.underlyingState = underlyingState;
             
assertThat(this.getValueSerializer()).isEqualTo(IntSerializer.INSTANCE);
         }
     }
 
-    /**
-     * A brief implementation of {@link StateBackend} which illustrates the 
interaction between AEC
-     * and StateBackend.
-     */
-    static class TestAsyncStateBackend implements StateBackend {
-
-        @Override
-        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
-                KeyedStateBackendParameters<K> parameters) throws Exception {
-            throw new UnsupportedOperationException("Don't support 
createKeyedStateBackend yet");
-        }
-
-        @Override
-        public OperatorStateBackend createOperatorStateBackend(
-                OperatorStateBackendParameters parameters) throws Exception {
-            throw new UnsupportedOperationException("Don't support 
createOperatorStateBackend yet");
-        }
-
-        @Override
-        public boolean supportsAsyncKeyedStateBackend() {
-            return true;
-        }
-
-        @Override
-        public <K> AsyncKeyedStateBackend createAsyncKeyedStateBackend(
-                KeyedStateBackendParameters<K> parameters) {
-            return new AsyncKeyedStateBackend() {
-                @Override
-                public StateExecutor createStateExecutor() {
-                    return new TestStateExecutor();
-                }
-
-                @Override
-                public void dispose() {
-                    // do nothing
-                }
-            };
-        }
-    }
-
     /**
      * A brief implementation of {@link StateExecutor}, to illustrate the 
interaction between AEC
      * and StateExecutor.
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/MockStateExecutor.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java
similarity index 83%
rename from 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/MockStateExecutor.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java
index b052717ca2f..b36c1da8426 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/MockStateExecutor.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java
@@ -16,12 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+package org.apache.flink.runtime.asyncprocessing;
 
-import org.apache.flink.runtime.asyncprocessing.MockStateRequestContainer;
-import org.apache.flink.runtime.asyncprocessing.StateExecutor;
-import org.apache.flink.runtime.asyncprocessing.StateRequest;
-import org.apache.flink.runtime.asyncprocessing.StateRequestContainer;
 import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.CompletableFuture;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
index 836b4ab03c0..e1e0e5cb49c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
@@ -22,7 +22,11 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.asyncprocessing.MockStateExecutor;
+import org.apache.flink.runtime.asyncprocessing.StateExecutor;
+import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.util.function.FunctionWithException;
 
@@ -31,6 +35,7 @@ import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.concurrent.RunnableFuture;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 /** This class contains test utils of {@link StateBackend} */
@@ -44,6 +49,109 @@ public class StateBackendTestUtils {
         return new ApplyingSnapshotStateBackend(delegatedStataBackend, 
snapshotResultFunction);
     }
 
+    public static StateBackend buildAsyncStateBackend(StateBackend 
delegatedSyncStateBackend) {
+        return new TestAsyncStateBackend(delegatedSyncStateBackend)
+                .setInnerState(null)
+                .setStateExecutor(new MockStateExecutor());
+    }
+
+    public static StateBackend buildAsyncStateBackend(
+            Supplier<org.apache.flink.api.common.state.v2.State> 
innerStateSupplier,
+            StateExecutor stateExecutor) {
+        return new TestAsyncStateBackend(new HashMapStateBackend())
+                .setInnerState(innerStateSupplier)
+                .setStateExecutor(stateExecutor);
+    }
+
+    private static class TestAsyncStateBackend implements StateBackend {
+
+        private final StateBackend delegatedStateBackend;
+        private Supplier<org.apache.flink.api.common.state.v2.State> 
innerStateSupplier;
+        private StateExecutor stateExecutor;
+
+        public TestAsyncStateBackend(StateBackend delegatedStateBackend) {
+            this.delegatedStateBackend = delegatedStateBackend;
+        }
+
+        public TestAsyncStateBackend setInnerState(
+                Supplier<org.apache.flink.api.common.state.v2.State> 
innerStateSupplier) {
+            this.innerStateSupplier = innerStateSupplier;
+            return this;
+        }
+
+        public TestAsyncStateBackend setStateExecutor(StateExecutor 
stateExecutor) {
+            this.stateExecutor = stateExecutor;
+            return this;
+        }
+
+        @Override
+        public boolean supportsAsyncKeyedStateBackend() {
+            return true;
+        }
+
+        @Override
+        public <K> AsyncKeyedStateBackend createAsyncKeyedStateBackend(
+                KeyedStateBackendParameters<K> parameters) throws Exception {
+            return delegatedStateBackend.supportsAsyncKeyedStateBackend()
+                    ? 
delegatedStateBackend.createAsyncKeyedStateBackend(parameters)
+                    : new TestAsyncKeyedStateBackend(innerStateSupplier, 
stateExecutor);
+        }
+
+        @Override
+        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
+                KeyedStateBackendParameters<K> parameters) throws Exception {
+            return delegatedStateBackend.createKeyedStateBackend(parameters);
+        }
+
+        @Override
+        public OperatorStateBackend createOperatorStateBackend(
+                OperatorStateBackendParameters parameters) throws Exception {
+            return 
delegatedStateBackend.createOperatorStateBackend(parameters);
+        }
+    }
+
+    private static class TestAsyncKeyedStateBackend implements 
AsyncKeyedStateBackend {
+
+        private final Supplier<org.apache.flink.api.common.state.v2.State> 
innerStateSupplier;
+        private final StateExecutor stateExecutor;
+
+        public TestAsyncKeyedStateBackend(
+                Supplier<org.apache.flink.api.common.state.v2.State> 
innerStateSupplier,
+                StateExecutor stateExecutor) {
+            this.innerStateSupplier = innerStateSupplier;
+            this.stateExecutor = stateExecutor;
+        }
+
+        @Override
+        public void setup(@Nonnull StateRequestHandler stateRequestHandler) {
+            // do nothing
+        }
+
+        @Nonnull
+        @Override
+        @SuppressWarnings("unchecked")
+        public <SV, S extends org.apache.flink.api.common.state.v2.State> S 
createState(
+                @Nonnull org.apache.flink.runtime.state.v2.StateDescriptor<SV> 
stateDesc) {
+            return (S) innerStateSupplier.get();
+        }
+
+        @Nonnull
+        @Override
+        public StateExecutor createStateExecutor() {
+            return stateExecutor;
+        }
+
+        @Override
+        public void dispose() {
+            // do nothing
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+    }
+
     /** Wrapper of state backend which supports apply the snapshot result. */
     private static class ApplyingSnapshotStateBackend extends 
AbstractStateBackend {
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 6b97f02700a..eda7c254fd8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -138,7 +138,7 @@ public abstract class AbstractStreamOperator<OUT>
      */
     protected transient KeySelector<?, ?> stateKeySelector2;
 
-    private transient StreamOperatorStateHandler stateHandler;
+    protected transient StreamOperatorStateHandler stateHandler;
 
     protected transient InternalTimeServiceManager<?> timeServiceManager;
 
@@ -253,7 +253,7 @@ public abstract class AbstractStreamOperator<OUT>
     }
 
     @Override
-    public final void initializeState(StreamTaskStateInitializer 
streamTaskStateManager)
+    public void initializeState(StreamTaskStateInitializer 
streamTaskStateManager)
             throws Exception {
 
         final TypeSerializer<?> keySerializer =
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index 25ca861fd94..20441bcffa5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -102,7 +102,7 @@ public abstract class AbstractStreamOperatorV2<OUT>
     protected final ProcessingTimeService processingTimeService;
     protected final RecordAttributes[] lastRecordAttributes;
 
-    private StreamOperatorStateHandler stateHandler;
+    protected StreamOperatorStateHandler stateHandler;
     protected InternalTimeServiceManager<?> timeServiceManager;
 
     public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, 
int numberOfInputs) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
index 695129ca852..485f431f3d5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
 import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -54,6 +55,12 @@ public interface StreamOperatorStateContext {
      */
     CheckpointableKeyedStateBackend<?> keyedStateBackend();
 
+    /**
+     * Returns the async keyed state backend for the stream operator. This 
method returns null for
+     * operators which don't support async keyed state backend.
+     */
+    AsyncKeyedStateBackend asyncKeyedStateBackend();
+
     /**
      * Returns the internal timer service manager for the stream operator. 
This method returns null
      * for non-keyed operators.
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
index eb8a01b91eb..5068170328d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SavepointType;
 import org.apache.flink.runtime.checkpoint.SnapshotType;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.DefaultKeyedStateStore;
@@ -53,6 +54,8 @@ import 
org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.v2.DefaultKeyedStateStoreV2;
+import org.apache.flink.runtime.state.v2.KeyedStateStoreV2;
 import org.apache.flink.util.CloseableIterable;
 import org.apache.flink.util.IOUtils;
 
@@ -80,6 +83,10 @@ public class StreamOperatorStateHandler {
 
     protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
 
+    @Nullable private final AsyncKeyedStateBackend asyncKeyedStateBackend;
+
+    @Nullable private final KeyedStateStoreV2 keyedStateStoreV2;
+
     /** Backend for keyed state. This might be empty if we're not on a keyed 
stream. */
     @Nullable private final CheckpointableKeyedStateBackend<?> 
keyedStateBackend;
 
@@ -112,6 +119,12 @@ public class StreamOperatorStateHandler {
         } else {
             keyedStateStore = null;
         }
+
+        this.asyncKeyedStateBackend = context.asyncKeyedStateBackend();
+        this.keyedStateStoreV2 =
+                asyncKeyedStateBackend != null
+                        ? new DefaultKeyedStateStoreV2(asyncKeyedStateBackend)
+                        : null;
     }
 
     public void initializeOperatorState(CheckpointedStreamOperator 
streamOperator)
@@ -152,12 +165,18 @@ public class StreamOperatorStateHandler {
             if (closeableRegistry.unregisterCloseable(keyedStateBackend)) {
                 closer.register(keyedStateBackend);
             }
+            if (closeableRegistry.unregisterCloseable(asyncKeyedStateBackend)) 
{
+                closer.register(asyncKeyedStateBackend);
+            }
             if (operatorStateBackend != null) {
                 closer.register(operatorStateBackend::dispose);
             }
             if (keyedStateBackend != null) {
                 closer.register(keyedStateBackend::dispose);
             }
+            if (asyncKeyedStateBackend != null) {
+                closer.register(asyncKeyedStateBackend::dispose);
+            }
         }
     }
 
@@ -325,6 +344,11 @@ public class StreamOperatorStateHandler {
         return (KeyedStateBackend<K>) keyedStateBackend;
     }
 
+    @Nullable
+    public AsyncKeyedStateBackend getAsyncKeyedStateBackend() {
+        return asyncKeyedStateBackend;
+    }
+
     public OperatorStateBackend getOperatorStateBackend() {
         return operatorStateBackend;
     }
@@ -400,6 +424,10 @@ public class StreamOperatorStateHandler {
         return Optional.ofNullable(keyedStateStore);
     }
 
+    public Optional<KeyedStateStoreV2> getKeyedStateStoreV2() {
+        return Optional.ofNullable(keyedStateStoreV2);
+    }
+
     /** Custom state handling hooks to be invoked by {@link 
StreamOperatorStateHandler}. */
     public interface CheckpointedStreamOperator {
         void initializeState(StateInitializationContext context) throws 
Exception;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index 3e3a745c12a..65c04aef826 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManager
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -56,6 +57,7 @@ import 
org.apache.flink.runtime.util.OperatorSubtaskDescriptionText;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
 import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.Disposable;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.clock.SystemClock;
 
@@ -64,6 +66,7 @@ import org.apache.commons.io.IOUtils;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -171,6 +174,7 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                 taskStateManager.prioritizedOperatorState(operatorID);
 
         CheckpointableKeyedStateBackend<?> keyedStatedBackend = null;
+        AsyncKeyedStateBackend asyncKeyedStateBackend = null;
         OperatorStateBackend operatorStateBackend = null;
         CloseableIterable<KeyGroupStatePartitionStreamProvider> 
rawKeyedStateInputs = null;
         CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs 
= null;
@@ -182,15 +186,30 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
         try {
 
             // -------------- Keyed State Backend --------------
-            keyedStatedBackend =
-                    keyedStatedBackend(
-                            keySerializer,
-                            operatorIdentifierText,
-                            prioritizedOperatorSubtaskStates,
-                            streamTaskCloseableRegistry,
-                            metricGroup,
-                            managedMemoryFraction,
-                            statsCollector);
+            // TODO: Support KeyedStateBackend for AsyncKeyedStateBackend to 
unify the logic
+            if (stateBackend.supportsAsyncKeyedStateBackend()) {
+                asyncKeyedStateBackend =
+                        keyedStatedBackend(
+                                keySerializer,
+                                operatorIdentifierText,
+                                prioritizedOperatorSubtaskStates,
+                                streamTaskCloseableRegistry,
+                                metricGroup,
+                                managedMemoryFraction,
+                                statsCollector,
+                                StateBackend::createAsyncKeyedStateBackend);
+            } else {
+                keyedStatedBackend =
+                        keyedStatedBackend(
+                                keySerializer,
+                                operatorIdentifierText,
+                                prioritizedOperatorSubtaskStates,
+                                streamTaskCloseableRegistry,
+                                metricGroup,
+                                managedMemoryFraction,
+                                statsCollector,
+                                StateBackend::createKeyedStateBackend);
+            }
 
             // -------------- Operator State Backend --------------
             operatorStateBackend =
@@ -244,6 +263,7 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
             } else {
                 timeServiceManager = null;
             }
+            // TODO: Support Timer for AsyncKeyedStateBackend
 
             // Add stats for input channel and result partition state
             Stream.concat(
@@ -269,6 +289,7 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                     prioritizedOperatorSubtaskStates.getRestoredCheckpointId(),
                     operatorStateBackend,
                     keyedStatedBackend,
+                    asyncKeyedStateBackend,
                     timeServiceManager,
                     rawOperatorStateInputs,
                     rawKeyedStateInputs);
@@ -283,6 +304,14 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                 keyedStatedBackend.dispose();
             }
 
+            if (asyncKeyedStateBackend != null) {
+                if 
(streamTaskCloseableRegistry.unregisterCloseable(asyncKeyedStateBackend)) {
+                    IOUtils.closeQuietly(asyncKeyedStateBackend);
+                }
+                // release resource (e.g native resource)
+                asyncKeyedStateBackend.dispose();
+            }
+
             if (operatorStateBackend != null) {
                 if 
(streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
                     IOUtils.closeQuietly(operatorStateBackend);
@@ -364,14 +393,15 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
         }
     }
 
-    protected <K> CheckpointableKeyedStateBackend<K> keyedStatedBackend(
+    protected <K, R extends Disposable & Closeable> R keyedStatedBackend(
             TypeSerializer<K> keySerializer,
             String operatorIdentifierText,
             PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,
             CloseableRegistry backendCloseableRegistry,
             MetricGroup metricGroup,
             double managedMemoryFraction,
-            StateObject.StateObjectSizeStatsCollector statsCollector)
+            StateObject.StateObjectSizeStatsCollector statsCollector,
+            KeyedStateBackendCreator<K, R> keyedStateBackendCreator)
             throws Exception {
 
         if (keySerializer == null) {
@@ -395,35 +425,33 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
         // input stream opened for serDe during restore.
         CloseableRegistry cancelStreamRegistryForRestore = new 
CloseableRegistry();
         
backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);
-        BackendRestorerProcedure<CheckpointableKeyedStateBackend<K>, 
KeyedStateHandle>
-                backendRestorer =
-                        new BackendRestorerProcedure<>(
-                                (stateHandles) -> {
-                                    KeyedStateBackendParametersImpl<K> 
parameters =
-                                            new 
KeyedStateBackendParametersImpl<>(
-                                                    environment,
-                                                    environment.getJobID(),
-                                                    operatorIdentifierText,
-                                                    keySerializer,
-                                                    
taskInfo.getMaxNumberOfParallelSubtasks(),
-                                                    keyGroupRange,
-                                                    
environment.getTaskKvStateRegistry(),
-                                                    ttlTimeProvider,
-                                                    metricGroup,
-                                                    
initializationMetrics::addDurationMetric,
-                                                    stateHandles,
-                                                    
cancelStreamRegistryForRestore,
-                                                    managedMemoryFraction);
-                                    return 
loadStateBackendFromKeyedStateHandles(
-                                                    stateBackend,
-                                                    environment
-                                                            
.getUserCodeClassLoader()
-                                                            .asClassLoader(),
-                                                    stateHandles)
-                                            
.createKeyedStateBackend(parameters);
-                                },
-                                backendCloseableRegistry,
-                                logDescription);
+        BackendRestorerProcedure<R, KeyedStateHandle> backendRestorer =
+                new BackendRestorerProcedure<>(
+                        (stateHandles) -> {
+                            KeyedStateBackendParametersImpl<K> parameters =
+                                    new KeyedStateBackendParametersImpl<>(
+                                            environment,
+                                            environment.getJobID(),
+                                            operatorIdentifierText,
+                                            keySerializer,
+                                            
taskInfo.getMaxNumberOfParallelSubtasks(),
+                                            keyGroupRange,
+                                            
environment.getTaskKvStateRegistry(),
+                                            ttlTimeProvider,
+                                            metricGroup,
+                                            
initializationMetrics::addDurationMetric,
+                                            stateHandles,
+                                            cancelStreamRegistryForRestore,
+                                            managedMemoryFraction);
+                            return keyedStateBackendCreator.create(
+                                    loadStateBackendFromKeyedStateHandles(
+                                            stateBackend,
+                                            
environment.getUserCodeClassLoader().asClassLoader(),
+                                            stateHandles),
+                                    parameters);
+                        },
+                        backendCloseableRegistry,
+                        logDescription);
 
         try {
             return backendRestorer.createAndRestore(
@@ -436,6 +464,17 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
         }
     }
 
+    /** Functional interface to create the keyed state backend. */
+    @FunctionalInterface
+    protected interface KeyedStateBackendCreator<K, R extends Disposable & 
Closeable> {
+
+        /** Create the keyed state backend. */
+        R create(
+                StateBackend stateBackend,
+                StateBackend.KeyedStateBackendParameters<K> 
keyedStateBackendParameters)
+                throws Exception;
+    }
+
     protected CloseableIterable<StatePartitionStreamProvider> 
rawOperatorStateInputs(
             @Nonnull Iterator<StateObjectCollection<OperatorStateHandle>> 
restoreStateAlternatives,
             @Nonnull StateObject.StateObjectSizeStatsCollector statsCollector) 
{
@@ -733,6 +772,7 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
 
         private final OperatorStateBackend operatorStateBackend;
         private final CheckpointableKeyedStateBackend<?> keyedStateBackend;
+        private final AsyncKeyedStateBackend asyncKeyedStateBackend;
         private final InternalTimeServiceManager<?> internalTimeServiceManager;
 
         private final CloseableIterable<StatePartitionStreamProvider> 
rawOperatorStateInputs;
@@ -742,6 +782,7 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                 @Nullable Long restoredCheckpointId,
                 OperatorStateBackend operatorStateBackend,
                 CheckpointableKeyedStateBackend<?> keyedStateBackend,
+                AsyncKeyedStateBackend asyncKeyedStateBackend,
                 InternalTimeServiceManager<?> internalTimeServiceManager,
                 CloseableIterable<StatePartitionStreamProvider> 
rawOperatorStateInputs,
                 CloseableIterable<KeyGroupStatePartitionStreamProvider> 
rawKeyedStateInputs) {
@@ -749,6 +790,7 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
             this.restoredCheckpointId = restoredCheckpointId;
             this.operatorStateBackend = operatorStateBackend;
             this.keyedStateBackend = keyedStateBackend;
+            this.asyncKeyedStateBackend = asyncKeyedStateBackend;
             this.internalTimeServiceManager = internalTimeServiceManager;
             this.rawOperatorStateInputs = rawOperatorStateInputs;
             this.rawKeyedStateInputs = rawKeyedStateInputs;
@@ -766,6 +808,11 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
             return keyedStateBackend;
         }
 
+        @Override
+        public AsyncKeyedStateBackend asyncKeyedStateBackend() {
+            return asyncKeyedStateBackend;
+        }
+
         @Override
         public OperatorStateBackend operatorStateBackend() {
             return operatorStateBackend;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 75bed241a4c..2e82d1c2b08 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.state.v2.KeyedStateStoreV2;
 import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -69,6 +70,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
     private final String operatorUniqueID;
     private final ProcessingTimeService processingTimeService;
     private @Nullable KeyedStateStore keyedStateStore;
+    private @Nullable KeyedStateStoreV2 keyedStateStoreV2;
     private final ExternalResourceInfoProvider externalResourceInfoProvider;
 
     @VisibleForTesting
@@ -114,6 +116,10 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
         this.keyedStateStore = keyedStateStore;
     }
 
+    public void setKeyedStateStoreV2(@Nullable KeyedStateStoreV2 
keyedStateStoreV2) {
+        this.keyedStateStoreV2 = keyedStateStoreV2;
+    }
+
     // ------------------------------------------------------------------------
 
     /**
@@ -242,6 +248,25 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
         return keyedStateStore;
     }
 
+    // TODO: Reconstruct this after StateManager is ready in FLIP-410.
+    public <T> org.apache.flink.api.common.state.v2.ValueState<T> 
getValueState(
+            org.apache.flink.runtime.state.v2.ValueStateDescriptor<T> 
stateProperties) {
+        KeyedStateStoreV2 keyedStateStoreV2 =
+                checkPreconditionsAndGetKeyedStateStoreV2(stateProperties);
+        return keyedStateStoreV2.getValueState(stateProperties);
+    }
+
+    private KeyedStateStoreV2 checkPreconditionsAndGetKeyedStateStoreV2(
+            org.apache.flink.runtime.state.v2.StateDescriptor<?> 
stateDescriptor) {
+        checkNotNull(stateDescriptor, "The state properties must not be null");
+        checkNotNull(
+                keyedStateStoreV2,
+                String.format(
+                        "Keyed state '%s' with type %s can only be used on a 
'keyed stream', i.e., after a 'keyBy()' operation.",
+                        stateDescriptor.getStateId(), 
stateDescriptor.getType()));
+        return keyedStateStoreV2;
+    }
+
     // ------------------ expose (read only) relevant information from the 
stream config -------- //
 
     /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
index b23aa74c7f8..99439de600b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
@@ -27,15 +27,15 @@ import 
org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
 import org.apache.flink.runtime.asyncprocessing.RecordContext;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Input;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
-import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -43,6 +43,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.util.function.ThrowingConsumer;
 import org.apache.flink.util.function.ThrowingRunnable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -61,11 +62,11 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> 
extends AbstractStre
 
     /** Initialize necessary state components for {@link 
AbstractStreamOperator}. */
     @Override
-    public void setup(
-            StreamTask<?, ?> containingTask,
-            StreamConfig config,
-            Output<StreamRecord<OUT>> output) {
-        super.setup(containingTask, config, output);
+    public void initializeState(StreamTaskStateInitializer 
streamTaskStateManager)
+            throws Exception {
+        super.initializeState(streamTaskStateManager);
+        
getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null));
+        final StreamTask<?, ?> containingTask = 
checkNotNull(getContainingTask());
         final Environment environment = containingTask.getEnvironment();
         final MailboxExecutor mailboxExecutor = 
environment.getMainMailboxExecutor();
         final int maxParallelism = 
environment.getTaskInfo().getMaxNumberOfParallelSubtasks();
@@ -74,15 +75,22 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> 
extends AbstractStre
         final int asyncBufferSize = 
environment.getExecutionConfig().getAsyncStateBufferSize();
         final long asyncBufferTimeout =
                 environment.getExecutionConfig().getAsyncStateBufferTimeout();
-        // TODO: initial state executor and set state executor for aec
-        this.asyncExecutionController =
-                new AsyncExecutionController(
-                        mailboxExecutor,
-                        null,
-                        maxParallelism,
-                        asyncBufferSize,
-                        asyncBufferTimeout,
-                        inFlightRecordsLimit);
+
+        AsyncKeyedStateBackend asyncKeyedStateBackend = 
stateHandler.getAsyncKeyedStateBackend();
+        if (asyncKeyedStateBackend != null) {
+            this.asyncExecutionController =
+                    new AsyncExecutionController(
+                            mailboxExecutor,
+                            asyncKeyedStateBackend.createStateExecutor(),
+                            maxParallelism,
+                            asyncBufferSize,
+                            asyncBufferTimeout,
+                            inFlightRecordsLimit);
+            asyncKeyedStateBackend.setup(asyncExecutionController);
+        } else if (stateHandler.getKeyedStateBackend() != null) {
+            throw new UnsupportedOperationException(
+                    "Current State Backend doesn't support async access, 
AsyncExecutionController could not work");
+        }
     }
 
     @Override
@@ -220,6 +228,29 @@ public abstract class 
AbstractAsyncStateStreamOperator<OUT> extends AbstractStre
                 (AsyncExecutionController<K>) asyncExecutionController);
     }
 
+    @Override
+    @SuppressWarnings("unchecked")
+    public void setKeyContextElement1(StreamRecord record) throws Exception {
+        super.setKeyContextElement1(record);
+        if (stateKeySelector1 != null) {
+            setAsyncKeyedContextElement(record, stateKeySelector1);
+        }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void setKeyContextElement2(StreamRecord record) throws Exception {
+        super.setKeyContextElement2(record);
+        if (stateKeySelector2 != null) {
+            setAsyncKeyedContextElement(record, stateKeySelector2);
+        }
+    }
+
+    @Override
+    public Object getCurrentKey() {
+        return currentProcessingContext.getKey();
+    }
+
     @VisibleForTesting
     AsyncExecutionController<?> getAsyncExecutionController() {
         return asyncExecutionController;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
index 2c769af2918..98b542a2964 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
 import org.apache.flink.runtime.asyncprocessing.RecordContext;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
@@ -70,19 +71,28 @@ public abstract class 
AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt
     public final void initializeState(StreamTaskStateInitializer 
streamTaskStateManager)
             throws Exception {
         super.initializeState(streamTaskStateManager);
+        
getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null));
 
         final int inFlightRecordsLimit = 
getExecutionConfig().getAsyncInflightRecordsLimit();
         final int asyncBufferSize = 
getExecutionConfig().getAsyncStateBufferSize();
         final long asyncBufferTimeout = 
getExecutionConfig().getAsyncStateBufferTimeout();
         int maxParallelism = getExecutionConfig().getMaxParallelism();
-        this.asyncExecutionController =
-                new AsyncExecutionController(
-                        mailboxExecutor,
-                        null,
-                        maxParallelism,
-                        asyncBufferSize,
-                        asyncBufferTimeout,
-                        inFlightRecordsLimit);
+
+        AsyncKeyedStateBackend asyncKeyedStateBackend = 
stateHandler.getAsyncKeyedStateBackend();
+        if (asyncKeyedStateBackend != null) {
+            this.asyncExecutionController =
+                    new AsyncExecutionController(
+                            mailboxExecutor,
+                            asyncKeyedStateBackend.createStateExecutor(),
+                            maxParallelism,
+                            asyncBufferSize,
+                            asyncBufferTimeout,
+                            inFlightRecordsLimit);
+            asyncKeyedStateBackend.setup(asyncExecutionController);
+        } else if (stateHandler.getKeyedStateBackend() != null) {
+            throw new UnsupportedOperationException(
+                    "Current State Backend doesn't support async access, 
AsyncExecutionController could not work");
+        }
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
index 26e23fb5db3..985b4a2b9fd 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.MockStateExecutor;
 import org.apache.flink.runtime.asyncprocessing.RecordContext;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
@@ -30,7 +31,6 @@ import 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.PriorityQueueSetFactory;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
-import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.MockStateExecutor;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 24e4bf5d209..58f68674390 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.SerializerFactory;
+import org.apache.flink.api.common.serialization.SerializerConfig;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -51,6 +52,7 @@ import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
 import org.apache.flink.runtime.state.DefaultKeyedStateStore;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
@@ -59,6 +61,7 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.runtime.state.v2.DefaultKeyedStateStoreV2;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -245,6 +248,31 @@ class StreamingRuntimeContextTest {
         assertThat(value.iterator()).isExhausted();
     }
 
+    @Test
+    void testAsyncValueStateInstantiation() throws Exception {
+
+        final ExecutionConfig config = new ExecutionConfig();
+        SerializerConfig serializerConfig = config.getSerializerConfig();
+        serializerConfig.registerKryoType(Path.class);
+
+        final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
+
+        StreamingRuntimeContext context = 
createRuntimeContext(descriptorCapture, config);
+        org.apache.flink.runtime.state.v2.ValueStateDescriptor<TaskInfo> descr 
=
+                new org.apache.flink.runtime.state.v2.ValueStateDescriptor<>(
+                        "name", TypeInformation.of(TaskInfo.class), 
serializerConfig);
+        context.getValueState(descr);
+
+        org.apache.flink.runtime.state.v2.ValueStateDescriptor<?> 
descrIntercepted =
+                (org.apache.flink.runtime.state.v2.ValueStateDescriptor<?>) 
descriptorCapture.get();
+        TypeSerializer<?> serializer = descrIntercepted.getSerializer();
+
+        // check that the Path class is really registered, i.e., the execution 
config was applied
+        assertThat(serializer).isInstanceOf(KryoSerializer.class);
+        assertThat(((KryoSerializer<?>) 
serializer).getKryo().getRegistration(Path.class).getId())
+                .isPositive();
+    }
+
     // ------------------------------------------------------------------------
     //
     // ------------------------------------------------------------------------
@@ -308,6 +336,8 @@ class StreamingRuntimeContextTest {
 
         KeyedStateBackend keyedStateBackend = mock(KeyedStateBackend.class);
 
+        AsyncKeyedStateBackend asyncKeyedStateBackend = 
mock(AsyncKeyedStateBackend.class);
+
         DefaultKeyedStateStore keyedStateStore =
                 new DefaultKeyedStateStore(
                         keyedStateBackend,
@@ -321,21 +351,28 @@ class StreamingRuntimeContextTest {
                         });
 
         doAnswer(
-                        new Answer<Object>() {
-
-                            @Override
-                            public Object answer(InvocationOnMock 
invocationOnMock)
-                                    throws Throwable {
-                                ref.set(invocationOnMock.getArguments()[2]);
-                                return null;
-                            }
-                        })
+                        (Answer<Object>)
+                                invocationOnMock -> {
+                                    
ref.set(invocationOnMock.getArguments()[2]);
+                                    return null;
+                                })
                 .when(keyedStateBackend)
                 .getPartitionedState(
                         Matchers.any(), any(TypeSerializer.class), 
any(StateDescriptor.class));
 
+        doAnswer(
+                        (Answer<Object>)
+                                invocationOnMock -> {
+                                    
ref.set(invocationOnMock.getArguments()[0]);
+                                    return null;
+                                })
+                .when(asyncKeyedStateBackend)
+                
.createState(any(org.apache.flink.runtime.state.v2.StateDescriptor.class));
+
         operator.initializeState(streamTaskStateManager);
         operator.getRuntimeContext().setKeyedStateStore(keyedStateStore);
+        operator.getRuntimeContext()
+                .setKeyedStateStoreV2(new 
DefaultKeyedStateStoreV2(asyncKeyedStateBackend));
 
         return operator;
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java
index 17e36fb3af3..e4946b8de00 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
@@ -39,12 +40,14 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.util.function.ThrowingConsumer;
 
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static 
org.apache.flink.runtime.state.StateBackendTestUtils.buildAsyncStateBackend;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Basic tests for {@link AbstractAsyncStateStreamOperator}. */
@@ -55,13 +58,17 @@ public class AbstractAsyncStateStreamOperatorTest {
                     int maxParalelism, int numSubtasks, int subtaskIndex, 
ElementOrder elementOrder)
                     throws Exception {
         TestOperator testOperator = new TestOperator(elementOrder);
-        return new KeyedOneInputStreamOperatorTestHarness<>(
-                testOperator,
-                new TestKeySelector(),
-                BasicTypeInfo.INT_TYPE_INFO,
-                maxParalelism,
-                numSubtasks,
-                subtaskIndex);
+        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, String>
+                testHarness =
+                        new KeyedOneInputStreamOperatorTestHarness<>(
+                                testOperator,
+                                new TestKeySelector(),
+                                BasicTypeInfo.INT_TYPE_INFO,
+                                maxParalelism,
+                                numSubtasks,
+                                subtaskIndex);
+        testHarness.setStateBackend(buildAsyncStateBackend(new 
HashMapStateBackend()));
+        return testHarness;
     }
 
     @Test
@@ -75,6 +82,11 @@ public class AbstractAsyncStateStreamOperatorTest {
                             ((AbstractAsyncStateStreamOperator) 
testHarness.getOperator())
                                     .getAsyncExecutionController())
                     .isNotNull();
+            assertThat(
+                            ((AbstractAsyncStateStreamOperator) 
testHarness.getOperator())
+                                    .getAsyncExecutionController()
+                                    .getStateExecutor())
+                    .isNotNull();
         }
     }
 
@@ -150,7 +162,6 @@ public class AbstractAsyncStateStreamOperatorTest {
             AsyncExecutionController asyncExecutionController =
                     ((AbstractAsyncStateStreamOperator) 
testHarness.getOperator())
                             .getAsyncExecutionController();
-            asyncExecutionController.setStateExecutor(new MockStateExecutor());
             ((AbstractAsyncStateStreamOperator<String>) 
testHarness.getOperator())
                     .setAsyncKeyedContextElement(
                             new StreamRecord<>(Tuple2.of(5, "5")), new 
TestKeySelector());
@@ -171,6 +182,7 @@ public class AbstractAsyncStateStreamOperatorTest {
         }
     }
 
+    @Disabled("Support Timer for AsyncKeyedStateBackend")
     @Test
     void testTimerServiceIsAsync() throws Exception {
         try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, String>
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java
index be004e711ff..863c09fd88d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
 import org.apache.flink.streaming.api.operators.AbstractInput;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
@@ -55,6 +56,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static 
org.apache.flink.runtime.state.StateBackendTestUtils.buildAsyncStateBackend;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Basic tests for {@link AbstractAsyncStateStreamOperatorV2}. */
@@ -64,13 +66,17 @@ public class AbstractAsyncStateStreamOperatorV2Test {
             createTestHarness(
                     int maxParalelism, int numSubtasks, int subtaskIndex, 
ElementOrder elementOrder)
                     throws Exception {
-        return new KeyedOneInputStreamOperatorV2TestHarness<>(
-                new TestOperatorFactory(elementOrder),
-                new AbstractAsyncStateStreamOperatorTest.TestKeySelector(),
-                BasicTypeInfo.INT_TYPE_INFO,
-                maxParalelism,
-                numSubtasks,
-                subtaskIndex);
+        KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, 
String>, String>
+                testHarness =
+                        new KeyedOneInputStreamOperatorV2TestHarness<>(
+                                new TestOperatorFactory(elementOrder),
+                                new TestKeySelector(),
+                                BasicTypeInfo.INT_TYPE_INFO,
+                                maxParalelism,
+                                numSubtasks,
+                                subtaskIndex);
+        testHarness.setStateBackend(buildAsyncStateBackend(new 
HashMapStateBackend()));
+        return testHarness;
     }
 
     @Test
@@ -84,6 +90,11 @@ public class AbstractAsyncStateStreamOperatorV2Test {
                             ((AbstractAsyncStateStreamOperatorV2) 
testHarness.getBaseOperator())
                                     .getAsyncExecutionController())
                     .isNotNull();
+            assertThat(
+                            ((AbstractAsyncStateStreamOperatorV2) 
testHarness.getBaseOperator())
+                                    .getAsyncExecutionController()
+                                    .getStateExecutor())
+                    .isNotNull();
         }
     }
 
@@ -162,7 +173,6 @@ public class AbstractAsyncStateStreamOperatorV2Test {
                     CheckpointStorageLocationReference.getDefault();
             AsyncExecutionController asyncExecutionController =
                     testOperator.getAsyncExecutionController();
-            asyncExecutionController.setStateExecutor(new MockStateExecutor());
             testOperator.setAsyncKeyedContextElement(
                     new StreamRecord<>(Tuple2.of(5, "5")), new 
TestKeySelector());
             asyncExecutionController.handleRequest(null, 
StateRequestType.VALUE_GET, null);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index ccf6ab98311..70f8d46fa96 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -76,6 +76,7 @@ import 
org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.DoneFuture;
@@ -2353,6 +2354,11 @@ public class StreamTaskTest {
                         return controller.keyedStateBackend();
                     }
 
+                    @Override
+                    public AsyncKeyedStateBackend asyncKeyedStateBackend() {
+                        return controller.asyncKeyedStateBackend();
+                    }
+
                     @Override
                     public InternalTimeServiceManager<?> 
internalTimerServiceManager() {
                         InternalTimeServiceManager<?> timeServiceManager =
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index f31dee84ad8..b2e7a6b169e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -50,7 +50,9 @@ import 
org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendTestUtils;
 import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.ttl.MockTtlTimeProvider;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
@@ -73,6 +75,7 @@ import 
org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -303,6 +306,14 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
         ttlTimeProvider = new MockTtlTimeProvider();
         ttlTimeProvider.setCurrentTimestamp(0);
 
+        if (operator instanceof AsyncStateProcessing
+                || (factory instanceof SimpleOperatorFactory
+                        && ((SimpleOperatorFactory<OUT>) factory).getOperator()
+                                instanceof AsyncStateProcessing)) {
+            setStateBackend(
+                    StateBackendTestUtils.buildAsyncStateBackend(new 
HashMapStateBackend()));
+        }
+
         this.streamTaskStateInitializer =
                 createStreamTaskStateManager(
                         environment, stateBackend, ttlTimeProvider, 
timeServiceManagerProvider);

Reply via email to