Aljoscha Krettek created FLINK-9060:
---------------------------------------

             Summary: Deleting state using KeyedStateBackend.getKeys() throws Exception
                 Key: FLINK-9060
                 URL: https://issues.apache.org/jira/browse/FLINK-9060
             Project: Flink
          Issue Type: Bug
          Components: State Backends, Checkpointing
            Reporter: Aljoscha Krettek
             Fix For: 1.5.0


Adding this test to {{StateBackendTestBase}} showcases the problem:

{code}
@Test
public void testConcurrentModificationWithGetKeys() throws Exception {
    AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

    try {
        ListStateDescriptor<String> listStateDescriptor =
            new ListStateDescriptor<>("foo", StringSerializer.INSTANCE);

        // Register list state under two different keys.
        backend.setCurrentKey(1);
        backend
            .getPartitionedState(
                VoidNamespace.INSTANCE,
                VoidNamespaceSerializer.INSTANCE,
                listStateDescriptor)
            .add("Hello");

        backend.setCurrentKey(2);
        backend
            .getPartitionedState(
                VoidNamespace.INSTANCE,
                VoidNamespaceSerializer.INSTANCE,
                listStateDescriptor)
            .add("Ciao");

        Stream<Integer> keys = backend
            .getKeys(listStateDescriptor.getName(), VoidNamespace.INSTANCE);

        // Clear the state of every key while the key stream is still being consumed.
        keys.forEach((key) -> {
            backend.setCurrentKey(key);
            try {
                backend
                    .getPartitionedState(
                        VoidNamespace.INSTANCE,
                        VoidNamespaceSerializer.INSTANCE,
                        listStateDescriptor)
                    .clear();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
    finally {
        IOUtils.closeQuietly(backend);
        backend.dispose();
    }
}

{code}

This should work because one of the use cases of {{getKeys()}} and 
{{applyToAllKeys()}} is to perform an operation on every key, which includes clearing the state of each key.
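
As a rough illustration of the failure mode the test name hints at, here is a minimal plain-Java sketch. It does not use Flink at all: the {{HashMap}} is only a hypothetical stand-in for a heap-backed state table, and the class and method names ({{GetKeysSketch}}, {{clearWhileStreaming}}, {{clearFromSnapshot}}) are made up for this example. It assumes the key stream is a lazy view over the live data structure, which may not match what every backend actually does.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GetKeysSketch {

    // Hypothetical stand-in for a state table: key -> list state.
    public static Map<Integer, List<String>> newState() {
        Map<Integer, List<String>> state = new HashMap<>();
        state.computeIfAbsent(1, k -> new ArrayList<>()).add("Hello");
        state.computeIfAbsent(2, k -> new ArrayList<>()).add("Ciao");
        return state;
    }

    // Removes entries while a lazy stream over the live key set is consumed.
    // Returns false if the map's fail-fast check reports the modification.
    public static boolean clearWhileStreaming(Map<Integer, List<String>> state) {
        try {
            state.keySet().stream().forEach(key -> state.remove(key));
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    // Copies the keys first, so removal never touches a structure that is
    // still being iterated. Returns true once all state has been cleared.
    public static boolean clearFromSnapshot(Map<Integer, List<String>> state) {
        List<Integer> snapshot = new ArrayList<>(state.keySet());
        snapshot.forEach(key -> state.remove(key));
        return state.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println("live stream succeeded: " + clearWhileStreaming(newState()));
        System.out.println("snapshot succeeded:    " + clearFromSnapshot(newState()));
    }
}
```

Fail-fast detection in {{HashMap}} is documented as best-effort, so the first variant is not guaranteed to fail on every JVM. The snapshot variant trades extra memory for safety; whether that trade-off is acceptable for a state backend with many keys is a separate question.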



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
