jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350610255


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws 
InterruptedException {
         assertEquals(1, cnt.get());
         assertEquals(0, ctx.timer.size());
     }
+
+    @Test
+    public void testStateChanges() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockCoordinatorShardBuilderSupplier supplier = 
mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = 
mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+        GroupCoordinatorRuntimeMetrics runtimeMetrics = 
mock(GroupCoordinatorRuntimeMetrics.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withLoader(loader)
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(runtimeMetrics)
+                .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+        CompletableFuture<CoordinatorLoader.LoadSummary> future = new 
CompletableFuture<>();
+        when(loader.load(TP, coordinator)).thenReturn(future);
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 0);
+
+        // Getting the context succeeds and the coordinator should be in 
loading.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(LOADING, ctx.state);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, 
LOADING);
+
+        // When the loading fails, the coordinator transitions to failed.
+        future.completeExceptionally(new Exception("failure"));
+        assertEquals(FAILED, ctx.state);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, 
FAILED);
+
+        // Start loading a new topic partition.
+        TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
+        future = new CompletableFuture<>();
+        when(loader.load(tp, coordinator)).thenReturn(future);
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(tp, 0);
+        // Getting the context succeeds and the coordinator should be in 
loading.
+        ctx = runtime.contextOrThrow(tp);
+        assertEquals(LOADING, ctx.state);
+        verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, 
LOADING);
+
+        // When the loading completes, the coordinator transitions to active.
+        future.complete(null);
+        assertEquals(ACTIVE, ctx.state);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, 
ACTIVE);
+
+        runtime.close();
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, 
CLOSED);

Review Comment:
   So should we also see this CLOSED metric increment when we resign 
leadership?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to