xiaoqingwanga commented on code in PR #16303: URL: https://github.com/apache/kafka/pull/16303#discussion_r1643456559
########## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ########## @@ -309,49 +300,52 @@ public void testCanReprocessSkippedRecords() throws InterruptedException { // explicitly re-adding the records since MockConsumer drops them on poll. addRecord(consumer, metadataPartition, tpId1, 0); addRecord(consumer, metadataPartition, tpId0, 1); + consumerTask.ingestRecords(); // Waiting for all metadata records to be re-read from the first metadata partition number - TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record"); + assertEquals(Optional.of(1L), consumerTask.readOffsetForMetadataPartition(metadataPartition)); // Verifying that all the metadata records from the first metadata partition were processed properly. - TestUtils.waitForCondition(() -> handler.metadataCounter == 2, "Couldn't read record"); + assertEquals(2, handler.metadataCounter); } @Test - public void testMaybeMarkUserPartitionsAsReady() throws InterruptedException { + public void testMaybeMarkUserPartitionsAsReady() { final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0); final int metadataPartition = partitioner.metadataPartition(tpId); consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 2L)); consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId)); - thread.start(); + consumerTask.ingestRecords(); - TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned"); + assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + tpId + " has not been assigned"); assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition)); assertFalse(handler.isPartitionInitialized.containsKey(tpId)); IntStream.range(0, 5).forEach(offset -> addRecord(consumer, metadataPartition, tpId, offset)); - TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)), "Couldn't read record"); + consumerTask.ingestRecords(); + assertEquals(Optional.of(4L), consumerTask.readOffsetForMetadataPartition(metadataPartition)); assertTrue(handler.isPartitionInitialized.get(tpId)); } @ParameterizedTest @CsvSource(value = {"0, 0", "500, 500"}) - public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long beginOffset, - long endOffset) throws InterruptedException { + public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long beginOffset, long endOffset) { final TopicIdPartition tpId = getIdPartitions("world", 1).get(0); final int metadataPartition = partitioner.metadataPartition(tpId); consumer.updateBeginningOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), beginOffset)); consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), endOffset)); consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId)); - thread.start(); + consumerTask.ingestRecords(); - TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned"); + assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + tpId + " has not been assigned"); assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition)); - TestUtils.waitForCondition(() -> handler.isPartitionInitialized.containsKey(tpId), - "should have initialized the partition"); + assertTrue(handler.isPartitionInitialized.containsKey(tpId), "Should have initialized the partition"); assertFalse(consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent()); } @Test public void testConcurrentAccess() throws InterruptedException { - thread.start(); + // Here we need to test concurrent access. 
When ConsumerTask is ingesting records,
+ // we need to concurrently add partitions and perform close()
+ new Thread(consumerTask).start();

Review Comment:
I don't think join() is strictly necessary here, because this thread will be shut down by close(). Even if, for some reason, it doesn't get closed, the resources will be freed when the test ends anyway. Still, just to be extra careful, I'll give it a try.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org