This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2d74ee3966525be5ab3770a653c96a7d04d5f05a Author: Stefan Richter <srich...@confluent.io> AuthorDate: Thu Oct 19 14:31:10 2023 +0200 [FLINK-33341][state] Refactoring: introduce common superclass for all IncrementalKeyedStateHandle. --- .../metadata/MetadataV2V3SerializerBase.java | 2 +- .../state/AbstractIncrementalStateHandle.java | 106 ++++++++++++++++++ .../runtime/state/DirectoryKeyedStateHandle.java | 124 --------------------- .../runtime/state/IncrementalKeyedStateHandle.java | 3 + .../state/IncrementalLocalKeyedStateHandle.java | 122 ++++++++++---------- .../state/IncrementalRemoteKeyedStateHandle.java | 71 ++---------- .../changelog/ChangelogStateBackendHandle.java | 2 +- .../checkpoint/CheckpointCoordinatorTest.java | 2 +- .../flink/runtime/state/ChangelogTestUtils.java | 2 +- .../IncrementalRemoteKeyedStateHandleTest.java | 18 +-- .../RocksDBIncrementalRestoreOperation.java | 6 +- 11 files changed, 192 insertions(+), 266 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java index db3b5dd22d0..bd91e8e1fbb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java @@ -341,7 +341,7 @@ public abstract class MetadataV2V3SerializerBase { dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups()); dos.writeLong(incrementalKeyedStateHandle.getCheckpointedSize()); - serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), dos); + serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaDataStateHandle(), dos); serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getSharedState(), dos); serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getPrivateState(), dos); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractIncrementalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractIncrementalStateHandle.java new file mode 100644 index 00000000000..8c7ea74c33c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractIncrementalStateHandle.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nonnull; + +import java.util.List; +import java.util.UUID; + +/** Abstract superclass for all {@link IncrementalKeyedStateHandle}. */ +public abstract class AbstractIncrementalStateHandle implements IncrementalKeyedStateHandle { + private static final long serialVersionUID = 1L; + + /** The checkpoint Id. */ + protected final long checkpointId; + + /** + * UUID to identify the backend which created this state handle. This is in creating the key for + * the {@link SharedStateRegistry}. + */ + protected final UUID backendIdentifier; + + /** The key-group range covered by this state handle. */ + protected final KeyGroupRange keyGroupRange; + + /** Shared state in the incremental checkpoint. */ + protected final List<HandleAndLocalPath> sharedState; + + /** Primary meta data state of the incremental checkpoint. */ + protected final StreamStateHandle metaStateHandle; + + /** Unique id for this state handle. */ + protected final StateHandleID stateHandleId; + + public AbstractIncrementalStateHandle( + UUID backendIdentifier, + KeyGroupRange keyGroupRange, + long checkpointId, + List<HandleAndLocalPath> sharedState, + StreamStateHandle metaStateHandle, + StateHandleID stateHandleId) { + this.checkpointId = checkpointId; + this.keyGroupRange = keyGroupRange; + this.backendIdentifier = backendIdentifier; + this.sharedState = sharedState; + this.metaStateHandle = metaStateHandle; + this.stateHandleId = stateHandleId; + } + + @Override + public long getCheckpointId() { + return checkpointId; + } + + @Nonnull + @Override + public UUID getBackendIdentifier() { + return backendIdentifier; + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + @Nonnull + @Override + public List<HandleAndLocalPath> getSharedStateHandles() { + return sharedState; + } + + @Nonnull + @Override + public StreamStateHandle getMetaDataStateHandle() { + return metaStateHandle; + } + + @Override + public StateHandleID getStateHandleId() { + return stateHandleId; + } + + @Override + public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { + return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals( + getKeyGroupRange().getIntersection(keyGroupRange)) + ? null + : this; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java deleted file mode 100644 index 3f922a2e968..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import javax.annotation.Nonnull; - -/** - * This class is a keyed state handle based on a directory. It combines a {@link - * DirectoryStateHandle} and a {@link KeyGroupRange}. - */ -public class DirectoryKeyedStateHandle implements KeyedStateHandle { - - private static final long serialVersionUID = 1L; - - /** The directory state handle. */ - @Nonnull private final DirectoryStateHandle directoryStateHandle; - - /** The key-group range. */ - @Nonnull private final KeyGroupRange keyGroupRange; - - private final StateHandleID stateHandleId; - - public DirectoryKeyedStateHandle( - @Nonnull DirectoryStateHandle directoryStateHandle, - @Nonnull KeyGroupRange keyGroupRange) { - - this.directoryStateHandle = directoryStateHandle; - this.keyGroupRange = keyGroupRange; - this.stateHandleId = StateHandleID.randomStateHandleId(); - } - - @Nonnull - public DirectoryStateHandle getDirectoryStateHandle() { - return directoryStateHandle; - } - - @Nonnull - @Override - public KeyGroupRange getKeyGroupRange() { - return keyGroupRange; - } - - @Override - public void discardState() throws Exception { - directoryStateHandle.discardState(); - } - - @Override - public long getStateSize() { - return directoryStateHandle.getStateSize(); - } - - @Override - public long getCheckpointedSize() { - return getStateSize(); - } - - @Override - public KeyedStateHandle getIntersection(KeyGroupRange otherKeyGroupRange) { - return this.keyGroupRange.getIntersection(otherKeyGroupRange).getNumberOfKeyGroups() > 0 - ? this - : null; - } - - @Override - public StateHandleID getStateHandleId() { - return stateHandleId; - } - - @Override - public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) { - // Nothing to do, this is for local use only. - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - DirectoryKeyedStateHandle that = (DirectoryKeyedStateHandle) o; - - if (!getDirectoryStateHandle().equals(that.getDirectoryStateHandle())) { - return false; - } - return getKeyGroupRange().equals(that.getKeyGroupRange()); - } - - @Override - public int hashCode() { - int result = getDirectoryStateHandle().hashCode(); - result = 31 * result + getKeyGroupRange().hashCode(); - return result; - } - - @Override - public String toString() { - return "DirectoryKeyedStateHandle{" - + "directoryStateHandle=" - + directoryStateHandle - + ", keyGroupRange=" - + keyGroupRange - + '}'; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java index 9ce3cbd332f..f162efa936b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java @@ -42,6 +42,9 @@ public interface IncrementalKeyedStateHandle @Nonnull List<HandleAndLocalPath> getSharedStateHandles(); + @Nonnull + StreamStateHandle getMetaDataStateHandle(); + /** A Holder of StreamStateHandle and the corresponding localPath. */ final class HandleAndLocalPath implements Serializable { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java index d782cf886bd..f854c111c6e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java @@ -32,22 +32,11 @@ import java.util.UUID; * DirectoryStateHandle} that represents the directory of the native RocksDB snapshot, the key * groups, and a stream state handle for Flink's state meta data file. */ -public class IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle - implements IncrementalKeyedStateHandle { +public class IncrementalLocalKeyedStateHandle extends AbstractIncrementalStateHandle { private static final long serialVersionUID = 1L; - /** Id of the checkpoint that created this state handle. */ - @Nonnegative private final long checkpointId; - - /** UUID to identify the backend which created this state handle. */ - @Nonnull private final UUID backendIdentifier; - - /** Handle to Flink's state meta data. */ - @Nonnull private final StreamStateHandle metaDataState; - - /** All shared state handles and the corresponding localPath used by the checkpoint. */ - @Nonnull private final List<HandleAndLocalPath> sharedState; + private final DirectoryStateHandle directoryStateHandle; public IncrementalLocalKeyedStateHandle( @Nonnull UUID backendIdentifier, @@ -57,21 +46,14 @@ public class IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle @Nonnull StreamStateHandle metaDataState, @Nonnull List<HandleAndLocalPath> sharedState) { - super(directoryStateHandle, keyGroupRange); - this.backendIdentifier = backendIdentifier; - this.checkpointId = checkpointId; - this.metaDataState = metaDataState; - this.sharedState = new ArrayList<>(sharedState); - } - - @Nonnull - public StreamStateHandle getMetaDataState() { - return metaDataState; - } - - @Override - public long getCheckpointId() { - return checkpointId; + super( + backendIdentifier, + keyGroupRange, + checkpointId, + new ArrayList<>(sharedState), + metaDataState, + StateHandleID.randomStateHandleId()); + this.directoryStateHandle = directoryStateHandle; } @Override @@ -81,52 +63,23 @@ public class IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle checkpointId, getDirectoryStateHandle(), getKeyGroupRange(), - getMetaDataState(), + getMetaDataStateHandle(), getSharedStateHandles()); } - @Override - @Nonnull - public UUID getBackendIdentifier() { - return backendIdentifier; - } - - @Override - @Nonnull - public List<HandleAndLocalPath> getSharedStateHandles() { - return sharedState; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - - IncrementalLocalKeyedStateHandle that = (IncrementalLocalKeyedStateHandle) o; - - return getMetaDataState().equals(that.getMetaDataState()); - } - @Override public void discardState() throws Exception { Exception collectedEx = null; try { - super.discardState(); + directoryStateHandle.discardState(); } catch (Exception e) { collectedEx = e; } try { - metaDataState.discardState(); + metaStateHandle.discardState(); } catch (Exception e) { collectedEx = ExceptionUtils.firstOrSuppressed(e, collectedEx); } @@ -138,22 +91,61 @@ public class IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle @Override public long getStateSize() { - return super.getStateSize() + metaDataState.getStateSize(); + return directoryStateHandle.getStateSize() + metaStateHandle.getStateSize(); } @Override public int hashCode() { - int result = super.hashCode(); - result = 31 * result + getMetaDataState().hashCode(); + int result = directoryStateHandle.hashCode(); + result = 31 * result + getKeyGroupRange().hashCode(); + result = 31 * result + getMetaDataStateHandle().hashCode(); return result; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + + IncrementalLocalKeyedStateHandle that = (IncrementalLocalKeyedStateHandle) o; + + return getKeyGroupRange().equals(that.keyGroupRange) + && getMetaDataStateHandle().equals(that.getMetaDataStateHandle()); + } + @Override public String toString() { return "IncrementalLocalKeyedStateHandle{" + "metaDataState=" - + metaDataState + + metaStateHandle + "} " - + super.toString(); + + "DirectoryKeyedStateHandle{" + + "directoryStateHandle=" + + directoryStateHandle + + ", keyGroupRange=" + + keyGroupRange + + '}'; + } + + @Override + public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) { + // Nothing to do, this is for local use only. + } + + @Override + public long getCheckpointedSize() { + return directoryStateHandle.getStateSize(); + } + + @Nonnull + public DirectoryStateHandle getDirectoryStateHandle() { + return directoryStateHandle; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java index fd524d74f2a..86a5b59c168 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java @@ -56,7 +56,7 @@ import java.util.stream.Collectors; * should not be called from production code. This means this class is also not suited to serve as a * key, e.g. in hash maps. */ -public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateHandle { +public class IncrementalRemoteKeyedStateHandle extends AbstractIncrementalStateHandle { public static final long UNKNOWN_CHECKPOINTED_SIZE = -1L; @@ -65,31 +65,11 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH private static final long serialVersionUID = -8328808513197388231L; - /** - * UUID to identify the backend which created this state handle. This is in creating the key for - * the {@link SharedStateRegistry}. - */ - private final UUID backendIdentifier; - - /** The key-group range covered by this state handle. */ - private final KeyGroupRange keyGroupRange; - - /** The checkpoint Id. */ - private final long checkpointId; - - /** Shared state in the incremental checkpoint. */ - private final List<HandleAndLocalPath> sharedState; - /** Private state in the incremental checkpoint. */ private final List<HandleAndLocalPath> privateState; - /** Primary meta data state of the incremental checkpoint. */ - private final StreamStateHandle metaStateHandle; - private final long persistedSizeOfThisCheckpoint; - private final StateHandleID stateHandleId; - /** * Once the shared states are registered, it is the {@link SharedStateRegistry}'s responsibility * to cleanup those shared states. But in the cases where the state handle is discarded before @@ -147,18 +127,19 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH StreamStateHandle metaStateHandle, long persistedSizeOfThisCheckpoint, StateHandleID stateHandleId) { - this.backendIdentifier = Preconditions.checkNotNull(backendIdentifier); - this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); - this.checkpointId = checkpointId; - this.sharedState = Preconditions.checkNotNull(sharedState); + super( + Preconditions.checkNotNull(backendIdentifier), + Preconditions.checkNotNull(keyGroupRange), + checkpointId, + sharedState, + metaStateHandle, + stateHandleId); this.privateState = Preconditions.checkNotNull(privateState); - this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle); this.sharedStateRegistry = null; this.persistedSizeOfThisCheckpoint = persistedSizeOfThisCheckpoint == UNKNOWN_CHECKPOINTED_SIZE ? getStateSize() : persistedSizeOfThisCheckpoint; - this.stateHandleId = stateHandleId; } public static IncrementalRemoteKeyedStateHandle restore( @@ -181,16 +162,6 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH stateHandleId); } - @Override - public KeyGroupRange getKeyGroupRange() { - return keyGroupRange; - } - - @Override - public long getCheckpointId() { - return checkpointId; - } - @Override public CheckpointBoundKeyedStateHandle rebound(long checkpointId) { return new IncrementalRemoteKeyedStateHandle( @@ -212,15 +183,6 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH return privateState; } - public StreamStateHandle getMetaStateHandle() { - return metaStateHandle; - } - - @Nonnull - public UUID getBackendIdentifier() { - return backendIdentifier; - } - @Nonnull @Override public List<HandleAndLocalPath> getSharedStateHandles() { @@ -231,19 +193,6 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH return sharedStateRegistry; } - @Override - public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { - return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals( - this.keyGroupRange.getIntersection(keyGroupRange)) - ? null - : this; - } - - @Override - public StateHandleID getStateHandleId() { - return stateHandleId; - } - @Override public void discardState() throws Exception { @@ -390,7 +339,7 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH if (!getPrivateState().equals(that.getPrivateState())) { return false; } - return getMetaStateHandle().equals(that.getMetaStateHandle()); + return getMetaDataStateHandle().equals(that.getMetaDataStateHandle()); } /** This method should only be called in tests! This should never serve as key in a hash map. */ @@ -402,7 +351,7 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32)); result = 31 * result + getSharedState().hashCode(); result = 31 * result + getPrivateState().hashCode(); - result = 31 * result + getMetaStateHandle().hashCode(); + result = 31 * result + getMetaDataStateHandle().hashCode(); result = 31 * result + getStateHandleId().hashCode(); return result; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java index 2d786c44d0d..6c5c7feae06 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java @@ -174,7 +174,7 @@ public interface ChangelogStateBackendHandle StreamStateHandle castMetaStateHandle = restoreFileStateHandle( - incrementalRemoteKeyedStateHandle.getMetaStateHandle()); + incrementalRemoteKeyedStateHandle.getMetaDataStateHandle()); List<HandleAndLocalPath> castSharedStates = incrementalRemoteKeyedStateHandle.getSharedState().stream() .map( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 49a22a4d435..1174d9ee18b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -2987,7 +2987,7 @@ class CheckpointCoordinatorTest { handleAndLocalPath.getHandle(), TernaryBoolean.FALSE); } - verify(incrementalKeyedStateHandle.getMetaStateHandle(), never()) + verify(incrementalKeyedStateHandle.getMetaDataStateHandle(), never()) .discardState(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java index 890010e1f6d..1329665ea21 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java @@ -80,7 +80,7 @@ public class ChangelogTestUtils { stateHandle.getCheckpointId(), stateHandle.getSharedState(), stateHandle.getPrivateState(), - stateHandle.getMetaStateHandle(), + stateHandle.getMetaDataStateHandle(), stateHandle.getCheckpointedSize(), stateHandle.getStateHandleId()); this.stateHandle = stateHandle; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java index 9b12b3a43a1..1422bc8bcf0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java @@ -61,7 +61,7 @@ class IncrementalRemoteKeyedStateHandleTest { verifyDiscard(handleAndLocalPath.getHandle(), TernaryBoolean.TRUE); } - verify(stateHandle.getMetaStateHandle()).discardState(); + verify(stateHandle.getMetaDataStateHandle()).discardState(); } /** @@ -130,8 +130,8 @@ class IncrementalRemoteKeyedStateHandleTest { verify(handleAndLocalPath.getHandle(), times(0)).discardState(); } - verify(stateHandle1.getMetaStateHandle(), times(1)).discardState(); - verify(stateHandle2.getMetaStateHandle(), times(0)).discardState(); + verify(stateHandle1.getMetaDataStateHandle(), times(1)).discardState(); + verify(stateHandle2.getMetaDataStateHandle(), times(0)).discardState(); // We discard the second stateHandle2.discardState(); @@ -146,8 +146,8 @@ class IncrementalRemoteKeyedStateHandleTest { verifyDiscard(handleAndLocalPath.getHandle(), TernaryBoolean.TRUE); } - verify(stateHandle1.getMetaStateHandle(), times(1)).discardState(); - verify(stateHandle2.getMetaStateHandle(), times(1)).discardState(); + verify(stateHandle1.getMetaDataStateHandle(), times(1)).discardState(); + verify(stateHandle2.getMetaDataStateHandle(), times(1)).discardState(); } /** @@ -176,7 +176,7 @@ class IncrementalRemoteKeyedStateHandleTest { // Everything should be discarded for this handle stateHandleZ.discardState(); - verify(stateHandleZ.getMetaStateHandle(), times(1)).discardState(); + verify(stateHandleZ.getMetaDataStateHandle(), times(1)).discardState(); // Close the first registry stateRegistryA.close(); @@ -188,16 +188,16 @@ class IncrementalRemoteKeyedStateHandleTest { // Private state should still get discarded stateHandleY.discardState(); - verify(stateHandleY.getMetaStateHandle(), times(1)).discardState(); + verify(stateHandleY.getMetaDataStateHandle(), times(1)).discardState(); // This should still be unaffected - verify(stateHandleX.getMetaStateHandle(), never()).discardState(); + verify(stateHandleX.getMetaDataStateHandle(), never()).discardState(); // We re-register the handle with a new registry SharedStateRegistry sharedStateRegistryB = spy(new SharedStateRegistryImpl()); stateHandleX.registerSharedStates(sharedStateRegistryB, 0L); stateHandleX.discardState(); - verify(stateHandleX.getMetaStateHandle(), times(1)).discardState(); + verify(stateHandleX.getMetaDataStateHandle(), times(1)).discardState(); // Should be completely discarded because it is tracked through the new registry sharedStateRegistryB.unregisterUnusedState(1L); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index 3c36c754f3b..a62bbb4a70b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -252,7 +252,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper stateHandle.getCheckpointId(), new DirectoryStateHandle(downloadedState.getDownloadDestination()), stateHandle.getKeyGroupRange(), - stateHandle.getMetaStateHandle(), + stateHandle.getMetaDataStateHandle(), stateHandle.getSharedState())); } @@ -260,7 +260,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper private void restoreBaseDBFromLocalState(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception { KeyedBackendSerializationProxy<K> serializationProxy = - readMetaData(localKeyedStateHandle.getMetaDataState()); + readMetaData(localKeyedStateHandle.getMetaDataStateHandle()); List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots(); @@ -474,7 +474,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper StateHandleDownloadSpec downloadRequest) throws Exception { KeyedBackendSerializationProxy<K> serializationProxy = - readMetaData(downloadRequest.getStateHandle().getMetaStateHandle()); + readMetaData(downloadRequest.getStateHandle().getMetaDataStateHandle()); // read meta data List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots();