Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174473498
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java ---
    @@ -70,10 +88,18 @@
         * <p>If no value is associated with key and namespace, <code>null</code>
         * is returned.
         *
    +    * <p><b>TO IMPLEMENTERS:</b> This method is called by multiple threads. Anything
    +    * stateful (e.g. serializers) should be either duplicated or protected from undesired
    +    * consequences of concurrent invocations.
    +    *
         * @param serializedKeyAndNamespace Serialized key and namespace
         * @return Serialized value or <code>null</code> if no value is associated with the key and namespace.
         *
         * @throws Exception Exceptions during serialization are forwarded
         */
    -   byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception;
    +   byte[] getSerializedValue(
    --- End diff --
    
    I suggest storing the serializer in a thread-local variable. The current
    solution is a bit confusing because this interface suddenly exposes
    serializers, and callers have to provide a serializer in the
    `getSerializedValue` method. In my opinion, the interface does not make much
    sense this way. Furthermore, the serializers are copied externally into
    something that looks like a custom-built thread local. I suggest keeping the
    serializers thread-local in the base class and bringing this interface back
    to its original form. There is also only one thread pool, dedicated to
    queryable state, that would hold the serializers, and even the current
    solution has a dedicated cleanup method. In that place, we can just clean up
    the thread locals.
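
A minimal sketch of the thread-local idea described above, not the actual Flink code. Every name here (`Serializer`, `StringSerializer`, `AbstractKvState`, `StringKvState`, `releaseThreadLocalSerializer`) is a hypothetical stand-in, and the key handling is deliberately simplified. It only illustrates how a base class could hand each query thread its own serializer copy while `getSerializedValue` keeps its single-argument form:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadLocalSerializerSketch {

    /** Hypothetical stand-in for a stateful serializer that is not thread-safe. */
    interface Serializer<T> {
        byte[] serialize(T value);
        Serializer<T> duplicate();
    }

    /** Hypothetical serializer; the shared counter records how many copies were made. */
    static final class StringSerializer implements Serializer<String> {
        private final AtomicInteger copies;

        StringSerializer(AtomicInteger copies) {
            this.copies = copies;
        }

        @Override
        public byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public Serializer<String> duplicate() {
            copies.incrementAndGet();
            return new StringSerializer(copies);
        }
    }

    /** Hypothetical base class: the serializer stays an internal detail. */
    abstract static class AbstractKvState<V> {
        private final ThreadLocal<Serializer<V>> localSerializer;

        AbstractKvState(Serializer<V> prototype) {
            // Each calling thread lazily receives its own duplicate on first access.
            this.localSerializer = ThreadLocal.withInitial(prototype::duplicate);
        }

        protected Serializer<V> serializer() {
            return localSerializer.get();
        }

        /** Cleanup hook: drops the calling thread's serializer copy. */
        void releaseThreadLocalSerializer() {
            localSerializer.remove();
        }
    }

    /** Hypothetical state keyed by a UTF-8 string (an assumption made for brevity). */
    static final class StringKvState extends AbstractKvState<String> {
        private final Map<String, String> store = new ConcurrentHashMap<>();

        StringKvState(Serializer<String> prototype) {
            super(prototype);
        }

        void put(String key, String value) {
            store.put(key, value);
        }

        /** Original-style signature: no serializer parameter is exposed to callers. */
        byte[] getSerializedValue(byte[] serializedKeyAndNamespace) {
            String key = new String(serializedKeyAndNamespace, StandardCharsets.UTF_8);
            String value = store.get(key);
            return value == null ? null : serializer().serialize(value);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger copies = new AtomicInteger();
        StringKvState state = new StringKvState(new StringSerializer(copies));
        state.put("k", "v");
        byte[] key = "k".getBytes(StandardCharsets.UTF_8);

        // Two query threads, two lookups each: one serializer copy per thread.
        Runnable query = () -> {
            state.getSerializedValue(key);
            state.getSerializedValue(key);
            state.releaseThreadLocalSerializer();
        };
        Thread t1 = new Thread(query);
        Thread t2 = new Thread(query);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("serializer copies created: " + copies.get());
    }
}
```

In this sketch the copy is created on a thread's first lookup and reused afterwards, so the number of copies follows the number of pool threads rather than the number of queries. The cleanup hook must run on the thread that owns the copy, since `ThreadLocal.remove()` only affects the calling thread.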

