[ https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sihua Zhou updated FLINK-9269: ------------------------------ Description: {code:java} @Test public void testConccurrencyProblem() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); Environment env = new DummyEnvironment(); AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); try { long checkpointID = 0; List<Future> futureList = new ArrayList(); for (int i = 0; i < 10; ++i) { ValueStateDescriptor<Integer> kvId = new ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE); ValueState<Integer> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId); ((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE); backend.setCurrentKey(i); state.update(i); futureList.add(runSnapshotAsync(backend.snapshot(checkpointID++, System.currentTimeMillis(), streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()))); } for (Future future : futureList) { future.get(); } } catch (Exception e) { fail(); } finally { backend.dispose(); } } protected Future<?> runSnapshotAsync( RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture) throws Exception { if (!snapshotRunnableFuture.isDone()) { return Executors.newFixedThreadPool(5).submit(() -> { try { snapshotRunnableFuture.run(); snapshotRunnableFuture.get(); } catch (Exception e) { e.printStackTrace(); fail(); } }); } return null; } {code} Place the above code in `StateBackendTestBase` and run `AsyncMemoryStateBackendTest`; it fails with the following exception: {code} java.util.concurrent.ExecutionException: java.lang.NullPointerException at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:716) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:662) at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:84) ... 5 more java.util.concurrent.ExecutionException: java.lang.NullPointerException at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:716) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:662) at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:84) ... 5 more {code} was: {code:java} @Nonnull @Override protected SnapshotResult<KeyedStateHandle> performOperation() throws Exception { // do something long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()]; for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) { int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos); keyGroupRangeOffsets[keyGroupPos] = localStream.getPos(); outView.writeInt(keyGroupId); for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) { // do something } } // do something } {code} > Concurrency problem in HeapKeyedStateBackend when performing checkpoint async > ----------------------------------------------------------------------------- > > Key: FLINK-9269 > URL: https://issues.apache.org/jira/browse/FLINK-9269 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing > Affects Versions: 1.5.0 > Reporter: Sihua Zhou > Assignee: Sihua Zhou > Priority: Major > Fix For: 1.5.0 > > > {code:java} > @Test > public void testConccurrencyProblem() throws Exception { > CheckpointStreamFactory streamFactory = createStreamFactory(); > Environment env = new DummyEnvironment(); > AbstractKeyedStateBackend<Integer> backend = > createKeyedBackend(IntSerializer.INSTANCE, env); > try { > long checkpointID = 0; > List<Future> futureList = new ArrayList(); > for (int i = 0; i < 10; ++i) { > ValueStateDescriptor<Integer> kvId = new > ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE); > ValueState<Integer> state = > backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId); > ((InternalValueState) > state).setCurrentNamespace(VoidNamespace.INSTANCE); > backend.setCurrentKey(i); > state.update(i); > > 
futureList.add(runSnapshotAsync(backend.snapshot(checkpointID++, > System.currentTimeMillis(), streamFactory, > CheckpointOptions.forCheckpointWithDefaultLocation()))); > } > for (Future future : futureList) { > future.get(); > } > } catch (Exception e) { > fail(); > } finally { > backend.dispose(); > } > } > protected Future<?> runSnapshotAsync( > RunnableFuture<SnapshotResult<KeyedStateHandle>> > snapshotRunnableFuture) throws Exception { > if (!snapshotRunnableFuture.isDone()) { > return Executors.newFixedThreadPool(5).submit(() -> { > try { > snapshotRunnableFuture.run(); > snapshotRunnableFuture.get(); > } catch (Exception e) { > e.printStackTrace(); > fail(); > } > }); > } > return null; > } > {code} > Place the above code in `StateBackendTestBase` and run > `AsyncMemoryStateBackendTest`; it fails with the following exception: > {code} > java.util.concurrent.ExecutionException: java.lang.NullPointerException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:716) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:662) > at > org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75) > at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:84) > ... 5 more > java.util.concurrent.ExecutionException: java.lang.NullPointerException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:716) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:662) > at > org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:84) > ... 5 more > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)