Aljoscha Krettek created FLINK-9060:
---------------------------------------

Summary: Deleting state using KeyedStateBackend.getKeys() throws Exception
Key: FLINK-9060
URL: https://issues.apache.org/jira/browse/FLINK-9060
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Reporter: Aljoscha Krettek
Fix For: 1.5.0

Adding this test to {{StateBackendTestBase}} showcases the problem:

{code}
@Test
public void testConcurrentModificationWithGetKeys() throws Exception {
    AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

    try {
        ListStateDescriptor<String> listStateDescriptor =
            new ListStateDescriptor<>("foo", StringSerializer.INSTANCE);

        // Register list state under two different keys.
        backend.setCurrentKey(1);
        backend
            .getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor)
            .add("Hello");

        backend.setCurrentKey(2);
        backend
            .getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor)
            .add("Ciao");

        Stream<Integer> keys = backend
            .getKeys(listStateDescriptor.getName(), VoidNamespace.INSTANCE);

        // Clear the state of each key while still consuming the key stream.
        keys.forEach((key) -> {
            backend.setCurrentKey(key);
            try {
                backend
                    .getPartitionedState(
                        VoidNamespace.INSTANCE,
                        VoidNamespaceSerializer.INSTANCE,
                        listStateDescriptor)
                    .clear();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    } finally {
        IOUtils.closeQuietly(backend);
        backend.dispose();
    }
}
{code}

This should work because one of the use cases of {{getKeys()}} and {{applyToAllKeys()}} is to perform an operation for every key, which includes deleting the state stored under each key.
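
The test name suggests the failure is a ConcurrentModificationException, although the issue text states neither the exception type nor the affected backend. Assuming that is the case, the following plain-Java sketch shows the same pattern outside of Flink: a HashMap stands in for the keyed state, and entries are removed while a lazy stream over the live key set is still being consumed. The class and method names ({{GetKeysAnalogy}}, {{clearWhileStreaming}}, {{clearFromSnapshot}}) are hypothetical and are not part of Flink; the second method shows one possible caller-side workaround (copying the keys first), not the fix proposed for this issue.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogy only: a HashMap stands in for keyed state.
// None of these names are Flink APIs.
class GetKeysAnalogy {

    // Builds a small map mirroring the two keys used in the test above.
    static Map<Integer, String> newState() {
        Map<Integer, String> state = new HashMap<>();
        state.put(1, "Hello");
        state.put(2, "Ciao");
        return state;
    }

    // Removes entries while streaming over the live key set.
    // Returns true if the JDK reported the concurrent modification.
    // Note: fail-fast detection is best-effort according to the HashMap docs.
    static boolean clearWhileStreaming(Map<Integer, String> state) {
        try {
            state.keySet().stream().forEach(key -> state.remove(key));
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Copies the keys into a separate list first, then removes each entry.
    // The iteration no longer touches the map being modified.
    static void clearFromSnapshot(Map<Integer, String> state) {
        List<Integer> keys = new ArrayList<>(state.keySet());
        keys.forEach(key -> state.remove(key));
    }

    public static void main(String[] args) {
        System.out.println("exception while streaming: " + clearWhileStreaming(newState()));

        Map<Integer, String> state = newState();
        clearFromSnapshot(state);
        System.out.println("entries left after snapshot-based clear: " + state.size());
    }
}
```

Materializing the keys costs memory proportional to the number of keys, which is presumably why the issue asks for {{getKeys()}} itself to tolerate deletion rather than pushing this onto every caller.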