squah-confluent commented on code in PR #21263:
URL: https://github.com/apache/kafka/pull/21263#discussion_r2668791154
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -3151,107 +3153,73 @@ public void
testCompleteTransactionWithUnexpectedPartition() {
}
@Test
- public void testOnPartitionsDeleted() {
- CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ public void testOnMetadataUpdateSchedulesOperationsWhenTopicsDeleted()
throws Exception {
+ var runtime = mockRuntime();
+ var service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.build();
service.startup(() -> 3);
- MetadataImage image = new MetadataImageBuilder()
- .addTopic(Uuid.randomUuid(), "foo", 1)
+ var topicId = Uuid.randomUuid();
+ var initialImage = new MetadataImageBuilder()
+ .addTopic(topicId, "foo", 1)
.build();
- service.onMetadataUpdate(new MetadataDelta(image), image);
+ // Create a delta that deletes the topic.
+ var delta = new MetadataDelta(initialImage);
+ delta.replay(new RemoveTopicRecord().setTopicId(topicId));
+ var newImage = delta.apply(new MetadataProvenance(1, 0, 0L, true));
+
+ // Use incomplete futures to verify method blocks.
+ var offsetFutures = List.of(
+ new CompletableFuture<>(),
+ new CompletableFuture<>(),
+ new CompletableFuture<>()
+ );
+ var shareFutures = List.of(
+ new CompletableFuture<>(),
+ new CompletableFuture<>(),
+ new CompletableFuture<>()
+ );
when(runtime.scheduleWriteAllOperation(
ArgumentMatchers.eq("on-partition-deleted"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
- )).thenReturn(Arrays.asList(
- CompletableFuture.completedFuture(null),
- CompletableFuture.completedFuture(null),
-
FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
- ));
+ )).thenReturn(offsetFutures);
when(runtime.scheduleWriteAllOperation(
ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
- )).thenReturn(Arrays.asList(
- CompletableFuture.completedFuture(null),
- CompletableFuture.completedFuture(null),
- CompletableFuture.completedFuture(null)
- ));
-
- // The exception is logged and swallowed.
- assertDoesNotThrow(() ->
- service.onPartitionsDeleted(
- List.of(new TopicPartition("foo", 0)),
- BufferSupplier.NO_CACHING
- )
- );
- }
-
- @Test
- public void testOnPartitionsDeletedWhenServiceIsNotStarted() {
Review Comment:
Do we have an equivalent test for `onMetadataUpdate`? I couldn't find one.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]