This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2d74ee3966525be5ab3770a653c96a7d04d5f05a
Author: Stefan Richter <srich...@confluent.io>
AuthorDate: Thu Oct 19 14:31:10 2023 +0200

    [FLINK-33341][state] Refactoring: introduce common superclass for all IncrementalKeyedStateHandle.
---
 .../metadata/MetadataV2V3SerializerBase.java       |   2 +-
 .../state/AbstractIncrementalStateHandle.java      | 106 ++++++++++++++++++
 .../runtime/state/DirectoryKeyedStateHandle.java   | 124 ---------------------
 .../runtime/state/IncrementalKeyedStateHandle.java |   3 +
 .../state/IncrementalLocalKeyedStateHandle.java    | 122 ++++++++++----------
 .../state/IncrementalRemoteKeyedStateHandle.java   |  71 ++----------
 .../changelog/ChangelogStateBackendHandle.java     |   2 +-
 .../checkpoint/CheckpointCoordinatorTest.java      |   2 +-
 .../flink/runtime/state/ChangelogTestUtils.java    |   2 +-
 .../IncrementalRemoteKeyedStateHandleTest.java     |  18 +--
 .../RocksDBIncrementalRestoreOperation.java        |   6 +-
 11 files changed, 192 insertions(+), 266 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index db3b5dd22d0..bd91e8e1fbb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -341,7 +341,7 @@ public abstract class MetadataV2V3SerializerBase {
             
dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
             dos.writeLong(incrementalKeyedStateHandle.getCheckpointedSize());
 
-            
serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), 
dos);
+            
serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaDataStateHandle(),
 dos);
 
             
serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getSharedState(), 
dos);
             
serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getPrivateState(), 
dos);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractIncrementalStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractIncrementalStateHandle.java
new file mode 100644
index 00000000000..8c7ea74c33c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractIncrementalStateHandle.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.UUID;
+
+/** Abstract superclass for all {@link IncrementalKeyedStateHandle}. */
+public abstract class AbstractIncrementalStateHandle implements 
IncrementalKeyedStateHandle {
+    private static final long serialVersionUID = 1L;
+
+    /** The checkpoint Id. */
+    protected final long checkpointId;
+
+    /**
+     * UUID to identify the backend which created this state handle. This is used in creating the
+     * key for the {@link SharedStateRegistry}.
+     */
+    protected final UUID backendIdentifier;
+
+    /** The key-group range covered by this state handle. */
+    protected final KeyGroupRange keyGroupRange;
+
+    /** Shared state in the incremental checkpoint. */
+    protected final List<HandleAndLocalPath> sharedState;
+
+    /** Primary meta data state of the incremental checkpoint. */
+    protected final StreamStateHandle metaStateHandle;
+
+    /** Unique id for this state handle. */
+    protected final StateHandleID stateHandleId;
+
+    public AbstractIncrementalStateHandle(
+            UUID backendIdentifier,
+            KeyGroupRange keyGroupRange,
+            long checkpointId,
+            List<HandleAndLocalPath> sharedState,
+            StreamStateHandle metaStateHandle,
+            StateHandleID stateHandleId) {
+        this.checkpointId = checkpointId;
+        this.keyGroupRange = keyGroupRange;
+        this.backendIdentifier = backendIdentifier;
+        this.sharedState = sharedState;
+        this.metaStateHandle = metaStateHandle;
+        this.stateHandleId = stateHandleId;
+    }
+
+    @Override
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    @Nonnull
+    @Override
+    public UUID getBackendIdentifier() {
+        return backendIdentifier;
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyGroupRange;
+    }
+
+    @Nonnull
+    @Override
+    public List<HandleAndLocalPath> getSharedStateHandles() {
+        return sharedState;
+    }
+
+    @Nonnull
+    @Override
+    public StreamStateHandle getMetaDataStateHandle() {
+        return metaStateHandle;
+    }
+
+    @Override
+    public StateHandleID getStateHandleId() {
+        return stateHandleId;
+    }
+
+    @Override
+    public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+        return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(
+                        getKeyGroupRange().getIntersection(keyGroupRange))
+                ? null
+                : this;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java
deleted file mode 100644
index 3f922a2e968..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import javax.annotation.Nonnull;
-
-/**
- * This class is a keyed state handle based on a directory. It combines a 
{@link
- * DirectoryStateHandle} and a {@link KeyGroupRange}.
- */
-public class DirectoryKeyedStateHandle implements KeyedStateHandle {
-
-    private static final long serialVersionUID = 1L;
-
-    /** The directory state handle. */
-    @Nonnull private final DirectoryStateHandle directoryStateHandle;
-
-    /** The key-group range. */
-    @Nonnull private final KeyGroupRange keyGroupRange;
-
-    private final StateHandleID stateHandleId;
-
-    public DirectoryKeyedStateHandle(
-            @Nonnull DirectoryStateHandle directoryStateHandle,
-            @Nonnull KeyGroupRange keyGroupRange) {
-
-        this.directoryStateHandle = directoryStateHandle;
-        this.keyGroupRange = keyGroupRange;
-        this.stateHandleId = StateHandleID.randomStateHandleId();
-    }
-
-    @Nonnull
-    public DirectoryStateHandle getDirectoryStateHandle() {
-        return directoryStateHandle;
-    }
-
-    @Nonnull
-    @Override
-    public KeyGroupRange getKeyGroupRange() {
-        return keyGroupRange;
-    }
-
-    @Override
-    public void discardState() throws Exception {
-        directoryStateHandle.discardState();
-    }
-
-    @Override
-    public long getStateSize() {
-        return directoryStateHandle.getStateSize();
-    }
-
-    @Override
-    public long getCheckpointedSize() {
-        return getStateSize();
-    }
-
-    @Override
-    public KeyedStateHandle getIntersection(KeyGroupRange otherKeyGroupRange) {
-        return 
this.keyGroupRange.getIntersection(otherKeyGroupRange).getNumberOfKeyGroups() > 0
-                ? this
-                : null;
-    }
-
-    @Override
-    public StateHandleID getStateHandleId() {
-        return stateHandleId;
-    }
-
-    @Override
-    public void registerSharedStates(SharedStateRegistry stateRegistry, long 
checkpointID) {
-        // Nothing to do, this is for local use only.
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        DirectoryKeyedStateHandle that = (DirectoryKeyedStateHandle) o;
-
-        if (!getDirectoryStateHandle().equals(that.getDirectoryStateHandle())) 
{
-            return false;
-        }
-        return getKeyGroupRange().equals(that.getKeyGroupRange());
-    }
-
-    @Override
-    public int hashCode() {
-        int result = getDirectoryStateHandle().hashCode();
-        result = 31 * result + getKeyGroupRange().hashCode();
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "DirectoryKeyedStateHandle{"
-                + "directoryStateHandle="
-                + directoryStateHandle
-                + ", keyGroupRange="
-                + keyGroupRange
-                + '}';
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 9ce3cbd332f..f162efa936b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -42,6 +42,9 @@ public interface IncrementalKeyedStateHandle
     @Nonnull
     List<HandleAndLocalPath> getSharedStateHandles();
 
+    @Nonnull
+    StreamStateHandle getMetaDataStateHandle();
+
     /** A Holder of StreamStateHandle and the corresponding localPath. */
     final class HandleAndLocalPath implements Serializable {
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
index d782cf886bd..f854c111c6e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
@@ -32,22 +32,11 @@ import java.util.UUID;
  * DirectoryStateHandle} that represents the directory of the native RocksDB 
snapshot, the key
  * groups, and a stream state handle for Flink's state meta data file.
  */
-public class IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle
-        implements IncrementalKeyedStateHandle {
+public class IncrementalLocalKeyedStateHandle extends 
AbstractIncrementalStateHandle {
 
     private static final long serialVersionUID = 1L;
 
-    /** Id of the checkpoint that created this state handle. */
-    @Nonnegative private final long checkpointId;
-
-    /** UUID to identify the backend which created this state handle. */
-    @Nonnull private final UUID backendIdentifier;
-
-    /** Handle to Flink's state meta data. */
-    @Nonnull private final StreamStateHandle metaDataState;
-
-    /** All shared state handles and the corresponding localPath used by the 
checkpoint. */
-    @Nonnull private final List<HandleAndLocalPath> sharedState;
+    private final DirectoryStateHandle directoryStateHandle;
 
     public IncrementalLocalKeyedStateHandle(
             @Nonnull UUID backendIdentifier,
@@ -57,21 +46,14 @@ public class IncrementalLocalKeyedStateHandle extends 
DirectoryKeyedStateHandle
             @Nonnull StreamStateHandle metaDataState,
             @Nonnull List<HandleAndLocalPath> sharedState) {
 
-        super(directoryStateHandle, keyGroupRange);
-        this.backendIdentifier = backendIdentifier;
-        this.checkpointId = checkpointId;
-        this.metaDataState = metaDataState;
-        this.sharedState = new ArrayList<>(sharedState);
-    }
-
-    @Nonnull
-    public StreamStateHandle getMetaDataState() {
-        return metaDataState;
-    }
-
-    @Override
-    public long getCheckpointId() {
-        return checkpointId;
+        super(
+                backendIdentifier,
+                keyGroupRange,
+                checkpointId,
+                new ArrayList<>(sharedState),
+                metaDataState,
+                StateHandleID.randomStateHandleId());
+        this.directoryStateHandle = directoryStateHandle;
     }
 
     @Override
@@ -81,52 +63,23 @@ public class IncrementalLocalKeyedStateHandle extends 
DirectoryKeyedStateHandle
                 checkpointId,
                 getDirectoryStateHandle(),
                 getKeyGroupRange(),
-                getMetaDataState(),
+                getMetaDataStateHandle(),
                 getSharedStateHandles());
     }
 
-    @Override
-    @Nonnull
-    public UUID getBackendIdentifier() {
-        return backendIdentifier;
-    }
-
-    @Override
-    @Nonnull
-    public List<HandleAndLocalPath> getSharedStateHandles() {
-        return sharedState;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        if (!super.equals(o)) {
-            return false;
-        }
-
-        IncrementalLocalKeyedStateHandle that = 
(IncrementalLocalKeyedStateHandle) o;
-
-        return getMetaDataState().equals(that.getMetaDataState());
-    }
-
     @Override
     public void discardState() throws Exception {
 
         Exception collectedEx = null;
 
         try {
-            super.discardState();
+            directoryStateHandle.discardState();
         } catch (Exception e) {
             collectedEx = e;
         }
 
         try {
-            metaDataState.discardState();
+            metaStateHandle.discardState();
         } catch (Exception e) {
             collectedEx = ExceptionUtils.firstOrSuppressed(e, collectedEx);
         }
@@ -138,22 +91,61 @@ public class IncrementalLocalKeyedStateHandle extends 
DirectoryKeyedStateHandle
 
     @Override
     public long getStateSize() {
-        return super.getStateSize() + metaDataState.getStateSize();
+        return directoryStateHandle.getStateSize() + 
metaStateHandle.getStateSize();
     }
 
     @Override
     public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + getMetaDataState().hashCode();
+        int result = directoryStateHandle.hashCode();
+        result = 31 * result + getKeyGroupRange().hashCode();
+        result = 31 * result + getMetaDataStateHandle().hashCode();
         return result;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+
+        IncrementalLocalKeyedStateHandle that = 
(IncrementalLocalKeyedStateHandle) o;
+
+        return getKeyGroupRange().equals(that.keyGroupRange)
+                && 
getMetaDataStateHandle().equals(that.getMetaDataStateHandle());
+    }
+
     @Override
     public String toString() {
         return "IncrementalLocalKeyedStateHandle{"
                 + "metaDataState="
-                + metaDataState
+                + metaStateHandle
                 + "} "
-                + super.toString();
+                + "DirectoryKeyedStateHandle{"
+                + "directoryStateHandle="
+                + directoryStateHandle
+                + ", keyGroupRange="
+                + keyGroupRange
+                + '}';
+    }
+
+    @Override
+    public void registerSharedStates(SharedStateRegistry stateRegistry, long 
checkpointID) {
+        // Nothing to do, this is for local use only.
+    }
+
+    @Override
+    public long getCheckpointedSize() {
+        return directoryStateHandle.getStateSize();
+    }
+
+    @Nonnull
+    public DirectoryStateHandle getDirectoryStateHandle() {
+        return directoryStateHandle;
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
index fd524d74f2a..86a5b59c168 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
@@ -56,7 +56,7 @@ import java.util.stream.Collectors;
  * should not be called from production code. This means this class is also 
not suited to serve as a
  * key, e.g. in hash maps.
  */
-public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateHandle {
+public class IncrementalRemoteKeyedStateHandle extends 
AbstractIncrementalStateHandle {
 
     public static final long UNKNOWN_CHECKPOINTED_SIZE = -1L;
 
@@ -65,31 +65,11 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
 
     private static final long serialVersionUID = -8328808513197388231L;
 
-    /**
-     * UUID to identify the backend which created this state handle. This is 
in creating the key for
-     * the {@link SharedStateRegistry}.
-     */
-    private final UUID backendIdentifier;
-
-    /** The key-group range covered by this state handle. */
-    private final KeyGroupRange keyGroupRange;
-
-    /** The checkpoint Id. */
-    private final long checkpointId;
-
-    /** Shared state in the incremental checkpoint. */
-    private final List<HandleAndLocalPath> sharedState;
-
     /** Private state in the incremental checkpoint. */
     private final List<HandleAndLocalPath> privateState;
 
-    /** Primary meta data state of the incremental checkpoint. */
-    private final StreamStateHandle metaStateHandle;
-
     private final long persistedSizeOfThisCheckpoint;
 
-    private final StateHandleID stateHandleId;
-
     /**
      * Once the shared states are registered, it is the {@link 
SharedStateRegistry}'s responsibility
      * to cleanup those shared states. But in the cases where the state handle 
is discarded before
@@ -147,18 +127,19 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
             StreamStateHandle metaStateHandle,
             long persistedSizeOfThisCheckpoint,
             StateHandleID stateHandleId) {
-        this.backendIdentifier = Preconditions.checkNotNull(backendIdentifier);
-        this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
-        this.checkpointId = checkpointId;
-        this.sharedState = Preconditions.checkNotNull(sharedState);
+        super(
+                Preconditions.checkNotNull(backendIdentifier),
+                Preconditions.checkNotNull(keyGroupRange),
+                checkpointId,
+                sharedState,
+                metaStateHandle,
+                stateHandleId);
         this.privateState = Preconditions.checkNotNull(privateState);
-        this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle);
         this.sharedStateRegistry = null;
         this.persistedSizeOfThisCheckpoint =
                 persistedSizeOfThisCheckpoint == UNKNOWN_CHECKPOINTED_SIZE
                         ? getStateSize()
                         : persistedSizeOfThisCheckpoint;
-        this.stateHandleId = stateHandleId;
     }
 
     public static IncrementalRemoteKeyedStateHandle restore(
@@ -181,16 +162,6 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
                 stateHandleId);
     }
 
-    @Override
-    public KeyGroupRange getKeyGroupRange() {
-        return keyGroupRange;
-    }
-
-    @Override
-    public long getCheckpointId() {
-        return checkpointId;
-    }
-
     @Override
     public CheckpointBoundKeyedStateHandle rebound(long checkpointId) {
         return new IncrementalRemoteKeyedStateHandle(
@@ -212,15 +183,6 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
         return privateState;
     }
 
-    public StreamStateHandle getMetaStateHandle() {
-        return metaStateHandle;
-    }
-
-    @Nonnull
-    public UUID getBackendIdentifier() {
-        return backendIdentifier;
-    }
-
     @Nonnull
     @Override
     public List<HandleAndLocalPath> getSharedStateHandles() {
@@ -231,19 +193,6 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
         return sharedStateRegistry;
     }
 
-    @Override
-    public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
-        return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(
-                        this.keyGroupRange.getIntersection(keyGroupRange))
-                ? null
-                : this;
-    }
-
-    @Override
-    public StateHandleID getStateHandleId() {
-        return stateHandleId;
-    }
-
     @Override
     public void discardState() throws Exception {
 
@@ -390,7 +339,7 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
         if (!getPrivateState().equals(that.getPrivateState())) {
             return false;
         }
-        return getMetaStateHandle().equals(that.getMetaStateHandle());
+        return getMetaDataStateHandle().equals(that.getMetaDataStateHandle());
     }
 
     /** This method should only be called in tests! This should never serve as 
key in a hash map. */
@@ -402,7 +351,7 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
         result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() 
>>> 32));
         result = 31 * result + getSharedState().hashCode();
         result = 31 * result + getPrivateState().hashCode();
-        result = 31 * result + getMetaStateHandle().hashCode();
+        result = 31 * result + getMetaDataStateHandle().hashCode();
         result = 31 * result + getStateHandleId().hashCode();
         return result;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
index 2d786c44d0d..6c5c7feae06 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
@@ -174,7 +174,7 @@ public interface ChangelogStateBackendHandle
 
                 StreamStateHandle castMetaStateHandle =
                         restoreFileStateHandle(
-                                
incrementalRemoteKeyedStateHandle.getMetaStateHandle());
+                                
incrementalRemoteKeyedStateHandle.getMetaDataStateHandle());
                 List<HandleAndLocalPath> castSharedStates =
                         
incrementalRemoteKeyedStateHandle.getSharedState().stream()
                                 .map(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 49a22a4d435..1174d9ee18b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2987,7 +2987,7 @@ class CheckpointCoordinatorTest {
                                         handleAndLocalPath.getHandle(), 
TernaryBoolean.FALSE);
                             }
 
-                            
verify(incrementalKeyedStateHandle.getMetaStateHandle(), never())
+                            
verify(incrementalKeyedStateHandle.getMetaDataStateHandle(), never())
                                     .discardState();
                         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java
index 890010e1f6d..1329665ea21 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java
@@ -80,7 +80,7 @@ public class ChangelogTestUtils {
                     stateHandle.getCheckpointId(),
                     stateHandle.getSharedState(),
                     stateHandle.getPrivateState(),
-                    stateHandle.getMetaStateHandle(),
+                    stateHandle.getMetaDataStateHandle(),
                     stateHandle.getCheckpointedSize(),
                     stateHandle.getStateHandleId());
             this.stateHandle = stateHandle;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
index 9b12b3a43a1..1422bc8bcf0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
@@ -61,7 +61,7 @@ class IncrementalRemoteKeyedStateHandleTest {
             verifyDiscard(handleAndLocalPath.getHandle(), TernaryBoolean.TRUE);
         }
 
-        verify(stateHandle.getMetaStateHandle()).discardState();
+        verify(stateHandle.getMetaDataStateHandle()).discardState();
     }
 
     /**
@@ -130,8 +130,8 @@ class IncrementalRemoteKeyedStateHandleTest {
             verify(handleAndLocalPath.getHandle(), times(0)).discardState();
         }
 
-        verify(stateHandle1.getMetaStateHandle(), times(1)).discardState();
-        verify(stateHandle2.getMetaStateHandle(), times(0)).discardState();
+        verify(stateHandle1.getMetaDataStateHandle(), times(1)).discardState();
+        verify(stateHandle2.getMetaDataStateHandle(), times(0)).discardState();
 
         // We discard the second
         stateHandle2.discardState();
@@ -146,8 +146,8 @@ class IncrementalRemoteKeyedStateHandleTest {
             verifyDiscard(handleAndLocalPath.getHandle(), TernaryBoolean.TRUE);
         }
 
-        verify(stateHandle1.getMetaStateHandle(), times(1)).discardState();
-        verify(stateHandle2.getMetaStateHandle(), times(1)).discardState();
+        verify(stateHandle1.getMetaDataStateHandle(), times(1)).discardState();
+        verify(stateHandle2.getMetaDataStateHandle(), times(1)).discardState();
     }
 
     /**
@@ -176,7 +176,7 @@ class IncrementalRemoteKeyedStateHandleTest {
 
         // Everything should be discarded for this handle
         stateHandleZ.discardState();
-        verify(stateHandleZ.getMetaStateHandle(), times(1)).discardState();
+        verify(stateHandleZ.getMetaDataStateHandle(), times(1)).discardState();
 
         // Close the first registry
         stateRegistryA.close();
@@ -188,16 +188,16 @@ class IncrementalRemoteKeyedStateHandleTest {
 
         // Private state should still get discarded
         stateHandleY.discardState();
-        verify(stateHandleY.getMetaStateHandle(), times(1)).discardState();
+        verify(stateHandleY.getMetaDataStateHandle(), times(1)).discardState();
 
         // This should still be unaffected
-        verify(stateHandleX.getMetaStateHandle(), never()).discardState();
+        verify(stateHandleX.getMetaDataStateHandle(), never()).discardState();
 
         // We re-register the handle with a new registry
         SharedStateRegistry sharedStateRegistryB = spy(new 
SharedStateRegistryImpl());
         stateHandleX.registerSharedStates(sharedStateRegistryB, 0L);
         stateHandleX.discardState();
-        verify(stateHandleX.getMetaStateHandle(), times(1)).discardState();
+        verify(stateHandleX.getMetaDataStateHandle(), times(1)).discardState();
 
         // Should be completely discarded because it is tracked through the 
new registry
         sharedStateRegistryB.unregisterUnusedState(1L);
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 3c36c754f3b..a62bbb4a70b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -252,7 +252,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                         stateHandle.getCheckpointId(),
                         new 
DirectoryStateHandle(downloadedState.getDownloadDestination()),
                         stateHandle.getKeyGroupRange(),
-                        stateHandle.getMetaStateHandle(),
+                        stateHandle.getMetaDataStateHandle(),
                         stateHandle.getSharedState()));
     }
 
@@ -260,7 +260,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
     private void restoreBaseDBFromLocalState(IncrementalLocalKeyedStateHandle 
localKeyedStateHandle)
             throws Exception {
         KeyedBackendSerializationProxy<K> serializationProxy =
-                readMetaData(localKeyedStateHandle.getMetaDataState());
+                readMetaData(localKeyedStateHandle.getMetaDataStateHandle());
         List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
                 serializationProxy.getStateMetaInfoSnapshots();
 
@@ -474,7 +474,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
             StateHandleDownloadSpec downloadRequest) throws Exception {
 
         KeyedBackendSerializationProxy<K> serializationProxy =
-                
readMetaData(downloadRequest.getStateHandle().getMetaStateHandle());
+                
readMetaData(downloadRequest.getStateHandle().getMetaDataStateHandle());
         // read meta data
         List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
                 serializationProxy.getStateMetaInfoSnapshots();

Reply via email to