[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339698#comment-16339698 ]
ASF GitHub Bot commented on FLINK-8345: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163938102 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java --- @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.util.Preconditions; + +import java.util.Objects; + +public class RegisteredBroadcastBackendStateMetaInfo<K, V> { + + /** The name of the state, as registered by the user. */ + private final String name; + + /** The mode how elements in this state are assigned to tasks during restore. */ + private final OperatorStateHandle.Mode assignmentMode; + + /** The type serializer for the keys in the map state. */ + private final TypeSerializer<K> keySerializer; + + /** The type serializer for the values in the map state. 
*/ + private final TypeSerializer<V> valueSerializer; + + public RegisteredBroadcastBackendStateMetaInfo( + final String name, + final OperatorStateHandle.Mode assignmentMode, + final TypeSerializer<K> keySerializer, + final TypeSerializer<V> valueSerializer) { + + Preconditions.checkArgument(assignmentMode != null && assignmentMode == OperatorStateHandle.Mode.UNIFORM_BROADCAST); + + this.name = Preconditions.checkNotNull(name); + this.assignmentMode = assignmentMode; + this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.valueSerializer = Preconditions.checkNotNull(valueSerializer); + } + + public String getName() { + return name; + } + + public TypeSerializer<K> getKeySerializer() { + return keySerializer; + } + + public TypeSerializer<V> getValueSerializer() { + return valueSerializer; + } + + public OperatorStateHandle.Mode getAssignmentMode() { + return assignmentMode; + } + + public RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> snapshot() { + return new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>( + name, + assignmentMode, + keySerializer.duplicate(), + valueSerializer.duplicate(), + keySerializer.snapshotConfiguration(), + valueSerializer.snapshotConfiguration()); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) { + return false; + } + + final RegisteredBroadcastBackendStateMetaInfo other = + (RegisteredBroadcastBackendStateMetaInfo) obj; + + return Objects.equals(name, other.getName()) + && Objects.equals(assignmentMode, other.getAssignmentMode()) + && Objects.equals(keySerializer, other.getKeySerializer()) + && Objects.equals(valueSerializer, other.getValueSerializer()); + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + assignmentMode.hashCode(); + result = 31 * result + keySerializer.hashCode(); + result = 31 * result + valueSerializer.hashCode(); + return 
result; + } + + @Override + public String toString() { + return "RegisteredBroadcastBackendStateMetaInfo{" + + "name='" + name + '\'' + + ", keySerializer=" + keySerializer + + ", valueSerializer=" + valueSerializer + + ", assignmentMode=" + assignmentMode + + '}'; + } + + /** + * A consistent snapshot of a {@link RegisteredOperatorBackendStateMetaInfo}. + */ + public static class Snapshot<K, V> { + + private String name; + private OperatorStateHandle.Mode assignmentMode; + private TypeSerializer<K> keySerializer; + private TypeSerializer<V> valueSerializer; + private TypeSerializerConfigSnapshot keySerializerConfigSnapshot; + private TypeSerializerConfigSnapshot valueSerializerConfigSnapshot; + + /** Empty constructor used when restoring the state meta info snapshot. */ + Snapshot() {} + + private Snapshot( + final String name, + final OperatorStateHandle.Mode assignmentMode, + final TypeSerializer<K> keySerializer, + final TypeSerializer<V> valueSerializer, + final TypeSerializerConfigSnapshot keySerializerConfigSnapshot, + final TypeSerializerConfigSnapshot valueSerializerConfigSnapshot) { + + this.name = Preconditions.checkNotNull(name); + this.assignmentMode = Preconditions.checkNotNull(assignmentMode); + this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.valueSerializer = Preconditions.checkNotNull(valueSerializer); + this.keySerializerConfigSnapshot = Preconditions.checkNotNull(keySerializerConfigSnapshot); + this.valueSerializerConfigSnapshot = Preconditions.checkNotNull(valueSerializerConfigSnapshot); + } + + public String getName() { + return name; + } + + void setName(String name) { + this.name = name; + } + + public OperatorStateHandle.Mode getAssignmentMode() { + return assignmentMode; + } + + void setAssignmentMode(OperatorStateHandle.Mode mode) { + this.assignmentMode = mode; + } + + public TypeSerializer<K> getKeySerializer() { + return keySerializer; + } + + void setKeySerializer(TypeSerializer<K> serializer) { + 
this.keySerializer = serializer; + } + + public TypeSerializer<V> getValueSerializer() { + return valueSerializer; + } + + void setValueSerializer(TypeSerializer<V> serializer) { + this.valueSerializer = serializer; + } + + public TypeSerializerConfigSnapshot getKeySerializerConfigSnapshot() { + return keySerializerConfigSnapshot; + } + + void setKeySerializerConfigSnapshot(TypeSerializerConfigSnapshot configSnapshot) { + this.keySerializerConfigSnapshot = configSnapshot; + } + + public TypeSerializerConfigSnapshot getValueSerializerConfigSnapshot() { + return valueSerializerConfigSnapshot; + } + + void setValueSerializerConfigSnapshot(TypeSerializerConfigSnapshot configSnapshot) { + this.valueSerializerConfigSnapshot = configSnapshot; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo.Snapshot)) { + return false; + } + + RegisteredBroadcastBackendStateMetaInfo.Snapshot snapshot = + (RegisteredBroadcastBackendStateMetaInfo.Snapshot) obj; + + // need to check for nulls because serializer and config snapshots may be null on restore --- End diff -- I don't think the serializers and config snapshots will be null in this case. They _could_ be null in the past, because previous versions did not write the config snapshot. Now that we are adding broadcast state, we should be able to ensure that there is always a restored config snapshot, and any unloadable serializer should be replaced with an `UnloadableDummyTypeSerializer`. > Iterate over keyed state on broadcast side of connect with broadcast. > --------------------------------------------------------------------- > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming > Affects Versions: 1.5.0 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)