[FLINK-4892] Add Key-Group Ranges Test in HeapInternalTimerServiceTest This checks whether key groups are correctly checkpointed and whether we can correctly restore reassigned key-group ranges.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9dc26355 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9dc26355 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9dc26355 Branch: refs/heads/master Commit: 9dc26355d2bfebcee05ba1185debfd5706d4f7a9 Parents: e3b5d33 Author: Aljoscha Krettek <[email protected]> Authored: Mon Oct 24 14:40:34 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Oct 26 23:26:28 2016 +0200 ---------------------------------------------------------------------- .../operators/HeapInternalTimerServiceTest.java | 113 +++++++++++++++++++ 1 file changed, 113 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9dc26355/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java index bba6517..d753e4e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java @@ -608,6 +608,119 @@ public class HeapInternalTimerServiceTest { assertEquals(0, timerService.numEventTimeTimers()); } + /** + * This test checks whether timers are assigned to correct key groups + * and whether snapshot/restore respects key groups. 
+ */ + @Test + public void testSnapshotAndRebalancingRestore() throws Exception { + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class); + + TestKeyContext keyContext = new TestKeyContext(); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + HeapInternalTimerService<Integer, String> timerService = + createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism); + + int midpoint = testKeyGroupRange.getStartKeyGroup() + + (testKeyGroupRange.getEndKeyGroup() - testKeyGroupRange.getStartKeyGroup()) / 2; + + // get two sub key-ranges so that we can restore two ranges separately + KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(testKeyGroupRange.getStartKeyGroup(), midpoint); + KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(midpoint + 1, testKeyGroupRange.getEndKeyGroup()); + + // get two different keys, one per sub range + int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, maxParallelism); + int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, maxParallelism); + + keyContext.setCurrentKey(key1); + + timerService.registerProcessingTimeTimer("ciao", 10); + timerService.registerEventTimeTimer("hello", 10); + + keyContext.setCurrentKey(key2); + + timerService.registerEventTimeTimer("ciao", 10); + timerService.registerProcessingTimeTimer("hello", 10); + + assertEquals(2, timerService.numProcessingTimeTimers()); + assertEquals(1, timerService.numProcessingTimeTimers("hello")); + assertEquals(1, timerService.numProcessingTimeTimers("ciao")); + assertEquals(2, timerService.numEventTimeTimers()); + assertEquals(1, timerService.numEventTimeTimers("hello")); + assertEquals(1, timerService.numEventTimeTimers("ciao")); + + // one map per sub key-group range + Map<Integer, byte[]> snapshot1 = new HashMap<>(); + Map<Integer, byte[]> snapshot2 = new HashMap<>(); + for (Integer keyGroupIndex : testKeyGroupRange) { + ByteArrayOutputStream outStream 
= new ByteArrayOutputStream(); + timerService.snapshotTimersForKeyGroup(new DataOutputViewStreamWrapper(outStream), keyGroupIndex); + outStream.close(); + if (subKeyGroupRange1.contains(keyGroupIndex)) { + snapshot1.put(keyGroupIndex, outStream.toByteArray()); + } else if (subKeyGroupRange2.contains(keyGroupIndex)) { + snapshot2.put(keyGroupIndex, outStream.toByteArray()); + } else { + throw new IllegalStateException("Key-Group index doesn't belong to any sub range."); + } + } + + // from now on we need everything twice. once per sub key-group range + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable1 = mock(Triggerable.class); + + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable2 = mock(Triggerable.class); + + + TestKeyContext keyContext1 = new TestKeyContext(); + TestKeyContext keyContext2 = new TestKeyContext(); + + TestProcessingTimeService processingTimeService1 = new TestProcessingTimeService(); + TestProcessingTimeService processingTimeService2 = new TestProcessingTimeService(); + + HeapInternalTimerService<Integer, String> timerService1 = restoreTimerService( + snapshot1, + mockTriggerable1, + keyContext1, + processingTimeService1, + subKeyGroupRange1, + maxParallelism); + + HeapInternalTimerService<Integer, String> timerService2 = restoreTimerService( + snapshot2, + mockTriggerable2, + keyContext2, + processingTimeService2, + subKeyGroupRange2, + maxParallelism); + + + processingTimeService1.setCurrentTime(10); + timerService1.advanceWatermark(10); + + verify(mockTriggerable1, times(1)).onProcessingTime(anyInternalTimer()); + verify(mockTriggerable1, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao"))); + verify(mockTriggerable1, never()).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello"))); + verify(mockTriggerable1, times(1)).onEventTime(anyInternalTimer()); + verify(mockTriggerable1, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "hello"))); + 
verify(mockTriggerable1, never()).onEventTime(eq(new InternalTimer<>(10, key2, "ciao"))); + + assertEquals(0, timerService1.numEventTimeTimers()); + + processingTimeService2.setCurrentTime(10); + timerService2.advanceWatermark(10); + + verify(mockTriggerable2, times(1)).onProcessingTime(anyInternalTimer()); + verify(mockTriggerable2, never()).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao"))); + verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello"))); + verify(mockTriggerable2, times(1)).onEventTime(anyInternalTimer()); + verify(mockTriggerable2, never()).onEventTime(eq(new InternalTimer<>(10, key1, "hello"))); + verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "ciao"))); + + assertEquals(0, timerService2.numEventTimeTimers()); + } private static class TestKeyContext implements KeyContext {
