jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248100591


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
         assertFutureThrows(write1, NotCoordinatorException.class);
         assertFutureThrows(write2, NotCoordinatorException.class);
     }
+
+    @Test
+    public void testOnNewMetadataImage() {
+        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+        MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withLoader(loader)
+                .withEventProcessor(new MockEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorBuilderSupplier(supplier)
+                .build();
+
+        MockCoordinator coordinator0 = mock(MockCoordinator.class);
+        MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+        when(supplier.get()).thenReturn(builder);
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.build())
+            .thenReturn(coordinator0)
+            .thenReturn(coordinator1);
+
+        CompletableFuture<Void> future0 = new CompletableFuture<>();
+        when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+        CompletableFuture<Void> future1 = new CompletableFuture<>();
+        when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+        runtime.scheduleLoadOperation(tp0, 0);
+        runtime.scheduleLoadOperation(tp1, 0);
+
+        // Coordinator 0 is loaded. It should get the current image
+        // that is the empty one.
+        future0.complete(null);
+        verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+        // Publish a new image.
+        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+        runtime.onNewMetadataImage(newImage, delta);
+
+        // Coordinator 0 should be notified about it.
+        verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   Sorry, I found it after posting. I should have looked a little longer. 😅



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

