[ 
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874575#comment-15874575
 ] 

ASF GitHub Bot commented on FLINK-4856:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3336#discussion_r101992577
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java ---
    @@ -484,6 +487,71 @@ public static Throwable deserializeServerFailure(ByteBuf buf) throws IOException
                        return null;
                }
        }
    +   
    +   /**
    +    * Serializes all values of the Iterable with the given serializer.
    +    *
    +    * @param entries         Key-value pairs to serialize
    +    * @param keySerializer   Serializer for UK
    +    * @param valueSerializer Serializer for UV
    +    * @param <UK>            Type of the keys
    +    * @param <UV>            Type of the values
    +    * @return Serialized values or <code>null</code> if values <code>null</code> or empty
    +    * @throws IOException On failure during serialization
    +    */
    +   public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException {
    +           if (entries != null) {
    +                   Iterator<Map.Entry<UK, UV>> it = entries.iterator();
    +
    +                   if (it.hasNext()) {
    +                           // Serialize
    +                           DataOutputSerializer dos = new DataOutputSerializer(32);
    +
    +                           while (it.hasNext()) {
    +                                   Map.Entry<UK, UV> entry = it.next();
    +
    +                                   keySerializer.serialize(entry.getKey(), dos);
    +                                   valueSerializer.serialize(entry.getValue(), dos);
    +                           }
    +
    +                           return dos.getCopyOfBuffer();
    +                   } else {
    +                           return null;
    --- End diff --
    
    I wonder whether null and an empty map should be considered the same. From the 
other code in the class, I think we should return an empty byte[] here. Then 
you could also simply use a for-each loop over the iterable to make the code 
shorter.
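
    A rough sketch of what that suggestion might look like, not the actual 
patch. To keep it self-contained it does not use Flink's TypeSerializer or 
DataOutputSerializer: EntryWriter and SerializeMapSketch are hypothetical 
stand-ins built on java.io. It also assumes that a null iterable still maps to 
null, while an empty one yields an empty byte[].

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

final class SerializeMapSketch {

    /** Hypothetical stand-in for Flink's TypeSerializer; not the real API. */
    interface EntryWriter<T> {
        void write(T value, DataOutputStream out) throws IOException;
    }

    /**
     * Serializes all entries of the iterable.
     * Assumption: null input stays null, an empty iterable gives an empty byte[].
     */
    static <UK, UV> byte[] serializeMap(
            Iterable<Map.Entry<UK, UV>> entries,
            EntryWriter<UK> keyWriter,
            EntryWriter<UV> valueWriter) throws IOException {

        if (entries == null) {
            return null;
        }

        ByteArrayOutputStream bytes = new ByteArrayOutputStream(32);
        DataOutputStream out = new DataOutputStream(bytes);

        // The for-each loop replaces the explicit Iterator and hasNext() checks.
        for (Map.Entry<UK, UV> entry : entries) {
            keyWriter.write(entry.getKey(), out);
            valueWriter.write(entry.getValue(), out);
        }

        out.flush();
        // Nothing was written for an empty iterable, so this is a zero-length array.
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        EntryWriter<Integer> keyWriter = (value, out) -> out.writeInt(value);
        EntryWriter<Long> valueWriter = (value, out) -> out.writeLong(value);

        Map<Integer, Long> map = new LinkedHashMap<>();
        System.out.println(serializeMap(map.entrySet(), keyWriter, valueWriter).length); // prints 0

        map.put(1, 42L);
        System.out.println(serializeMap(map.entrySet(), keyWriter, valueWriter).length); // prints 12
    }
}
```

    With this shape the caller can tell "no map" (null) apart from "empty map" 
(zero-length array), which is the distinction the comment above asks about.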


> Add MapState for keyed streams
> ------------------------------
>
>                 Key: FLINK-4856
>                 URL: https://issues.apache.org/jira/browse/FLINK-4856
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API, State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently, 
> these states are implemented by storing the entire map in a ValueState or a 
> ListState. This implementation, however, is very costly because all entries 
> have to be serialized/deserialized when updating a single entry. To improve 
> the efficiency of these states, MapStates are urgently needed. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
