This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bdb4f752ad [FLINK-34078][state]Move InternalKeyContext classes from 
o.a.f.runtime.state.heap to o.a.f.runtime.state package
6bdb4f752ad is described below

commit 6bdb4f752adb2b43dbadd8ad4fffcb4c00568dd3
Author: Jinzhong Li <lijinzhong2...@gmail.com>
AuthorDate: Mon Jan 15 18:29:56 2024 +0800

    [FLINK-34078][state]Move InternalKeyContext classes from 
o.a.f.runtime.state.heap to o.a.f.runtime.state package
---
 .../org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java  | 4 ++--
 .../org/apache/flink/runtime/state/AbstractKeyedStateBackend.java    | 1 -
 .../apache/flink/runtime/state/{heap => }/InternalKeyContext.java    | 3 +--
 .../flink/runtime/state/{heap => }/InternalKeyContextImpl.java       | 5 +----
 .../org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java   | 1 +
 .../org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java   | 1 +
 .../flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java       | 2 ++
 .../flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java       | 1 +
 .../org/apache/flink/runtime/state/heap/HeapRestoreOperation.java    | 1 +
 .../flink/runtime/state/heap/HeapSavepointRestoreOperation.java      | 1 +
 .../main/java/org/apache/flink/runtime/state/heap/StateTable.java    | 1 +
 .../java/org/apache/flink/runtime/state/heap/StateTableFactory.java  | 1 +
 .../apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java   | 2 ++
 .../apache/flink/runtime/state/heap/InternalKeyContextImplTest.java  | 1 +
 .../org/apache/flink/runtime/state/heap/MockInternalKeyContext.java  | 2 ++
 .../apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java   | 2 +-
 .../flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java   | 2 +-
 .../org/apache/flink/state/changelog/AbstractStateChangeLogger.java  | 2 +-
 .../org/apache/flink/state/changelog/ChangelogAggregatingState.java  | 2 +-
 .../java/org/apache/flink/state/changelog/ChangelogListState.java    | 2 +-
 .../java/org/apache/flink/state/changelog/ChangelogMapState.java     | 2 +-
 .../org/apache/flink/state/changelog/ChangelogReducingState.java     | 2 +-
 .../java/org/apache/flink/state/changelog/ChangelogStateFactory.java | 2 +-
 .../java/org/apache/flink/state/changelog/ChangelogValueState.java   | 2 +-
 .../org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java    | 2 +-
 .../flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java    | 2 +-
 .../flink/state/changelog/restore/AggregatingStateChangeApplier.java | 2 +-
 .../flink/state/changelog/restore/ChangelogApplierFactory.java       | 2 +-
 .../flink/state/changelog/restore/ChangelogApplierFactoryImpl.java   | 2 +-
 .../apache/flink/state/changelog/restore/KvStateChangeApplier.java   | 2 +-
 .../apache/flink/state/changelog/restore/ListStateChangeApplier.java | 2 +-
 .../apache/flink/state/changelog/restore/MapStateChangeApplier.java  | 2 +-
 .../flink/state/changelog/restore/ReducingStateChangeApplier.java    | 2 +-
 .../flink/state/changelog/restore/ValueStateChangeApplier.java       | 2 +-
 .../org/apache/flink/state/changelog/ChangelogListStateTest.java     | 2 +-
 .../java/org/apache/flink/state/changelog/ChangelogMapStateTest.java | 2 +-
 .../apache/flink/state/changelog/KvStateChangeLoggerImplTest.java    | 2 +-
 .../state/changelog/PriorityQueueStateChangeLoggerImplTest.java      | 2 +-
 .../org/apache/flink/state/changelog/StateChangeLoggerTestBase.java  | 2 +-
 .../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java      | 2 +-
 .../contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java     | 4 ++--
 .../org/apache/flink/streaming/runtime/tasks/TestStateBackend.java   | 4 ++--
 42 files changed, 47 insertions(+), 38 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
index 41db35aefd6..334bd3b75fe 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
@@ -36,6 +36,8 @@ import 
org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.Keyed;
@@ -49,8 +51,6 @@ import 
org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.state.ttl.mock.MockRestoreOperation;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index f9c26725093..6225c1dac12 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -28,7 +28,6 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SnapshotType;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateFactory;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalKeyContext.java
similarity index 95%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContext.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalKeyContext.java
index 667f65fbd60..b4155f3b47c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalKeyContext.java
@@ -16,10 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.heap;
+package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.KeyGroupRange;
 
 import javax.annotation.Nonnull;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalKeyContextImpl.java
similarity index 93%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalKeyContextImpl.java
index 67e794ac959..6c0977448bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalKeyContextImpl.java
@@ -16,10 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+package org.apache.flink.runtime.state;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index 7227bf31fd6..787b4031ba0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 
 import javax.annotation.Nonnull;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 7edcfe45db5..daabc9302c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.HeapPriorityQueuesManager;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.KeyedStateFunction;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
index 6db0f7e4ad5..0406f1bd60d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
@@ -24,6 +24,8 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder;
 import org.apache.flink.runtime.state.BackendBuildingException;
+import org.apache.flink.runtime.state.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java
index 8badfd28c70..b9bfee0e9a2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.heap;
 
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.KeyExtractorFunction;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.Keyed;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
index 89fe3e88281..fcf0777452c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java
index d52b7093556..984b4de31bd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.ListDelimitedSerializer;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 9120da41581..af48a7f4e08 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.IterableStateSnapshot;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableFactory.java
index c6bed4997fd..617d716e1a6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 
 /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
index 1c53e309c3d..537c0edd37f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -24,6 +24,8 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshot;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/InternalKeyContextImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/InternalKeyContextImplTest.java
index 77f718573bd..bade8448f4e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/InternalKeyContextImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/InternalKeyContextImplTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.runtime.state.heap;
 
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.KeyGroupRange;
 
 import org.junit.jupiter.api.Test;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/MockInternalKeyContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/MockInternalKeyContext.java
index de1d7e4efe2..4f9fd6e368a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/MockInternalKeyContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/MockInternalKeyContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state.heap;
 
+import org.apache.flink.runtime.state.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index b8d5bc1d130..807356afa32 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.KeyExtractorFunction;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
@@ -46,7 +47,6 @@ import 
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTran
 import org.apache.flink.runtime.state.StateSnapshotTransformers;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
 import org.apache.flink.runtime.state.ttl.TtlStateFactory;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
index 47383ed9760..14f36ecfd2e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
@@ -23,11 +23,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import 
org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend.MockSnapshotSupplier;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
index fa1af755752..0f626135429 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
@@ -19,11 +19,11 @@ package org.apache.flink.state.changelog;
 
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
 import org.apache.flink.util.function.ThrowingConsumer;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
index 2fa97124108..780f49be291 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
@@ -20,8 +20,8 @@ package org.apache.flink.state.changelog;
 
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.State;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.changelog.StateChange;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java
index 9d077971ba7..d1df87c8765 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java
@@ -21,8 +21,8 @@ package org.apache.flink.state.changelog;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.changelog.StateChange;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
index f2829655203..0beebacc9e3 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.changelog.StateChange;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java
index 068754ddcf5..c4ff3ec6f44 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java
@@ -20,8 +20,8 @@ package org.apache.flink.state.changelog;
 
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.State;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.changelog.StateChange;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateFactory.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateFactory.java
index 52aaa7fad3f..aa0ee677732 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateFactory.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateFactory.java
@@ -21,8 +21,8 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.FlinkRuntimeException;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java
index d37026b7128..84caa7b3aa3 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java
@@ -20,8 +20,8 @@ package org.apache.flink.state.changelog;
 
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.changelog.StateChange;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
index cf346295969..04a8b659fed 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
@@ -21,10 +21,10 @@ import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
index 0f348b7c1d8..b1785e32822 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
@@ -19,10 +19,10 @@ package org.apache.flink.state.changelog;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
 
 import java.io.IOException;
 
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/AggregatingStateChangeApplier.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/AggregatingStateChangeApplier.java
index aabe8cddc76..d990cb507a1 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/AggregatingStateChangeApplier.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/AggregatingStateChangeApplier.java
@@ -18,7 +18,7 @@
 package org.apache.flink.state.changelog.restore;
 
 import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.state.changelog.StateChangeOperation;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactory.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactory.java
index 7c6c0598de9..12a864bbdc8 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactory.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactory.java
@@ -19,8 +19,8 @@ package org.apache.flink.state.changelog.restore;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactoryImpl.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactoryImpl.java
index b11df22bfb2..7c4476e3673 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactoryImpl.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactoryImpl.java
@@ -18,8 +18,8 @@
 package org.apache.flink.state.changelog.restore;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/KvStateChangeApplier.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/KvStateChangeApplier.java
index 0fe8b995f3c..19248f179c9 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/KvStateChangeApplier.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/KvStateChangeApplier.java
@@ -18,8 +18,8 @@
 package org.apache.flink.state.changelog.restore;
 
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalMergingState;
 import org.apache.flink.state.changelog.StateChangeOperation;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java
index 1f7b05e127b..aaac5341f3e 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java
@@ -19,7 +19,7 @@ package org.apache.flink.state.changelog.restore;
 
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.state.changelog.StateChangeOperation;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/MapStateChangeApplier.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/MapStateChangeApplier.java
index b498329c5ba..6eb64213488 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/MapStateChangeApplier.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/MapStateChangeApplier.java
@@ -19,7 +19,7 @@ package org.apache.flink.state.changelog.restore;
 
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.state.changelog.StateChangeOperation;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ReducingStateChangeApplier.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ReducingStateChangeApplier.java
index 80aeaf9b383..9bd94238e65 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ReducingStateChangeApplier.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ReducingStateChangeApplier.java
@@ -18,7 +18,7 @@
 package org.apache.flink.state.changelog.restore;
 
 import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.state.changelog.StateChangeOperation;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java
index c32db7d906a..8525451b1bf 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java
@@ -18,7 +18,7 @@
 package org.apache.flink.state.changelog.restore;
 
 import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.state.changelog.StateChangeOperation;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java
index 9a7420b80e4..16f58c48fdc 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java
@@ -21,8 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java
index f28befc060f..058e96ad3f1 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java
@@ -21,8 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
index 50a60707465..523401c5569 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.state.changelog;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
 
 import java.io.IOException;
 import java.util.Collections;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
index d41b447bd11..ebb0a509561 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
@@ -18,8 +18,8 @@
 package org.apache.flink.state.changelog;
 
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
 import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
 
 /** {@link PriorityQueueStateChangeLoggerImpl} test. */
 public class PriorityQueueStateChangeLoggerImplTest extends 
StateChangeLoggerTestBase<Void> {
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
index bd57c8a3680..38efdebb255 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
@@ -18,10 +18,10 @@
 package org.apache.flink.state.changelog;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
 
 import org.junit.Test;
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 9766ebde385..c91b63963f6 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -42,6 +42,7 @@ import 
org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.HeapPriorityQueuesManager;
+import org.apache.flink.runtime.state.InternalKeyContext;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -58,7 +59,6 @@ import 
org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.FileUtils;
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 6d2ca01344d..f33ce8f0fba 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -39,6 +39,8 @@ import 
org.apache.flink.runtime.state.BackendBuildingException;
 import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
+import org.apache.flink.runtime.state.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
@@ -48,8 +50,6 @@ import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.FileUtils;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
index 20da063f10c..ab2a0629fcc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
@@ -32,6 +32,8 @@ import 
org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.InternalKeyContext;
+import org.apache.flink.runtime.state.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -46,8 +48,6 @@ import org.apache.flink.runtime.state.SnapshotStrategy;
 import org.apache.flink.runtime.state.SnapshotStrategyRunner;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
-import org.apache.flink.runtime.state.heap.InternalKeyContext;
-import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 

Reply via email to