dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1247805182

##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -172,19 +186,21 @@ public List<Record> build(TopicsImage topicsImage) {
             });

             // Add subscription metadata.
-            Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
-            members.forEach((memberId, member) -> {
-                member.subscribedTopicNames().forEach(topicName -> {
-                    TopicImage topicImage = topicsImage.getTopic(topicName);
-                    if (topicImage != null) {
-                        subscriptionMetadata.put(topicName, new TopicMetadata(
-                            topicImage.id(),
-                            topicImage.name(),
-                            topicImage.partitions().size()
-                        ));
-                    }
+            if (subscriptionMetadata == null) {

Review Comment:
   Most of the tests are fine with the auto-generated subscription metadata. However, the new ones need specific subscription metadata to verify the check. This is why I extended this builder to either accept the subscription metadata to use or auto-generate it.


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
         assertFutureThrows(write1, NotCoordinatorException.class);
         assertFutureThrows(write2, NotCoordinatorException.class);
     }
+
+    @Test
+    public void testOnNewMetadataImage() {
+        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+        MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withLoader(loader)
+                .withEventProcessor(new MockEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorBuilderSupplier(supplier)
+                .build();
+
+        MockCoordinator coordinator0 = mock(MockCoordinator.class);
+        MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+        when(supplier.get()).thenReturn(builder);
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.build())
+            .thenReturn(coordinator0)
+            .thenReturn(coordinator1);
+
+        CompletableFuture<Void> future0 = new CompletableFuture<>();
+        when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+        CompletableFuture<Void> future1 = new CompletableFuture<>();
+        when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+        runtime.scheduleLoadOperation(tp0, 0);
+        runtime.scheduleLoadOperation(tp1, 0);
+
+        // Coordinator 0 is loaded. It should get the current image
+        // that is the empty one.
+        future0.complete(null);
+        verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+        // Publish a new image.
+        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+        runtime.onNewMetadataImage(newImage, delta);
+
+        // Coordinator 0 should be notified about it.
+        verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   We cannot test this here because the runtime is not aware of the concrete implementation of the state machine. I also want to make sure that we are on the same page: the metadata image is updated when `onNewMetadataImage` is called, but the subscription metadata is only refreshed on the next heartbeat.
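
   The timing described in the second comment (the image is swapped when `onNewMetadataImage` is called, while the derived subscription metadata is only recomputed on the next heartbeat) can be sketched roughly as follows. This is a minimal, hypothetical illustration and not the PR's code: `LazySubscriptionSketch`, its fields, and the plain `Map<String, Integer>` (topic name to partition count) standing in for `MetadataImage` and `TopicMetadata` are all assumptions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a state machine; not the actual GroupMetadataManager.
class LazySubscriptionSketch {
    // Stand-in for the latest metadata image: topic name -> partition count.
    private Map<String, Integer> image = new HashMap<>();
    // Subscription metadata derived from the image at the last heartbeat.
    private Map<String, Integer> subscriptionMetadata = new HashMap<>();

    // A new image is published: only the stored image changes here.
    void onNewMetadataImage(Map<String, Integer> newImage) {
        this.image = new HashMap<>(newImage);
    }

    // A heartbeat arrives: the subscription metadata is recomputed
    // from whatever image is current at that moment.
    Map<String, Integer> heartbeat(List<String> subscribedTopicNames) {
        Map<String, Integer> refreshed = new HashMap<>();
        for (String topicName : subscribedTopicNames) {
            Integer numPartitions = image.get(topicName);
            // Topics missing from the image are skipped.
            if (numPartitions != null) {
                refreshed.put(topicName, numPartitions);
            }
        }
        subscriptionMetadata = refreshed;
        return subscriptionMetadata;
    }

    Map<String, Integer> subscriptionMetadata() {
        return subscriptionMetadata;
    }
}
```

   Under these assumptions, calling `onNewMetadataImage` alone leaves `subscriptionMetadata()` unchanged; the new topic data only shows up after the next `heartbeat` call.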