[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339698#comment-16339698
 ] 

ASF GitHub Bot commented on FLINK-8345:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5230#discussion_r163938102
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
 ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Objects;
    +
    +public class RegisteredBroadcastBackendStateMetaInfo<K, V> {
    +
    +   /** The name of the state, as registered by the user. */
    +   private final String name;
    +
    +   /** The mode how elements in this state are assigned to tasks during 
restore. */
    +   private final OperatorStateHandle.Mode assignmentMode;
    +
    +   /** The type serializer for the keys in the map state. */
    +   private final TypeSerializer<K> keySerializer;
    +
    +   /** The type serializer for the values in the map state. */
    +   private final TypeSerializer<V> valueSerializer;
    +
    +   public RegisteredBroadcastBackendStateMetaInfo(
    +                   final String name,
    +                   final OperatorStateHandle.Mode assignmentMode,
    +                   final TypeSerializer<K> keySerializer,
    +                   final TypeSerializer<V> valueSerializer) {
    +
    +           Preconditions.checkArgument(assignmentMode != null && 
assignmentMode == OperatorStateHandle.Mode.UNIFORM_BROADCAST);
    +
    +           this.name = Preconditions.checkNotNull(name);
    +           this.assignmentMode = assignmentMode;
    +           this.keySerializer = Preconditions.checkNotNull(keySerializer);
    +           this.valueSerializer = 
Preconditions.checkNotNull(valueSerializer);
    +   }
    +
    +   public String getName() {
    +           return name;
    +   }
    +
    +   public TypeSerializer<K> getKeySerializer() {
    +           return keySerializer;
    +   }
    +
    +   public TypeSerializer<V> getValueSerializer() {
    +           return valueSerializer;
    +   }
    +
    +   public OperatorStateHandle.Mode getAssignmentMode() {
    +           return assignmentMode;
    +   }
    +
    +   public RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> 
snapshot() {
    +           return new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>(
    +                           name,
    +                           assignmentMode,
    +                           keySerializer.duplicate(),
    +                           valueSerializer.duplicate(),
    +                           keySerializer.snapshotConfiguration(),
    +                           valueSerializer.snapshotConfiguration());
    +   }
    +
    +   @Override
    +   public boolean equals(Object obj) {
    +           if (obj == this) {
    +                   return true;
    +           }
    +
    +           if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) {
    +                   return false;
    +           }
    +
    +           final RegisteredBroadcastBackendStateMetaInfo other =
    +                           (RegisteredBroadcastBackendStateMetaInfo) obj;
    +
    +           return Objects.equals(name, other.getName())
    +                           && Objects.equals(assignmentMode, 
other.getAssignmentMode())
    +                           && Objects.equals(keySerializer, 
other.getKeySerializer())
    +                           && Objects.equals(valueSerializer, 
other.getValueSerializer());
    +   }
    +
    +   @Override
    +   public int hashCode() {
    +           int result = name.hashCode();
    +           result = 31 * result + assignmentMode.hashCode();
    +           result = 31 * result + keySerializer.hashCode();
    +           result = 31 * result + valueSerializer.hashCode();
    +           return result;
    +   }
    +
    +   @Override
    +   public String toString() {
    +           return "RegisteredBroadcastBackendStateMetaInfo{" +
    +                           "name='" + name + '\'' +
    +                           ", keySerializer=" + keySerializer +
    +                           ", valueSerializer=" + valueSerializer +
    +                           ", assignmentMode=" + assignmentMode +
    +                           '}';
    +   }
    +
    +   /**
    +    * A consistent snapshot of a {@link 
RegisteredOperatorBackendStateMetaInfo}.
    +    */
    +   public static class Snapshot<K, V> {
    +
    +           private String name;
    +           private OperatorStateHandle.Mode assignmentMode;
    +           private TypeSerializer<K> keySerializer;
    +           private TypeSerializer<V> valueSerializer;
    +           private TypeSerializerConfigSnapshot 
keySerializerConfigSnapshot;
    +           private TypeSerializerConfigSnapshot 
valueSerializerConfigSnapshot;
    +
    +           /** Empty constructor used when restoring the state meta info 
snapshot. */
    +           Snapshot() {}
    +
    +           private Snapshot(
    +                           final String name,
    +                           final OperatorStateHandle.Mode assignmentMode,
    +                           final TypeSerializer<K> keySerializer,
    +                           final TypeSerializer<V> valueSerializer,
    +                           final TypeSerializerConfigSnapshot 
keySerializerConfigSnapshot,
    +                           final TypeSerializerConfigSnapshot 
valueSerializerConfigSnapshot) {
    +
    +                   this.name = Preconditions.checkNotNull(name);
    +                   this.assignmentMode = 
Preconditions.checkNotNull(assignmentMode);
    +                   this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
    +                   this.valueSerializer = 
Preconditions.checkNotNull(valueSerializer);
    +                   this.keySerializerConfigSnapshot = 
Preconditions.checkNotNull(keySerializerConfigSnapshot);
    +                   this.valueSerializerConfigSnapshot = 
Preconditions.checkNotNull(valueSerializerConfigSnapshot);
    +           }
    +
    +           public String getName() {
    +                   return name;
    +           }
    +
    +           void setName(String name) {
    +                   this.name = name;
    +           }
    +
    +           public OperatorStateHandle.Mode getAssignmentMode() {
    +                   return assignmentMode;
    +           }
    +
    +           void setAssignmentMode(OperatorStateHandle.Mode mode) {
    +                   this.assignmentMode = mode;
    +           }
    +
    +           public TypeSerializer<K> getKeySerializer() {
    +                   return keySerializer;
    +           }
    +
    +           void setKeySerializer(TypeSerializer<K> serializer) {
    +                   this.keySerializer = serializer;
    +           }
    +
    +           public TypeSerializer<V> getValueSerializer() {
    +                   return valueSerializer;
    +           }
    +
    +           void setValueSerializer(TypeSerializer<V> serializer) {
    +                   this.valueSerializer = serializer;
    +           }
    +
    +           public TypeSerializerConfigSnapshot 
getKeySerializerConfigSnapshot() {
    +                   return keySerializerConfigSnapshot;
    +           }
    +
    +           void 
setKeySerializerConfigSnapshot(TypeSerializerConfigSnapshot configSnapshot) {
    +                   this.keySerializerConfigSnapshot = configSnapshot;
    +           }
    +
    +           public TypeSerializerConfigSnapshot 
getValueSerializerConfigSnapshot() {
    +                   return valueSerializerConfigSnapshot;
    +           }
    +
    +           void 
setValueSerializerConfigSnapshot(TypeSerializerConfigSnapshot configSnapshot) {
    +                   this.valueSerializerConfigSnapshot = configSnapshot;
    +           }
    +
    +           @Override
    +           public boolean equals(Object obj) {
    +                   if (obj == this) {
    +                           return true;
    +                   }
    +
    +                   if (!(obj instanceof 
RegisteredBroadcastBackendStateMetaInfo.Snapshot)) {
    +                           return false;
    +                   }
    +
    +                   RegisteredBroadcastBackendStateMetaInfo.Snapshot 
snapshot =
    +                                   
(RegisteredBroadcastBackendStateMetaInfo.Snapshot) obj;
    +
    +                   // need to check for nulls because serializer and 
config snapshots may be null on restore
    --- End diff --
    
    I don't think the serializer and configs will be null in this case ...
    
    It _used_ to maybe be null in the past because previous versions did not 
have the config written.
    For adding broadcast state now, we should be able to ensure that there is 
always a restored config, and for unloadable serializers, it should be replaced 
with a `UnloadableDummyTypeSerializer`.


> Iterate over keyed state on broadcast side of connect with broadcast.
> ---------------------------------------------------------------------
>
>                 Key: FLINK-8345
>                 URL: https://issues.apache.org/jira/browse/FLINK-8345
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>    Affects Versions: 1.5.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Major
>             Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to