GitHub user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174450032
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java ---
    @@ -140,19 +159,28 @@ public boolean contains(UK userKey) {
        }
     
        @Override
    -   public byte[] getSerializedValue(K key, N namespace) throws IOException {
    -           Preconditions.checkState(namespace != null, "No namespace given.");
    -           Preconditions.checkState(key != null, "No key given.");
    +   public byte[] getSerializedValue(
    +                   byte[] serializedKeyAndNamespace,
    +                   TypeSerializer<K> keySerializer,
    +                   TypeSerializer<N> namespaceSerializer,
    +                   TypeSerializer<HashMap<UK, UV>> valueSerializer) throws Exception {
     
    -           HashMap<UK, UV> result = stateTable.get(key, namespace);
    +           Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
     
    -           if (null == result) {
    +           Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
    +                           serializedKeyAndNamespace, keySerializer, namespaceSerializer);
    +
    +           Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
    +
    +           if (result == null) {
                        return null;
                }
     
    -           TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer();
    -           TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer();
    +           final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) valueSerializer;
    --- End diff --
    
    I mean, why isn't the signature of `KvStateSerializer.serializeMap` the following:
    ```
    KvStateSerializer.serializeMap(Map<UK, UV> map, TypeSerializer<Map<UK, UV>> serializer);
    ```
    
    with the following implementation:
    ```
    ...
    serializeMap(Map<UK, UV> map, TypeSerializer<Map<UK, UV>> serializer) {
            if (map != null) {
                    // Let the map serializer write the whole map, entries included.
                    DataOutputSerializer dos = new DataOutputSerializer(32);
                    serializer.serialize(map, dos);
                    return dos.getCopyOfBuffer();
            } else {
                    return null;
            }
    }
    ```
    
    Why deal with the map key/value entries at all outside the serializer?
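
To make the point concrete, here is a minimal, self-contained sketch of the delegation idea. It does not use Flink: `SimpleSerializer` and `StringIntMapSerializer` are hypothetical stand-ins for `TypeSerializer` and a map serializer, and plain `java.io` streams replace `DataOutputSerializer`. The entry layout shown (size followed by key/value pairs) is an assumption for illustration only.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class SerializeMapSketch {

    /** Hypothetical stand-in for a type serializer; not a Flink interface. */
    interface SimpleSerializer<T> {
        void serialize(T value, DataOutputStream out) throws IOException;
    }

    /** Hypothetical map serializer that owns the entry layout (assumed: size, then pairs). */
    static final class StringIntMapSerializer implements SimpleSerializer<Map<String, Integer>> {
        @Override
        public void serialize(Map<String, Integer> map, DataOutputStream out) throws IOException {
            out.writeInt(map.size());
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeInt(entry.getValue());
            }
        }
    }

    /** Delegates the whole map to the serializer and never iterates over entries itself. */
    static <UK, UV> byte[] serializeMap(
            Map<UK, UV> map,
            SimpleSerializer<Map<UK, UV>> serializer) throws IOException {
        if (map == null) {
            return null;
        }
        ByteArrayOutputStream buffer = new ByteArrayOutputStream(32);
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            serializer.serialize(map, out);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Map<String, Integer> state = new LinkedHashMap<>();
        state.put("a", 1);
        byte[] bytes = serializeMap(state, new StringIntMapSerializer());
        System.out.println("serialized " + bytes.length + " bytes");
    }
}
```

With this shape the caller only needs the map and its serializer. Knowledge of the per-entry key and value serializers stays inside the map serializer.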

