carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334180018
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##########
 @@ -115,577 +144,783 @@ public void testInitStateMap() {
                assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext());
 
                stateMap.close();
-               assertEquals(0, stateMap.size());
-               assertEquals(0, stateMap.totalSize());
-               assertTrue(stateMap.isClosed());
        }
 
        /**
-        * Test basic operations.
+        * Test state put operation.
         */
        @Test
-       public void testBasicOperations() throws Exception {
-               TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-               TypeSerializer<Long> namespaceSerializer = 
LongSerializer.INSTANCE;
-               TypeSerializer<String> stateSerializer = 
StringSerializer.INSTANCE;
-               CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = 
new CopyOnWriteSkipListStateMap<>(
-                       keySerializer, namespaceSerializer, stateSerializer, 
spaceAllocator);
+       public void testPutState() {
+               testWithFunction((totalSize, stateMap, referenceStates) -> 
getDefaultSizes(totalSize));
+       }
 
-               ThreadLocalRandom random = ThreadLocalRandom.current();
-               // map to store expected states, namespace -> key -> state
-               Map<Long, Map<Integer, String>> referenceStates = new 
HashMap<>();
-               int totalSize = 0;
+       /**
+        * Test remove existing state.
+        */
+       @Test
+       public void testRemoveExistingState() {
+               testRemoveState(false, false);
+       }
 
-               // put some states
-               for (long namespace = 0; namespace < 10; namespace++) {
-                       for (int key = 0; key < 100; key++) {
-                               totalSize++;
-                               String state = String.valueOf(key * namespace);
-                               if (random.nextBoolean()) {
-                                       stateMap.put(key, namespace, state);
-                               } else {
-                                       assertNull(stateMap.putAndGetOld(key, 
namespace, state));
+       /**
+        * Test remove and get existing state.
+        */
+       @Test
+       public void testRemoveAndGetExistingState() {
+               testRemoveState(false, true);
+       }
+
+       /**
+        * Test remove absent state.
+        */
+       @Test
+       public void testRemoveAbsentState() {
+               testRemoveState(true, true);
+       }
+
+       /**
+        * Test put previously removed state.
+        */
+       @Test
+       public void testPutPreviouslyRemovedState() {
+               testWithFunction(
+                       (totalSize, stateMap, referenceStates) -> 
applyFunctionAfterRemove(stateMap, referenceStates,
+                               (removedCnt, removedStates) -> {
+                                       int size = totalSize - removedCnt;
+                                       for (Map.Entry<Long, Set<Integer>> 
entry : removedStates.entrySet()) {
+                                               long namespace = entry.getKey();
+                                               for (int key : 
entry.getValue()) {
+                                                       size++;
+                                                       String state = 
String.valueOf(key * namespace);
+                                                       
assertNull(stateMap.putAndGetOld(key, namespace, state));
+                                                       
referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, 
String.valueOf(state));
+                                               }
+                                       }
+                                       return getDefaultSizes(size);
                                }
-                               referenceStates.computeIfAbsent(namespace, 
(none) -> new HashMap<>()).put(key, state);
-                               assertEquals(totalSize, stateMap.size());
-                               assertEquals(totalSize, stateMap.totalSize());
-                       }
-               }
+                       )
+               );
 
 Review comment:
   Let me try to improve it... It seems that sometimes we suggest using functions to 
reduce duplicated code, while in other cases we are against it. Hopefully I 
can get a better understanding of the standard/convention.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to