[FLINK-4892] Add Key-Group Ranges Test in HeapInternalTimerServiceTest

This checks whether key groups are correctly checkpointed and whether we
can correctly restore reassigned key-group ranges.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9dc26355
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9dc26355
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9dc26355

Branch: refs/heads/master
Commit: 9dc26355d2bfebcee05ba1185debfd5706d4f7a9
Parents: e3b5d33
Author: Aljoscha Krettek <[email protected]>
Authored: Mon Oct 24 14:40:34 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Oct 26 23:26:28 2016 +0200

----------------------------------------------------------------------
 .../operators/HeapInternalTimerServiceTest.java | 113 +++++++++++++++++++
 1 file changed, 113 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9dc26355/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
index bba6517..d753e4e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -608,6 +608,119 @@ public class HeapInternalTimerServiceTest {
                assertEquals(0, timerService.numEventTimeTimers());
        }
 
+       /**
+        * This test checks whether timers are assigned to correct key groups
+        * and whether snapshot/restore respects key groups.
+        */
+       @Test
+       public void testSnapshotAndRebalancingRestore() throws Exception {
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> mockTriggerable = 
mock(Triggerable.class);
+
+               TestKeyContext keyContext = new TestKeyContext();
+               TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
+               HeapInternalTimerService<Integer, String> timerService =
+                               createTimerService(mockTriggerable, keyContext, 
processingTimeService, testKeyGroupRange, maxParallelism);
+
+               int midpoint = testKeyGroupRange.getStartKeyGroup() +
+                               (testKeyGroupRange.getEndKeyGroup() - 
testKeyGroupRange.getStartKeyGroup()) / 2;
+
+               // get two sub key-ranges so that we can restore two ranges 
separately
+               KeyGroupRange subKeyGroupRange1 = new 
KeyGroupRange(testKeyGroupRange.getStartKeyGroup(), midpoint);
+               KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(midpoint + 
1, testKeyGroupRange.getEndKeyGroup());
+
+               // get two different keys, one per sub range
+               int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, 
maxParallelism);
+               int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, 
maxParallelism);
+
+               keyContext.setCurrentKey(key1);
+
+               timerService.registerProcessingTimeTimer("ciao", 10);
+               timerService.registerEventTimeTimer("hello", 10);
+
+               keyContext.setCurrentKey(key2);
+
+               timerService.registerEventTimeTimer("ciao", 10);
+               timerService.registerProcessingTimeTimer("hello", 10);
+
+               assertEquals(2, timerService.numProcessingTimeTimers());
+               assertEquals(1, timerService.numProcessingTimeTimers("hello"));
+               assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
+               assertEquals(2, timerService.numEventTimeTimers());
+               assertEquals(1, timerService.numEventTimeTimers("hello"));
+               assertEquals(1, timerService.numEventTimeTimers("ciao"));
+
+               // one map per sub key-group range
+               Map<Integer, byte[]> snapshot1 = new HashMap<>();
+               Map<Integer, byte[]> snapshot2 = new HashMap<>();
+               for (Integer keyGroupIndex : testKeyGroupRange) {
+                       ByteArrayOutputStream outStream = new 
ByteArrayOutputStream();
+                       timerService.snapshotTimersForKeyGroup(new 
DataOutputViewStreamWrapper(outStream), keyGroupIndex);
+                       outStream.close();
+                       if (subKeyGroupRange1.contains(keyGroupIndex)) {
+                               snapshot1.put(keyGroupIndex, 
outStream.toByteArray());
+                       } else if (subKeyGroupRange2.contains(keyGroupIndex)) {
+                               snapshot2.put(keyGroupIndex, 
outStream.toByteArray());
+                       } else {
+                               throw new IllegalStateException("Key-Group 
index doesn't belong to any sub range.");
+                       }
+               }
+
+               // from now on we need everything twice. once per sub key-group 
range
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> mockTriggerable1 = 
mock(Triggerable.class);
+
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> mockTriggerable2 = 
mock(Triggerable.class);
+
+
+               TestKeyContext keyContext1 = new TestKeyContext();
+               TestKeyContext keyContext2 = new TestKeyContext();
+
+               TestProcessingTimeService processingTimeService1 = new 
TestProcessingTimeService();
+               TestProcessingTimeService processingTimeService2 = new 
TestProcessingTimeService();
+
+               HeapInternalTimerService<Integer, String> timerService1 = 
restoreTimerService(
+                               snapshot1,
+                               mockTriggerable1,
+                               keyContext1,
+                               processingTimeService1,
+                               subKeyGroupRange1,
+                               maxParallelism);
+
+               HeapInternalTimerService<Integer, String> timerService2 = 
restoreTimerService(
+                               snapshot2,
+                               mockTriggerable2,
+                               keyContext2,
+                               processingTimeService2,
+                               subKeyGroupRange2,
+                               maxParallelism);
+
+
+               processingTimeService1.setCurrentTime(10);
+               timerService1.advanceWatermark(10);
+
+               verify(mockTriggerable1, 
times(1)).onProcessingTime(anyInternalTimer());
+               verify(mockTriggerable1, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, key1, "ciao")));
+               verify(mockTriggerable1, never()).onProcessingTime(eq(new 
InternalTimer<>(10, key2, "hello")));
+               verify(mockTriggerable1, 
times(1)).onEventTime(anyInternalTimer());
+               verify(mockTriggerable1, times(1)).onEventTime(eq(new 
InternalTimer<>(10, key1, "hello")));
+               verify(mockTriggerable1, never()).onEventTime(eq(new 
InternalTimer<>(10, key2, "ciao")));
+
+               assertEquals(0, timerService1.numEventTimeTimers());
+
+               processingTimeService2.setCurrentTime(10);
+               timerService2.advanceWatermark(10);
+
+               verify(mockTriggerable2, 
times(1)).onProcessingTime(anyInternalTimer());
+               verify(mockTriggerable2, never()).onProcessingTime(eq(new 
InternalTimer<>(10, key1, "ciao")));
+               verify(mockTriggerable2, times(1)).onProcessingTime(eq(new 
InternalTimer<>(10, key2, "hello")));
+               verify(mockTriggerable2, 
times(1)).onEventTime(anyInternalTimer());
+               verify(mockTriggerable2, never()).onEventTime(eq(new 
InternalTimer<>(10, key1, "hello")));
+               verify(mockTriggerable2, times(1)).onEventTime(eq(new 
InternalTimer<>(10, key2, "ciao")));
+
+               assertEquals(0, timerService2.numEventTimeTimers());
+       }
 
        private static class TestKeyContext implements KeyContext {
 

Reply via email to