GitHub user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5230#discussion_r166337949
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java ---
    @@ -38,6 +38,37 @@
         */
        void setCurrentKey(K newKey);
     
    +   /**
    +    * Returns a safe version of the current key (see {@link #setCurrentKey(Object)}).
    +    *
    +    * <p>"Safe" means that the user can interact with it without jeopardizing correctness.
    +    * This implies that:
    +    * <ol>
    +    *     <li>for the {@code MemoryStateBackend} and the {@code FsStateBackend} we
    +    *     return a <b>copy</b> of the actual key, while
    +    *     <li>for the {@code RocksDBStateBackend} we return the key itself, as returned
    +    *     by the backend.
    +    * </ol>
    +    *
    +    * <p>The copy is created using the {@link TypeSerializer#copy(Object) copy()} method
    +    * of the key {@link TypeSerializer}. Consequently, the correctness of the method assumes
    +    * a correct {@code copy()} method.
    +    *
    +    */
    +   K getCurrentKeySafe();
    +
    +   /**
    +    * Applies the provided {@link KeyedStateFunction} to the state with the provided
    +    * {@link StateDescriptor} of all the currently active keys.
    +    *
    +    * @param stateDescriptor the descriptor of the state to which the
    +    *                           function is going to be applied.
    +    * @param function the function to be applied to the keyed state.
    +    */
    +   <S extends State, T> void applyToAllKeys(
    --- End diff --
    
    I think this method needs to take the namespace. The current implementation
    in `AbstractKeyedStateBackend` always iterates over `VoidNamespace` implicitly,
    which can be a bit surprising.
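
To illustrate the concern, here is a minimal sketch built on a toy in-memory backend, not Flink's actual classes. `ToyBackend`, `visit`, and the string namespaces `"void"` and `"window-1"` are all hypothetical names made up for this example. The point is only that when the namespace is an explicit argument, the caller decides which slice of state is visited, so state under other namespaces is not silently skipped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy illustration only: none of these types are Flink classes.
public class NamespacedStateSketch {

    // Hypothetical in-memory state table: namespace -> (key -> value).
    public static final class ToyBackend<K, N, V> {
        private final Map<N, Map<K, V>> table = new HashMap<>();

        public void put(N namespace, K key, V value) {
            table.computeIfAbsent(namespace, ns -> new HashMap<>()).put(key, value);
        }

        // The namespace is an explicit parameter, so the caller chooses
        // which slice of the state the function is applied to.
        public void applyToAllKeys(N namespace, BiConsumer<K, V> function) {
            Map<K, V> perKey = table.get(namespace);
            if (perKey == null) {
                return; // no state registered under this namespace
            }
            for (Map.Entry<K, V> entry : perKey.entrySet()) {
                function.accept(entry.getKey(), entry.getValue());
            }
        }
    }

    // Collects "key=value" strings for one namespace, sorted for stable output.
    public static List<String> visit(ToyBackend<String, String, Integer> backend, String namespace) {
        List<String> seen = new ArrayList<>();
        backend.applyToAllKeys(namespace, (key, value) -> seen.add(key + "=" + value));
        Collections.sort(seen);
        return seen;
    }

    public static void main(String[] args) {
        ToyBackend<String, String, Integer> backend = new ToyBackend<>();
        backend.put("void", "a", 1);
        backend.put("window-1", "a", 10);
        backend.put("window-1", "b", 20);

        System.out.println(visit(backend, "void"));     // prints [a=1]
        System.out.println(visit(backend, "window-1")); // prints [a=10, b=20]
    }
}
```

If `applyToAllKeys` hard-coded the `"void"` namespace, the two entries under `"window-1"` would never be visited and the caller would get no indication of that.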

