[ 
https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16540034#comment-16540034
 ] 

Sihua Zhou commented on FLINK-9804:
-----------------------------------

Hi [~yanghua], have you already started working on this? I just finished it but 
forgot to assign the issue to myself. Anyway, here is my code 
([https://github.com/sihuazhou/flink/commit/efc4096a4a6f38b3acb0b5189804f7b452218f23]).
I hope it saves you some work or is otherwise helpful. ;)

> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> -------------------------------------------------------------
>
>                 Key: FLINK-9804
>                 URL: https://issues.apache.org/jira/browse/FLINK-9804
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.5.1
>            Reporter: Aljoscha Krettek
>            Assignee: vinoyang
>            Priority: Blocker
>             Fix For: 1.5.2, 1.6.0
>
>
> This can be reproduced by adding this test to {{StateBackendTestBase}}:
> {code}
> @Test
> public void testMapStateGetKeys() throws Exception {
>       final int namespace1ElementsNum = 1000;
>       final int namespace2ElementsNum = 1000;
>       String fieldName = "get-keys-test";
>       AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
>       try {
>               final String ns1 = "ns1";
>               MapState<String, Integer> keyedState1 = backend.getPartitionedState(
>                       ns1,
>                       StringSerializer.INSTANCE,
>                       new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>               );
>               for (int key = 0; key < namespace1ElementsNum; key++) {
>                       backend.setCurrentKey(key);
>                       keyedState1.put("he", key * 2);
>                       keyedState1.put("ho", key * 2);
>               }
>               final String ns2 = "ns2";
>               MapState<String, Integer> keyedState2 = backend.getPartitionedState(
>                       ns2,
>                       StringSerializer.INSTANCE,
>                       new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>               );
>               for (int key = namespace1ElementsNum; key < namespace1ElementsNum + namespace2ElementsNum; key++) {
>                       backend.setCurrentKey(key);
>                       keyedState2.put("he", key * 2);
>                       keyedState2.put("ho", key * 2);
>               }
>               // valid for namespace1
>               try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns1).sorted()) {
>                       PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
>                       for (int expectedKey = 0; expectedKey < namespace1ElementsNum; expectedKey++) {
>                               assertTrue(actualIterator.hasNext());
>                               assertEquals(expectedKey, actualIterator.nextInt());
>                       }
>                       assertFalse(actualIterator.hasNext());
>               }
>               // valid for namespace2
>               try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns2).sorted()) {
>                       PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
>                       for (int expectedKey = namespace1ElementsNum; expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
>                               assertTrue(actualIterator.hasNext());
>                               assertEquals(expectedKey, actualIterator.nextInt());
>                       }
>                       assertFalse(actualIterator.hasNext());
>               }
>       }
>       finally {
>               IOUtils.closeQuietly(backend);
>               backend.dispose();
>       }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
