jeffkbkim commented on code in PR #14417: URL: https://github.com/apache/kafka/pull/14417#discussion_r1350909602
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ########## @@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws InterruptedException { assertEquals(1, cnt.get()); assertEquals(0, ctx.timer.size()); } + + @Test + public void testStateChanges() throws Exception { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = mock(MockPartitionWriter.class); + MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); + GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + + CoordinatorRuntime<MockCoordinatorShard, String> runtime = + new CoordinatorRuntime.Builder<MockCoordinatorShard, String>() + .withTime(timer.time()) + .withTimer(timer) + .withLoader(loader) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(runtimeMetrics) + .build(); + + when(builder.withSnapshotRegistry(any())).thenReturn(builder); + when(builder.withLogContext(any())).thenReturn(builder); + when(builder.withTime(any())).thenReturn(builder); + when(builder.withTimer(any())).thenReturn(builder); + when(builder.withTopicPartition(any())).thenReturn(builder); + when(builder.build()).thenReturn(coordinator); + when(supplier.get()).thenReturn(builder); + CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>(); + when(loader.load(TP, coordinator)).thenReturn(future); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 0); + + // Getting the context succeeds and the coordinator should be in loading. + CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(LOADING, ctx.state); + verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, LOADING); + + // When the loading fails, the coordinator transitions to failed. + future.completeExceptionally(new Exception("failure")); + assertEquals(FAILED, ctx.state); + verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, FAILED); + + // Start loading a new topic partition. + TopicPartition tp = new TopicPartition("__consumer_offsets", 1); + future = new CompletableFuture<>(); + when(loader.load(tp, coordinator)).thenReturn(future); + // Schedule the loading. + runtime.scheduleLoadOperation(tp, 0); + // Getting the context succeeds and the coordinator should be in loading. + ctx = runtime.contextOrThrow(tp); + assertEquals(LOADING, ctx.state); + verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING); + + // When the loading completes, the coordinator transitions to active. + future.complete(null); + assertEquals(ACTIVE, ctx.state); + verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, ACTIVE); + + runtime.close(); + verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, CLOSED); Review Comment: We actually don't have the closed metric (and initial metric) anymore. This was removed in https://github.com/apache/kafka/pull/14417#discussion_r1334548382 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org