Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174423536
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java ---
    @@ -140,19 +159,28 @@ public boolean contains(UK userKey) {
        }
     
        @Override
    -   public byte[] getSerializedValue(K key, N namespace) throws IOException {
    -           Preconditions.checkState(namespace != null, "No namespace given.");
    -           Preconditions.checkState(key != null, "No key given.");
    +   public byte[] getSerializedValue(
    +                   byte[] serializedKeyAndNamespace,
    +                   TypeSerializer<K> keySerializer,
    +                   TypeSerializer<N> namespaceSerializer,
    +                   TypeSerializer<HashMap<UK, UV>> valueSerializer) throws Exception {
     
    -           HashMap<UK, UV> result = stateTable.get(key, namespace);
    +           Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
     
    -           if (null == result) {
    +           Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
    +                           serializedKeyAndNamespace, keySerializer, namespaceSerializer);
    +
    +           Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
    +
    +           if (result == null) {
                        return null;
                }
     
    -           TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer();
    -           TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer();
    +           final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) valueSerializer;
    --- End diff --
    
    The problem is that the heap backend uses the `HashMapSerializer` while RocksDB uses the `MapSerializer`. So the parameter here would have to be something like `TypeSerializer<? extends Map<UK, UV>>`, which would then need to be cast to the appropriate serializer in order to get the key and the value serializers. The solution that we have here is a bit cleaner, although the best option would be to remove the `HashMapSerializer` and use a `MapSerializer` here as well.
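
    To illustrate the wildcard alternative mentioned above, here is a minimal, self-contained sketch. It does not use Flink's classes: `Serializer`, `MapSerializer`, `HashMapSerializer`, `KeyValueSerializers` and `unwrap` are all hypothetical stand-ins written only for this example. It shows why a `? extends Map<UK, UV>` parameter forces a per-implementation cast before the key and value serializers can be reached.

```java
import java.util.HashMap;
import java.util.Map;

public class SerializerCastSketch {

    // Hypothetical stand-in for a type serializer; only the type parameter matters here.
    interface Serializer<T> {}

    static final class StringSerializer implements Serializer<String> {}
    static final class LongSerializer implements Serializer<Long> {}

    // Stand-in for the map serializer that the comment attributes to RocksDB.
    static final class MapSerializer<K, V> implements Serializer<Map<K, V>> {
        final Serializer<K> keySerializer;
        final Serializer<V> valueSerializer;

        MapSerializer(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
        }
    }

    // Stand-in for the map serializer that the comment attributes to the heap backend.
    static final class HashMapSerializer<K, V> implements Serializer<HashMap<K, V>> {
        final Serializer<K> keySerializer;
        final Serializer<V> valueSerializer;

        HashMapSerializer(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
        }
    }

    // Simple holder for the two nested serializers (hypothetical helper).
    static final class KeyValueSerializers<K, V> {
        final Serializer<K> keySerializer;
        final Serializer<V> valueSerializer;

        KeyValueSerializers(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
        }
    }

    // With a wildcard parameter, the concrete serializer type has to be checked
    // and cast before the nested key and value serializers are accessible.
    static <K, V> KeyValueSerializers<K, V> unwrap(Serializer<? extends Map<K, V>> serializer) {
        if (serializer instanceof MapSerializer) {
            @SuppressWarnings("unchecked")
            MapSerializer<K, V> s = (MapSerializer<K, V>) serializer;
            return new KeyValueSerializers<>(s.keySerializer, s.valueSerializer);
        }
        if (serializer instanceof HashMapSerializer) {
            @SuppressWarnings("unchecked")
            HashMapSerializer<K, V> s = (HashMapSerializer<K, V>) serializer;
            return new KeyValueSerializers<>(s.keySerializer, s.valueSerializer);
        }
        throw new IllegalArgumentException("Unsupported map serializer: " + serializer.getClass());
    }

    public static void main(String[] args) {
        Serializer<String> keys = new StringSerializer();
        Serializer<Long> values = new LongSerializer();

        HashMapSerializer<String, Long> heapStyle = new HashMapSerializer<>(keys, values);
        KeyValueSerializers<String, Long> unwrapped = unwrap(heapStyle);

        System.out.println("Same key serializer: " + (unwrapped.keySerializer == keys));
    }
}
```

    The `instanceof` branches are the "cast to the appropriate serializer" step. If both backends shared a single map serializer type, `unwrap` would collapse to one branch, which is the motivation for dropping the `HashMapSerializer`.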

