jeffkbkim commented on code in PR #14417: URL: https://github.com/apache/kafka/pull/14417#discussion_r1348389082
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ########## @@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws InterruptedException { assertEquals(1, cnt.get()); assertEquals(0, ctx.timer.size()); } + + @Test + public void testStateChanges() throws Exception { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = mock(MockPartitionWriter.class); + MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); + GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + + CoordinatorRuntime<MockCoordinatorShard, String> runtime = + new CoordinatorRuntime.Builder<MockCoordinatorShard, String>() + .withTime(timer.time()) + .withTimer(timer) + .withLoader(loader) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(runtimeMetrics) + .build(); + + when(builder.withSnapshotRegistry(any())).thenReturn(builder); + when(builder.withLogContext(any())).thenReturn(builder); + when(builder.withTime(any())).thenReturn(builder); + when(builder.withTimer(any())).thenReturn(builder); + when(builder.withTopicPartition(any())).thenReturn(builder); + when(builder.build()).thenReturn(coordinator); + when(supplier.get()).thenReturn(builder); + CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>(); + when(loader.load(TP, coordinator)).thenReturn(future); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 0); + + // Getting the context succeeds and the coordinator should be in loading. + CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(LOADING, ctx.state); + verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, LOADING); + + // When the loading fails, the coordinator transitions to failed. + future.completeExceptionally(new Exception("failure")); + assertEquals(FAILED, ctx.state); + verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, FAILED); + + // Start loading a new topic partition. + TopicPartition tp = new TopicPartition("__consumer_offsets", 1); + future = new CompletableFuture<>(); + when(loader.load(tp, coordinator)).thenReturn(future); + // Schedule the loading. + runtime.scheduleLoadOperation(tp, 0); + // Getting the context succeeds and the coordinator should be in loading. + ctx = runtime.contextOrThrow(tp); + assertEquals(LOADING, ctx.state); + verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING); + + // When the loading completes, the coordinator transitions to active. + future.complete(null); + assertEquals(ACTIVE, ctx.state); + verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, ACTIVE); + + runtime.close(); + verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, CLOSED); Review Comment: No, the partitions (aka coordinator context) is cleared when the broker resigns from leadership. When a partition migrates to a different broker, we schedule an unload operation (`GroupCoordinatorServer#onResignation`) and remove the coordinator context from memory after deregistering the HWM listener and clearing its state (`CoordinatorRuntime.CoordinatorContext#unload`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org