jolshan commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1256084088


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -893,4 +1041,250 @@ public void testOnNewMetadataImage() {
         future1.complete(null);
         verify(coordinator1).onLoaded(newImage);
     }
+
+    @Test
+    public void testScheduleTimer() throws InterruptedException {
+        MockTimer timer = new MockTimer();
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(new MockPartitionWriter())
+                .withCoordinatorBuilderSupplier(new 
MockCoordinatorBuilderSupplier())
+                .build();
+
+        // Loads the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Check initial state.
+        CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = 
runtime.contextOrThrow(TP);
+        assertEquals(0, ctx.lastWrittenOffset);
+        assertEquals(0, ctx.lastCommittedOffset);
+
+        // The coordinator timer should be empty.
+        assertEquals(0, ctx.timer.size());
+
+        // Timer #1.
+        ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS,
+            () -> Arrays.asList("record1", "record2"));
+
+        // Timer #2.
+        ctx.coordinator.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS,
+            () -> Arrays.asList("record3", "record4"));
+
+        // The coordinator timer should have two pending tasks.
+        assertEquals(2, ctx.timer.size());
+
+        // Advance time to fire timer #1.
+        timer.advanceClock(10 + 1);
+
+        // Verify that the operation was executed.
+        assertEquals(mkSet("record1", "record2"), ctx.coordinator.records());
+        assertEquals(1, ctx.timer.size());
+
+        // Advance time to fire timer #2.
+        timer.advanceClock(10 + 1);
+
+        // Verify that the operation was executed.
+        assertEquals(mkSet("record1", "record2", "record3", "record4"), 
ctx.coordinator.records());
+        assertEquals(0, ctx.timer.size());
+    }
+
+    @Test
+    public void testRescheduleTimer() throws InterruptedException {
+        MockTimer timer = new MockTimer();
+        ManualEventProcessor processor = new ManualEventProcessor();
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(processor)
+                .withPartitionWriter(new MockPartitionWriter())
+                .withCoordinatorBuilderSupplier(new 
MockCoordinatorBuilderSupplier())
+                .build();
+
+        // Loads the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Poll twice to process the pending events related to the loading.
+        processor.poll();
+        processor.poll();
+
+        // Check initial state.
+        CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = 
runtime.contextOrThrow(TP);
+        assertEquals(0, ctx.timer.size());
+
+        // The processor should be empty.
+        assertEquals(0, processor.size());
+
+        // Timer #1.
+        ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS,
+            () -> Collections.singletonList("record1"));
+
+        // The coordinator timer should have one pending task.
+        assertEquals(1, ctx.timer.size());
+
+        // Advance time to fire the pending timer.
+        timer.advanceClock(10 + 1);
+
+        // An event should be waiting in the processor.
+        assertEquals(1, processor.size());
+
+        // Schedule a second timer with the same key.
+        ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS,
+            () -> Collections.singletonList("record2"));
+
+        // The coordinator timer should still have one pending task.
+        assertEquals(1, ctx.timer.size());
+
+        // Schedule a third timer with the same key.
+        ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS,
+            () -> Collections.singletonList("record3"));
+
+        // The coordinator timer should still have one pending task.
+        assertEquals(1, ctx.timer.size());
+
+        // Advance time to fire the pending timer.
+        timer.advanceClock(10 + 1);
+
+        // Another event should be waiting in the processor.
+        assertEquals(2, processor.size());
+
+        // Poll twice to execute the two pending events.
+        assertTrue(processor.poll());
+        assertTrue(processor.poll());
+
+        // Verify that the correct operation was executed. Only the third
+        // instance should have been executed here.
+        assertEquals(mkSet("record3"), ctx.coordinator.records());
+        assertEquals(0, ctx.timer.size());
+    }
+
+    @Test
+    public void testCancelTimer() throws InterruptedException {
+        MockTimer timer = new MockTimer();
+        ManualEventProcessor processor = new ManualEventProcessor();
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(processor)
+                .withPartitionWriter(new MockPartitionWriter())
+                .withCoordinatorBuilderSupplier(new 
MockCoordinatorBuilderSupplier())
+                .build();
+
+        // Loads the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Poll twice to process the pending events related to the loading.
+        processor.poll();
+        processor.poll();
+
+        // Check initial state.
+        CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = 
runtime.contextOrThrow(TP);
+        assertEquals(0, ctx.timer.size());
+
+        // The processor should be empty.
+        assertEquals(0, processor.size());
+
+        // Timer #1.
+        ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS,
+            () -> Collections.singletonList("record1"));
+
+        // The coordinator timer should have one pending task.
+        assertEquals(1, ctx.timer.size());
+
+        // Advance time to fire the pending timer.
+        timer.advanceClock(10 + 1);
+
+        // An event should be waiting in the processor.
+        assertEquals(1, processor.size());
+
+        // Schedule a second timer with the same key.
+        ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS,
+            () -> Collections.singletonList("record2"));
+
+        // The coordinator timer should still have one pending task.
+        assertEquals(1, ctx.timer.size());
+
+        // Cancel the timer.
+        ctx.coordinator.timer.cancel("timer-1");
+
+        // The coordinator timer should have no pending tasks.
+        assertEquals(0, ctx.timer.size());
+
+        // Advance time to fire the cancelled timer.
+        timer.advanceClock(10 + 1);
+
+        // No new event expected because the timer was cancelled before
+        // it expired.
+        assertEquals(1, processor.size());
+
+        // Poll to execute the pending event.
+        assertTrue(processor.poll());
+
+        // Verify that no operation was executed.
+        assertEquals(Collections.emptySet(), ctx.coordinator.records());
+        assertEquals(0, ctx.timer.size());
+    }
+
+    @Test
+    public void testRetryTimer() throws InterruptedException {
+        MockTimer timer = new MockTimer();
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(new MockPartitionWriter())
+                .withCoordinatorBuilderSupplier(new 
MockCoordinatorBuilderSupplier())
+                .build();
+
+        // Loads the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Check initial state.
+        CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = 
runtime.contextOrThrow(TP);
+        assertEquals(0, ctx.timer.size());
+
+        // Timer #1.
+        AtomicInteger cnt = new AtomicInteger(0);
+        ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, 
() -> {
+            cnt.incrementAndGet();
+            throw new KafkaException("error");
+        });
+
+        // The coordinator timer should have one pending task.
+        assertEquals(1, ctx.timer.size());
+
+        // Advance time to fire the pending timer.
+        timer.advanceClock(10 + 1);
+
+        // The timer should have been called and the timer should have one pending task.
+        assertEquals(1, cnt.get());
+        assertEquals(1, ctx.timer.size());

Review Comment:
   Sorry, I misunderstood something about where the error was thrown, but I 
understand now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to