Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174454169
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
 ---
    @@ -140,19 +159,28 @@ public boolean contains(UK userKey) {
        }
     
        @Override
    -   public byte[] getSerializedValue(K key, N namespace) throws IOException 
{
    -           Preconditions.checkState(namespace != null, "No namespace 
given.");
    -           Preconditions.checkState(key != null, "No key given.");
    +   public byte[] getSerializedValue(
    +                   byte[] serializedKeyAndNamespace,
    +                   TypeSerializer<K> keySerializer,
    +                   TypeSerializer<N> namespaceSerializer,
    +                   TypeSerializer<HashMap<UK, UV>> valueSerializer) throws 
Exception {
     
    -           HashMap<UK, UV> result = stateTable.get(key, namespace);
    +           Preconditions.checkNotNull(serializedKeyAndNamespace, 
"Serialized key and namespace");
     
    -           if (null == result) {
    +           Tuple2<K, N> keyAndNamespace = 
KvStateSerializer.deserializeKeyAndNamespace(
    +                           serializedKeyAndNamespace, keySerializer, 
namespaceSerializer);
    +
    +           Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, 
keyAndNamespace.f1);
    +
    +           if (result == null) {
                        return null;
                }
     
    -           TypeSerializer<UK> userKeySerializer = 
stateDesc.getKeySerializer();
    -           TypeSerializer<UV> userValueSerializer = 
stateDesc.getValueSerializer();
    +           final HashMapSerializer<UK, UV> serializer = 
(HashMapSerializer<UK, UV>) valueSerializer;
    --- End diff --
    
    The reason is that RocksDB returns an iterator that gets lazily populated 
as you call next() while the serialize() of the MapSerializer expects a Map. If 
it were to go with your option, we would have to iterate over the map twice, 
once to create the map, and then to serialize it.


---

Reply via email to