xiaoqingwanga commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1643456559


##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -309,49 +300,52 @@ public void testCanReprocessSkippedRecords() throws 
InterruptedException {
         // explicitly re-adding the records since MockConsumer drops them on 
poll.
         addRecord(consumer, metadataPartition, tpId1, 0);
         addRecord(consumer, metadataPartition, tpId0, 1);
+        consumerTask.ingestRecords();
         // Waiting for all metadata records to be re-read from the first 
metadata partition number
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
 "Couldn't read record");
+        assertEquals(Optional.of(1L), 
consumerTask.readOffsetForMetadataPartition(metadataPartition));
         // Verifying that all the metadata records from the first metadata 
partition were processed properly.
-        TestUtils.waitForCondition(() -> handler.metadataCounter == 2, 
"Couldn't read record");
+        assertEquals(2, handler.metadataCounter);
     }
 
     @Test
-    public void testMaybeMarkUserPartitionsAsReady() throws 
InterruptedException {
+    public void testMaybeMarkUserPartitionsAsReady() {
         final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0);
         final int metadataPartition = partitioner.metadataPartition(tpId);
         
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 2L));
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
-        thread.start();
+        consumerTask.ingestRecords();
 
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be 
assigned");
+        assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + 
tpId + " has not been assigned");
         
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
         assertFalse(handler.isPartitionInitialized.containsKey(tpId));
         IntStream.range(0, 5).forEach(offset -> addRecord(consumer, 
metadataPartition, tpId, offset));
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)),
 "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertEquals(Optional.of(4L), 
consumerTask.readOffsetForMetadataPartition(metadataPartition));
         assertTrue(handler.isPartitionInitialized.get(tpId));
     }
 
     @ParameterizedTest
     @CsvSource(value = {"0, 0", "500, 500"})
-    public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long 
beginOffset,
-                                                                  long 
endOffset) throws InterruptedException {
+    public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long 
beginOffset, long endOffset) {
         final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
         final int metadataPartition = partitioner.metadataPartition(tpId);
         
consumer.updateBeginningOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 beginOffset));
         
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 endOffset));
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
-        thread.start();
+        consumerTask.ingestRecords();
 
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be 
assigned");
+        assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + 
tpId + " has not been assigned");
         
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
-        TestUtils.waitForCondition(() -> 
handler.isPartitionInitialized.containsKey(tpId),
-            "should have initialized the partition");
+        assertTrue(handler.isPartitionInitialized.containsKey(tpId), "Should 
have initialized the partition");
         
assertFalse(consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent());
     }
 
     @Test
     public void testConcurrentAccess() throws InterruptedException {
-        thread.start();
+        // Here we need to test concurrent access. When ConsumerTask is 
ingesting records,
+        // we need to concurrently add partitions and perform close()
+        new Thread(consumerTask).start();

Review Comment:
   I don't think join() is strictly necessary here, because this thread will be shut down by close(). Even if, for some reason, it doesn't get closed, the resources will be freed when the test ends anyway.
   Just to be extra careful, I'll give it a try.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to