Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174382685
  
    --- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
    @@ -78,13 +75,22 @@ public KvStateServerHandler(
                final CompletableFuture<KvStateResponse> responseFuture = new 
CompletableFuture<>();
     
                try {
    -                   final InternalKvState<?> kvState = 
registry.getKvState(request.getKvStateId());
    +                   final KvStateEntry<?, ?, ?> kvState = 
registry.getKvState(request.getKvStateId());
                        if (kvState == null) {
                                responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
                        } else {
                                byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
     
    -                           byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
    +                           // here we remove any type check...
    +                           // Ideally we want to keep that the info match 
the state.
    --- End diff --
    
    you can retain type safety:
    
    Call from the handler:
    ```
    byte[] serializedResult = getSerializedValue(kvState, 
serializedKeyAndNamespace);
    ```
    
    Added method:
    ```
    private static <K, N, V> byte[] getSerializedValue(KvStateEntry<K, N, V> 
entry, byte[] serializedKeyAndNamespace) throws Exception {
                InternalKvState<K, N, V> state = entry.getState();
                KvStateInfo<K, N, V> infoForCurrentThread = 
entry.getInfoForCurrentThread();
                
                return state.getSerializedValue(
                        serializedKeyAndNamespace,
                        infoForCurrentThread.getKeySerializer(),
                        infoForCurrentThread.getNamespaceSerializer(),
                        infoForCurrentThread.getStateValueSerializer()
                );
        }
    ```


---

Reply via email to