[ https://issues.apache.org/jira/browse/FLINK-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827896#comment-15827896 ]
ASF GitHub Bot commented on FLINK-5530: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3143#discussion_r96612430 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java --- @@ -242,6 +245,132 @@ public void testValueState() throws Exception { backend.dispose(); } + /** + * Tests {@link ValueState#value()} and {@link KvState#getSerializedValue(byte[])} + * accessing the state concurrently. They should not get in the way of each + * other. + */ + @Test + @SuppressWarnings("unchecked") + public void testValueStateRace() throws Exception { + final AbstractKeyedStateBackend<Integer> backend = + createKeyedBackend(IntSerializer.INSTANCE); + final Integer namespace = Integer.valueOf(1); + + final ValueStateDescriptor<String> kvId = + new ValueStateDescriptor<>("id", String.class); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + final TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE; + final TypeSerializer<Integer> namespaceSerializer = + IntSerializer.INSTANCE; + final TypeSerializer<String> valueSerializer = kvId.getSerializer(); + + final ValueState<String> state = backend + .getPartitionedState(namespace, IntSerializer.INSTANCE, kvId); + + @SuppressWarnings("unchecked") + final KvState<Integer> kvState = (KvState<Integer>) state; + + /** + * 1) Test that ValueState#value() before and after + * KvState#getSerializedValue(byte[]) return the same value. + */ + + // set some key and namespace + final int key1 = 1; + backend.setCurrentKey(key1); + kvState.setCurrentNamespace(2); + state.update("2"); + assertEquals("2", state.value()); + + // query another key and namespace + assertNull(getSerializedValue(kvState, 3, keySerializer, + namespace, IntSerializer.INSTANCE, + valueSerializer)); + + // the state should not have changed! 
+ assertEquals("2", state.value()); + + // re-set values + kvState.setCurrentNamespace(namespace); + + /** + * 2) Test two threads concurrently using ValueState#value() and + * KvState#getSerializedValue(byte[]). + */ + + // some modifications to the state + final int key2 = 10; + backend.setCurrentKey(key2); + assertNull(state.value()); + assertNull(getSerializedValue(kvState, key2, keySerializer, + namespace, namespaceSerializer, valueSerializer)); + state.update("1"); + + boolean getterSuccess; + final Throwable[] throwables = {null, null}; + + final Thread getter = new Thread("State getter") { --- End diff -- How about using `CheckedThread` to avoid the boilerplate with Throwable arrays, etc.? > race condition in AbstractRocksDBState#getSerializedValue > --------------------------------------------------------- > > Key: FLINK-5530 > URL: https://issues.apache.org/jira/browse/FLINK-5530 > Project: Flink > Issue Type: Bug > Components: Queryable State > Affects Versions: 1.2.0 > Reporter: Nico Kruber > Assignee: Nico Kruber > Priority: Blocker > > AbstractRocksDBState#getSerializedValue() uses the same key serialisation > stream as the ordinary state access methods but is called in parallel during > state queries, thus violating the assumption that only one thread accesses it. > This may lead to either wrong results in queries or corrupt data while > queries are executed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)