jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1348389082


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws 
InterruptedException {
         assertEquals(1, cnt.get());
         assertEquals(0, ctx.timer.size());
     }
+
+    @Test
+    public void testStateChanges() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockCoordinatorShardBuilderSupplier supplier = 
mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = 
mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+        GroupCoordinatorRuntimeMetrics runtimeMetrics = 
mock(GroupCoordinatorRuntimeMetrics.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withLoader(loader)
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(runtimeMetrics)
+                .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+        CompletableFuture<CoordinatorLoader.LoadSummary> future = new 
CompletableFuture<>();
+        when(loader.load(TP, coordinator)).thenReturn(future);
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 0);
+
+        // Getting the context succeeds and the coordinator should be in 
loading.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(LOADING, ctx.state);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, 
LOADING);
+
+        // When the loading fails, the coordinator transitions to failed.
+        future.completeExceptionally(new Exception("failure"));
+        assertEquals(FAILED, ctx.state);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, 
FAILED);
+
+        // Start loading a new topic partition.
+        TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
+        future = new CompletableFuture<>();
+        when(loader.load(tp, coordinator)).thenReturn(future);
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(tp, 0);
+        // Getting the context succeeds and the coordinator should be in 
loading.
+        ctx = runtime.contextOrThrow(tp);
+        assertEquals(LOADING, ctx.state);
+        verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, 
LOADING);
+
+        // When the loading completes, the coordinator transitions to active.
+        future.complete(null);
+        assertEquals(ACTIVE, ctx.state);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, 
ACTIVE);
+
+        runtime.close();
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, 
CLOSED);

Review Comment:
   No, the partitions (aka coordinator context) is cleared when the broker 
resigns from leadership.
   
   When a partition migrates to a different broker, we schedule an unload 
operation (`GroupCoordinatorServer#onResignation`) and remove the coordinator 
context from memory after deregistering the HWM listener and clearing its state 
(`CoordinatorRuntime.CoordinatorContext#unload`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to