Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163938277 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java --- @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.util.Preconditions; + +import java.util.Objects; + +public class RegisteredBroadcastBackendStateMetaInfo<K, V> { + + /** The name of the state, as registered by the user. */ + private final String name; + + /** The mode how elements in this state are assigned to tasks during restore. */ + private final OperatorStateHandle.Mode assignmentMode; + + /** The type serializer for the keys in the map state. */ + private final TypeSerializer<K> keySerializer; + + /** The type serializer for the values in the map state. */ + private final TypeSerializer<V> valueSerializer; + + public RegisteredBroadcastBackendStateMetaInfo( + final String name, + final OperatorStateHandle.Mode assignmentMode, + final TypeSerializer<K> keySerializer, + final TypeSerializer<V> valueSerializer) { + + Preconditions.checkArgument(assignmentMode != null && assignmentMode == OperatorStateHandle.Mode.UNIFORM_BROADCAST); + + this.name = Preconditions.checkNotNull(name); + this.assignmentMode = assignmentMode; + this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.valueSerializer = Preconditions.checkNotNull(valueSerializer); + } + + public String getName() { + return name; + } + + public TypeSerializer<K> getKeySerializer() { + return keySerializer; + } + + public TypeSerializer<V> getValueSerializer() { + return valueSerializer; + } + + public OperatorStateHandle.Mode getAssignmentMode() { + return assignmentMode; + } + + public RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> snapshot() { + return new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>( + name, + assignmentMode, + keySerializer.duplicate(), + valueSerializer.duplicate(), + keySerializer.snapshotConfiguration(), + valueSerializer.snapshotConfiguration()); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) { + return false; + } + + final RegisteredBroadcastBackendStateMetaInfo other = + (RegisteredBroadcastBackendStateMetaInfo) obj; + + return Objects.equals(name, other.getName()) + && Objects.equals(assignmentMode, other.getAssignmentMode()) + && Objects.equals(keySerializer, other.getKeySerializer()) + && Objects.equals(valueSerializer, other.getValueSerializer()); + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + assignmentMode.hashCode(); + result = 31 * result + keySerializer.hashCode(); + result = 31 * result + valueSerializer.hashCode(); + return result; + } + + @Override + public String toString() { + return "RegisteredBroadcastBackendStateMetaInfo{" + + "name='" + name + '\'' + + ", keySerializer=" + keySerializer + + ", valueSerializer=" + valueSerializer + + ", assignmentMode=" + assignmentMode + + '}'; + } + + /** + * A consistent snapshot of a {@link RegisteredOperatorBackendStateMetaInfo}. + */ + public static class Snapshot<K, V> { + + private String name; + private OperatorStateHandle.Mode assignmentMode; + private TypeSerializer<K> keySerializer; + private TypeSerializer<V> valueSerializer; + private TypeSerializerConfigSnapshot keySerializerConfigSnapshot; + private TypeSerializerConfigSnapshot valueSerializerConfigSnapshot; + + /** Empty constructor used when restoring the state meta info snapshot. */ + Snapshot() {} + + private Snapshot( + final String name, + final OperatorStateHandle.Mode assignmentMode, + final TypeSerializer<K> keySerializer, + final TypeSerializer<V> valueSerializer, + final TypeSerializerConfigSnapshot keySerializerConfigSnapshot, + final TypeSerializerConfigSnapshot valueSerializerConfigSnapshot) { + + this.name = Preconditions.checkNotNull(name); + this.assignmentMode = Preconditions.checkNotNull(assignmentMode); + this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.valueSerializer = Preconditions.checkNotNull(valueSerializer); + this.keySerializerConfigSnapshot = Preconditions.checkNotNull(keySerializerConfigSnapshot); + this.valueSerializerConfigSnapshot = Preconditions.checkNotNull(valueSerializerConfigSnapshot); + } + + public String getName() { + return name; + } + + void setName(String name) { + this.name = name; + } + + public OperatorStateHandle.Mode getAssignmentMode() { + return assignmentMode; + } + + void setAssignmentMode(OperatorStateHandle.Mode mode) { + this.assignmentMode = mode; + } + + public TypeSerializer<K> getKeySerializer() { + return keySerializer; + } + + void setKeySerializer(TypeSerializer<K> serializer) { + this.keySerializer = serializer; + } + + public TypeSerializer<V> getValueSerializer() { + return valueSerializer; + } + + void setValueSerializer(TypeSerializer<V> serializer) { + this.valueSerializer = serializer; + } + + public TypeSerializerConfigSnapshot getKeySerializerConfigSnapshot() { + return keySerializerConfigSnapshot; + } + + void setKeySerializerConfigSnapshot(TypeSerializerConfigSnapshot configSnapshot) { + this.keySerializerConfigSnapshot = configSnapshot; + } + + public TypeSerializerConfigSnapshot getValueSerializerConfigSnapshot() { + return valueSerializerConfigSnapshot; + } + + void setValueSerializerConfigSnapshot(TypeSerializerConfigSnapshot configSnapshot) { + this.valueSerializerConfigSnapshot = configSnapshot; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo.Snapshot)) { + return false; + } + + RegisteredBroadcastBackendStateMetaInfo.Snapshot snapshot = + (RegisteredBroadcastBackendStateMetaInfo.Snapshot) obj; + + // need to check for nulls because serializer and config snapshots may be null on restore --- End diff -- That would entail that we shouldn't be loose and use the `Objects.equals` call below.
---