This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0be231972d5d687924db22ae81d6a10049f7c43e
Author: Stefan Richter <srich...@confluent.io>
AuthorDate: Fri Oct 27 11:01:49 2023 +0200

    [FLINK-33341][state] Refactoring: consolidate equals/hashCode/toString for incremental state handle classes.
---
 .../state/AbstractIncrementalStateHandle.java      | 36 +++++++++++
 .../state/IncrementalLocalKeyedStateHandle.java    | 35 +----------
 .../state/IncrementalRemoteKeyedStateHandle.java   | 69 ++--------------------
 3 files changed, 43 insertions(+), 97 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractIncrementalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractIncrementalStateHandle.java
index 8c7ea74c33c..85f12329f57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractIncrementalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractIncrementalStateHandle.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import javax.annotation.Nonnull;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.UUID;
 
 /** Abstract superclass for all {@link IncrementalKeyedStateHandle}. */
@@ -103,4 +104,39 @@ public abstract class AbstractIncrementalStateHandle implements IncrementalKeyed
                 ? null
                 : this;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AbstractIncrementalStateHandle that = (AbstractIncrementalStateHandle) o;
+        return Objects.equals(stateHandleId, that.stateHandleId);
+    }
+
+    @Override
+    public int hashCode() {
+        return stateHandleId.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "AbstractIncrementalStateHandle{"
+                + "checkpointId="
+                + checkpointId
+                + ", backendIdentifier="
+                + backendIdentifier
+                + ", keyGroupRange="
+                + keyGroupRange
+                + ", sharedState="
+                + sharedState
+                + ", metaStateHandle="
+                + metaStateHandle
+                + ", stateHandleId="
+                + stateHandleId
+                + '}';
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
index f854c111c6e..ac457f00622 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
@@ -94,44 +94,13 @@ public class IncrementalLocalKeyedStateHandle extends AbstractIncrementalStateHa
         return directoryStateHandle.getStateSize() + metaStateHandle.getStateSize();
     }
 
-    @Override
-    public int hashCode() {
-        int result = directoryStateHandle.hashCode();
-        result = 31 * result + getKeyGroupRange().hashCode();
-        result = 31 * result + getMetaDataStateHandle().hashCode();
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        if (!super.equals(o)) {
-            return false;
-        }
-
-        IncrementalLocalKeyedStateHandle that = (IncrementalLocalKeyedStateHandle) o;
-
-        return getKeyGroupRange().equals(that.keyGroupRange)
-                && getMetaDataStateHandle().equals(that.getMetaDataStateHandle());
-    }
-
     @Override
     public String toString() {
         return "IncrementalLocalKeyedStateHandle{"
-                + "metaDataState="
-                + metaStateHandle
-                + "} "
-                + "DirectoryKeyedStateHandle{"
                 + "directoryStateHandle="
                 + directoryStateHandle
-                + ", keyGroupRange="
-                + keyGroupRange
-                + '}';
+                + "} "
+                + super.toString();
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
index 86a5b59c168..41b9a0466cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
@@ -306,75 +306,16 @@ public class IncrementalRemoteKeyedStateHandle extends AbstractIncrementalStateH
                 stateHandleId);
     }
 
-    /**
-     * This method is should only be called in tests! This should never serve as key in a hash map.
-     */
-    @VisibleForTesting
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        IncrementalRemoteKeyedStateHandle that = (IncrementalRemoteKeyedStateHandle) o;
-
-        if (!getStateHandleId().equals(that.getStateHandleId())) {
-            return false;
-        }
-        if (getCheckpointId() != that.getCheckpointId()) {
-            return false;
-        }
-        if (!getBackendIdentifier().equals(that.getBackendIdentifier())) {
-            return false;
-        }
-        if (!getKeyGroupRange().equals(that.getKeyGroupRange())) {
-            return false;
-        }
-        if (!getSharedState().equals(that.getSharedState())) {
-            return false;
-        }
-        if (!getPrivateState().equals(that.getPrivateState())) {
-            return false;
-        }
-        return getMetaDataStateHandle().equals(that.getMetaDataStateHandle());
-    }
-
-    /** This method should only be called in tests! This should never serve as key in a hash map. */
-    @VisibleForTesting
-    @Override
-    public int hashCode() {
-        int result = getBackendIdentifier().hashCode();
-        result = 31 * result + getKeyGroupRange().hashCode();
-        result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32));
-        result = 31 * result + getSharedState().hashCode();
-        result = 31 * result + getPrivateState().hashCode();
-        result = 31 * result + getMetaDataStateHandle().hashCode();
-        result = 31 * result + getStateHandleId().hashCode();
-        return result;
-    }
-
     @Override
     public String toString() {
         return "IncrementalRemoteKeyedStateHandle{"
-                + "backendIdentifier="
-                + backendIdentifier
-                + ", stateHandleId="
-                + stateHandleId
-                + ", keyGroupRange="
-                + keyGroupRange
-                + ", checkpointId="
-                + checkpointId
-                + ", sharedState="
-                + sharedState
-                + ", privateState="
+                + "privateState="
                 + privateState
-                + ", metaStateHandle="
-                + metaStateHandle
+                + ", persistedSizeOfThisCheckpoint="
+                + persistedSizeOfThisCheckpoint
                 + ", registered="
                 + (sharedStateRegistry != null)
-                + '}';
+                + "} "
+                + super.toString();
     }
 }