[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339699#comment-16339699
 ] 

ASF GitHub Bot commented on FLINK-8345:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5230#discussion_r163934364
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
 ---
    @@ -112,13 +142,40 @@ public void writeStateMetaInfo(DataOutputView out) 
throws IOException {
                }
        }
     
    +   public static class BroadcastStateMetaInfoWriterV2<K, V> extends 
AbstractBroadcastStateMetaInfoWriter<K, V> {
    +
    +           public BroadcastStateMetaInfoWriterV2(
    +                           final 
RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo) {
    +                   super(broadcastStateMetaInfo);
    +           }
    +
    +           @Override
    +           public void writeBroadcastStateMetaInfo(final DataOutputView 
out) throws IOException {
    +                   out.writeUTF(broadcastStateMetaInfo.getName());
    +                   
out.writeByte(broadcastStateMetaInfo.getAssignmentMode().ordinal());
    +
    +                   // write in a way that allows us to be fault-tolerant 
and skip blocks in the case of java serialization failures
    +                   
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
    +                                   out,
    +                                   Collections.singletonList(new Tuple2<>(
    +                                                   
broadcastStateMetaInfo.getKeySerializer(),
    +                                                   
broadcastStateMetaInfo.getKeySerializerConfigSnapshot())));
    +
    +                   
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
    --- End diff --
    
    Combining these two `writeSerializersAndConfigsWithResilience` calls into 
one call, with a single list containing both the key serializer and value 
serializer, would be more space-efficient in the written data:
    
    ```
    TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
        out,
        Arrays.asList(
            Tuple2.of(keySerializer, keySerializerConfig),
            Tuple2.of(valueSerializer, valueSerializerConfig));
    ```


> Iterate over keyed state on broadcast side of connect with broadcast.
> ---------------------------------------------------------------------
>
>                 Key: FLINK-8345
>                 URL: https://issues.apache.org/jira/browse/FLINK-8345
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>    Affects Versions: 1.5.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Major
>             Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to