[ 
https://issues.apache.org/jira/browse/FLINK-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15828101#comment-15828101
 ] 

ASF GitHub Bot commented on FLINK-5530:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3143#discussion_r96638451
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
    @@ -242,6 +245,132 @@ public void testValueState() throws Exception {
                backend.dispose();
        }
     
    +   /**
    +    * Tests {@link ValueState#value()} and {@link KvState#getSerializedValue(byte[])}
    +    * accessing the state concurrently. They should not get in the way of each
    +    * other.
    +    */
    +   @Test
    +   @SuppressWarnings("unchecked")
    +   public void testValueStateRace() throws Exception {
    +           final AbstractKeyedStateBackend<Integer> backend =
    +                   createKeyedBackend(IntSerializer.INSTANCE);
    +           final Integer namespace = Integer.valueOf(1);
    +
    +           final ValueStateDescriptor<String> kvId =
    +                   new ValueStateDescriptor<>("id", String.class);
    +           kvId.initializeSerializerUnlessSet(new ExecutionConfig());
    +
    +           final TypeSerializer<Integer> keySerializer = 
IntSerializer.INSTANCE;
    +           final TypeSerializer<Integer> namespaceSerializer =
    +                   IntSerializer.INSTANCE;
    +           final TypeSerializer<String> valueSerializer = 
kvId.getSerializer();
    +
    +           final ValueState<String> state = backend
    +                   .getPartitionedState(namespace, IntSerializer.INSTANCE, 
kvId);
    +
    +           @SuppressWarnings("unchecked")
    +           final KvState<Integer> kvState = (KvState<Integer>) state;
    +
    +           /**
    +            * 1) Test that ValueState#value() before and after
    +            * KvState#getSerializedValue(byte[]) return the same value.
    +            */
    +
    +           // set some key and namespace
    +           final int key1 = 1;
    +           backend.setCurrentKey(key1);
    +           kvState.setCurrentNamespace(2);
    +           state.update("2");
    +           assertEquals("2", state.value());
    +
    +           // query another key and namespace
    +           assertNull(getSerializedValue(kvState, 3, keySerializer,
    +                   namespace, IntSerializer.INSTANCE,
    +                   valueSerializer));
    +
    +           // the state should not have changed!
    +           assertEquals("2", state.value());
    +
    +           // re-set values
    +           kvState.setCurrentNamespace(namespace);
    +
    +           /**
    +            * 2) Test two threads concurrently using ValueState#value() and
    +            * KvState#getSerializedValue(byte[]).
    +            */
    +
    +           // some modifications to the state
    +           final int key2 = 10;
    +           backend.setCurrentKey(key2);
    +           assertNull(state.value());
    +           assertNull(getSerializedValue(kvState, key2, keySerializer,
    +                   namespace, namespaceSerializer, valueSerializer));
    +           state.update("1");
    +
    +           boolean getterSuccess;
    +           final Throwable[] throwables = {null, null};
    +
    +           final Thread getter = new Thread("State getter") {
    --- End diff --
    
    Nice replacement - I didn't know about that class.


> race condition in AbstractRocksDBState#getSerializedValue
> ---------------------------------------------------------
>
>                 Key: FLINK-5530
>                 URL: https://issues.apache.org/jira/browse/FLINK-5530
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.2.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Blocker
>
> AbstractRocksDBState#getSerializedValue() uses the same key serialisation 
> stream as the ordinary state access methods, but it is called in parallel 
> during state queries, thus violating the assumption that only one thread 
> accesses it. This may lead to either wrong query results or corrupted data 
> while queries are executed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to