This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 64702bcf6f8 KAFKA-16862: Refactor ConsumerTaskTest to be deterministic 
and avoid tight loops (#16303)
64702bcf6f8 is described below

commit 64702bcf6f883d266ccffcec458b4c3c0706ad75
Author: Wang Xiaoqing <w...@outlook.com>
AuthorDate: Fri Jun 21 03:35:14 2024 +0800

    KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight 
loops (#16303)
    
    Reviewers: Greg Harris <greg.har...@aiven.io>
---
 .../log/remote/metadata/storage/ConsumerTask.java  |  50 ++++---
 .../remote/metadata/storage/ConsumerTaskTest.java  | 154 +++++++++++----------
 2 files changed, 110 insertions(+), 94 deletions(-)

diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index 593d84f7273..a0376823f34 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -123,33 +123,43 @@ class ConsumerTask implements Runnable, Closeable {
     public void run() {
         log.info("Starting consumer task thread.");
         while (!isClosed) {
-            try {
-                if (hasAssignmentChanged) {
-                    maybeWaitForPartitionAssignments();
-                }
+            ingestRecords();
+        }
+        closeConsumer();
+        log.info("Exited from consumer task thread");
+    }
 
-                log.trace("Polling consumer to receive remote log metadata 
topic records");
-                final ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(pollTimeoutMs));
-                for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
-                    processConsumerRecord(record);
-                }
-                maybeMarkUserPartitionsAsReady();
-            } catch (final WakeupException ex) {
-                // ignore logging the error
-                isClosed = true;
-            } catch (final RetriableException ex) {
-                log.warn("Retriable error occurred while processing the 
records. Retrying...", ex);
-            } catch (final Exception ex) {
-                isClosed = true;
-                log.error("Error occurred while processing the records", ex);
+    // public for testing
+    public void ingestRecords() {
+        try {
+            if (hasAssignmentChanged) {
+                maybeWaitForPartitionAssignments();
+            }
+
+            log.trace("Polling consumer to receive remote log metadata topic 
records");
+            final ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(pollTimeoutMs));
+            for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+                processConsumerRecord(record);
             }
+            maybeMarkUserPartitionsAsReady();
+        } catch (final WakeupException ex) {
+            // ignore logging the error
+            isClosed = true;
+        } catch (final RetriableException ex) {
+            log.warn("Retriable error occurred while processing the records. 
Retrying...", ex);
+        } catch (final Exception ex) {
+            isClosed = true;
+            log.error("Error occurred while processing the records", ex);
         }
+    }
+
+    // public for testing
+    public void closeConsumer() {
         try {
             consumer.close(Duration.ofSeconds(30));
         } catch (final Exception e) {
             log.error("Error encountered while closing the consumer", e);
         }
-        log.info("Exited from consumer task thread");
     }
 
     private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
@@ -469,4 +479,4 @@ class ConsumerTask implements Runnable, Closeable {
                 '}';
         }
     }
-}
+}
\ No newline at end of file
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
index a29316d8da3..66176c68477 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.server.log.remote.metadata.storage;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicIdPartition;
@@ -33,8 +32,6 @@ import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
-import org.apache.kafka.test.TestCondition;
-import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -69,9 +66,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
 
 public class ConsumerTaskTest {
 
@@ -85,41 +79,40 @@ public class ConsumerTaskTest {
 
     private ConsumerTask consumerTask;
     private MockConsumer<byte[], byte[]> consumer;
-    private Thread thread;
 
     @BeforeEach
     public void beforeEach() {
         final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
             .collect(Collectors.toMap(Function.identity(), e -> 0L));
-        consumer = spy(new MockConsumer<>(OffsetResetStrategy.EARLIEST));
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
         consumer.updateBeginningOffsets(offsets);
         consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 
300_000L, Time.SYSTEM);
-        thread = new Thread(consumerTask);
     }
 
     @AfterEach
-    public void afterEach() throws InterruptedException {
-        if (thread != null) {
-            assertDoesNotThrow(() -> consumerTask.close(), "Close method threw 
exception");
-            thread.join(10_000);
-            assertFalse(thread.isAlive(), "Consumer task thread is still 
alive");
-        }
+    public void afterEach() {
+        assertDoesNotThrow(() -> consumerTask.close(), "Close method threw 
exception");
+        assertDoesNotThrow(() -> consumerTask.closeConsumer(), "CloseConsumer 
method threw exception");
+        assertTrue(consumer.closed());
     }
 
     /**
      * Tests that the consumer task shuts down gracefully when there were no 
assignments.
      */
     @Test
-    public void testCloseOnNoAssignment() throws InterruptedException {
-        thread.start();
-        Thread.sleep(10);
+    public void testCloseOnNoAssignment() {
         assertDoesNotThrow(() -> consumerTask.close(), "Close method threw 
exception");
+        assertDoesNotThrow(() -> consumerTask.closeConsumer(), "CloseConsumer 
method threw exception");
     }
 
     @Test
     public void testIdempotentClose() {
-        thread.start();
+        // Go through the closure process
         consumerTask.close();
+        consumerTask.closeConsumer();
+
+        // Go through the closure process again
+        // Note: After ConsumerTask is closed, the second close() normally 
does not call closeConsumer() again
         consumerTask.close();
     }
 
@@ -137,46 +130,47 @@ public class ConsumerTaskTest {
     }
 
     @Test
-    public void testAddAssignmentsForPartitions() throws InterruptedException {
+    public void testAddAssignmentsForPartitions() {
         final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 
3);
         final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
             .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
             .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
         for (final TopicIdPartition idPartition : idPartitions) {
-            TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + 
idPartition + " to be assigned");
+            assertTrue(consumerTask.isUserPartitionAssigned(idPartition), 
"Partition " + idPartition + " has not been assigned");
             
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
             assertTrue(handler.isPartitionLoaded.get(idPartition));
         }
     }
 
     @Test
-    public void testRemoveAssignmentsForPartitions() throws 
InterruptedException {
+    public void testRemoveAssignmentsForPartitions() {
         final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 
3);
         final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
             .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
             .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
 
         final TopicIdPartition tpId = allPartitions.get(0);
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + " 
to be assigned");
+        assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + 
tpId + " has not been assigned");
         addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
-            "Couldn't read record");
+        consumerTask.ingestRecords();
+        
assertTrue(consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent());
 
         final Set<TopicIdPartition> removePartitions = 
Collections.singleton(tpId);
         consumerTask.removeAssignmentsForPartitions(removePartitions);
+        consumerTask.ingestRecords();
         for (final TopicIdPartition idPartition : allPartitions) {
-            final TestCondition condition = () -> 
removePartitions.contains(idPartition) == 
!consumerTask.isUserPartitionAssigned(idPartition);
-            TestUtils.waitForCondition(condition, "Timed out waiting for " + 
idPartition + " to be removed");
+            assertEquals(!removePartitions.contains(idPartition), 
consumerTask.isUserPartitionAssigned(idPartition),
+                    "Partition " + idPartition + " has not been removed");
         }
         for (TopicIdPartition removePartition : removePartitions) {
-            TestUtils.waitForCondition(() -> 
handler.isPartitionCleared.containsKey(removePartition),
-                "Timed out waiting for " + removePartition + " to be cleared");
+            assertTrue(handler.isPartitionCleared.containsKey(removePartition),
+                    "Partition " + removePartition + " has not been cleared");
         }
     }
 
@@ -227,7 +221,7 @@ public class ConsumerTaskTest {
     }
 
     @Test
-    public void testCanProcessRecord() throws InterruptedException {
+    public void testCanProcessRecord() {
         final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
         final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new 
TopicPartition("sample", 0));
         final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new 
TopicPartition("sample", 1));
@@ -239,29 +233,34 @@ public class ConsumerTaskTest {
         
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 0L));
         final Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
         consumerTask.addAssignmentsForPartitions(assignments);
-        thread.start();
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 + 
" to be assigned");
+        consumerTask.ingestRecords();
+        assertTrue(consumerTask.isUserPartitionAssigned(tpId0), "Partition " + 
tpId0 + " has not been assigned");
 
         addRecord(consumer, metadataPartition, tpId0, 0);
         addRecord(consumer, metadataPartition, tpId0, 1);
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
 "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertEquals(Optional.of(1L), 
consumerTask.readOffsetForMetadataPartition(metadataPartition), "Partition 
offset did not reach expected value of 1");
         assertEquals(2, handler.metadataCounter);
 
         // should only read the tpId1 records
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + 
" to be assigned");
+        consumerTask.ingestRecords();
+        assertTrue(consumerTask.isUserPartitionAssigned(tpId1), "Partition " + 
tpId1 + " has not been assigned");
+
         addRecord(consumer, metadataPartition, tpId1, 2);
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(2L)),
 "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertEquals(Optional.of(2L), 
consumerTask.readOffsetForMetadataPartition(metadataPartition), "Partition 
offset did not reach expected value of 2");
         assertEquals(3, handler.metadataCounter);
 
         // shouldn't read tpId2 records because it's not assigned
         addRecord(consumer, metadataPartition, tpId2, 3);
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(3L)),
 "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertEquals(Optional.of(3L), 
consumerTask.readOffsetForMetadataPartition(metadataPartition), "Partition 
offset did not reach expected value of 3");
         assertEquals(3, handler.metadataCounter);
     }
 
     @Test
-    public void testCanReprocessSkippedRecords() throws InterruptedException {
+    public void testCanReprocessSkippedRecords() {
         final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
         final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new 
TopicPartition("sample", 0));
         final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new 
TopicPartition("sample", 1));
@@ -272,33 +271,26 @@ public class ConsumerTaskTest {
         final int metadataPartition = partitioner.metadataPartition(tpId0);
         final int anotherMetadataPartition = 
partitioner.metadataPartition(tpId3);
 
-        // Mocking the consumer to be able to wait for the second reassignment
-        doAnswer(invocation -> {
-            if (consumerTask.isUserPartitionAssigned(tpId1) && 
!consumerTask.isUserPartitionAssigned(tpId3)) {
-                return ConsumerRecords.empty();
-            } else {
-                return invocation.callRealMethod();
-            }
-        }).when(consumer).poll(any());
-
         
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 0L));
         
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(anotherMetadataPartition),
 0L));
         final Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
         consumerTask.addAssignmentsForPartitions(assignments);
-        thread.start();
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 + 
" to be assigned");
+        consumerTask.ingestRecords();
+        assertTrue(consumerTask.isUserPartitionAssigned(tpId0), "Partition " + 
tpId0 + " has not been assigned");
 
         // Adding metadata records in the order opposite to the order of 
assignments
         addRecord(consumer, metadataPartition, tpId1, 0);
         addRecord(consumer, metadataPartition, tpId0, 1);
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
 "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertEquals(Optional.of(1L), 
consumerTask.readOffsetForMetadataPartition(metadataPartition));
         // Only one record is processed, tpId1 record is skipped as unassigned
         // but read offset is 1 e.g., record for tpId1 has been read by 
consumer
         assertEquals(1, handler.metadataCounter);
 
         // Adding assignment for tpId1 after related metadata records have 
already been read
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + 
" to be assigned");
+        consumerTask.ingestRecords();
+        assertTrue(consumerTask.isUserPartitionAssigned(tpId1), "Partition " + 
tpId1 + " has not been assigned");
 
         // Adding assignment for tpId0 to trigger the reset to last read offset
         // and assignment for tpId3 that has different metadata partition to 
trigger the update of metadata snapshot
@@ -309,49 +301,53 @@ public class ConsumerTaskTest {
         // explicitly re-adding the records since MockConsumer drops them on 
poll.
         addRecord(consumer, metadataPartition, tpId1, 0);
         addRecord(consumer, metadataPartition, tpId0, 1);
+        consumerTask.ingestRecords();
         // Waiting for all metadata records to be re-read from the first 
metadata partition number
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
 "Couldn't read record");
+        assertEquals(Optional.of(1L), 
consumerTask.readOffsetForMetadataPartition(metadataPartition));
         // Verifying that all the metadata records from the first metadata 
partition were processed properly.
-        TestUtils.waitForCondition(() -> handler.metadataCounter == 2, 
"Couldn't read record");
+        assertEquals(2, handler.metadataCounter);
     }
 
     @Test
-    public void testMaybeMarkUserPartitionsAsReady() throws 
InterruptedException {
+    public void testMaybeMarkUserPartitionsAsReady() {
         final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0);
         final int metadataPartition = partitioner.metadataPartition(tpId);
         
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 2L));
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
-        thread.start();
+        consumerTask.ingestRecords();
 
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be 
assigned");
+        assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + 
tpId + " has not been assigned");
         
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
         assertFalse(handler.isPartitionInitialized.containsKey(tpId));
         IntStream.range(0, 5).forEach(offset -> addRecord(consumer, 
metadataPartition, tpId, offset));
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)),
 "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertEquals(Optional.of(4L), 
consumerTask.readOffsetForMetadataPartition(metadataPartition));
         assertTrue(handler.isPartitionInitialized.get(tpId));
     }
 
     @ParameterizedTest
     @CsvSource(value = {"0, 0", "500, 500"})
-    public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long 
beginOffset,
-                                                                  long 
endOffset) throws InterruptedException {
+    public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long 
beginOffset, long endOffset) {
         final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
         final int metadataPartition = partitioner.metadataPartition(tpId);
         
consumer.updateBeginningOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 beginOffset));
         
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 endOffset));
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
-        thread.start();
+        consumerTask.ingestRecords();
 
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be 
assigned");
+        assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + 
tpId + " has not been assigned");
         
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
-        TestUtils.waitForCondition(() -> 
handler.isPartitionInitialized.containsKey(tpId),
-            "should have initialized the partition");
+        assertTrue(handler.isPartitionInitialized.containsKey(tpId), "Should 
have initialized the partition");
         
assertFalse(consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent());
     }
 
     @Test
     public void testConcurrentAccess() throws InterruptedException {
+        // Test concurrent access: while the ConsumerTask thread is ingesting records,
+        // concurrently add partition assignments and call close().
+        Thread thread = new Thread(consumerTask);
         thread.start();
+
         final CountDownLatch latch = new CountDownLatch(1);
         final TopicIdPartition tpId = getIdPartitions("concurrent", 1).get(0);
         
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(partitioner.metadataPartition(tpId)),
 0L));
@@ -377,41 +373,51 @@ public class ConsumerTaskTest {
         latch.countDown();
         assignmentThread.join();
         closeThread.join();
+
+        thread.join(10_000);
+        assertFalse(thread.isAlive(), "Consumer task thread is still alive");
     }
 
     @Test
-    public void testConsumerShouldNotCloseOnRetriableError() throws 
InterruptedException {
+    public void testConsumerShouldNotCloseOnRetriableError() {
         final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
         final int metadataPartition = partitioner.metadataPartition(tpId);
         
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 1L));
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
-        thread.start();
+        consumerTask.ingestRecords();
 
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be 
assigned");
+        assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + 
tpId + " has not been assigned");
         
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
 
-        consumer.setPollException(new LeaderNotAvailableException("leader not 
available!"));
+        consumer.setPollException(new LeaderNotAvailableException("Leader not 
available!"));
+        consumerTask.ingestRecords();
         addRecord(consumer, metadataPartition, tpId, 0);
+        consumerTask.ingestRecords();
         consumer.setPollException(new TimeoutException("Not able to complete 
the operation within the timeout"));
+        consumerTask.ingestRecords();
         addRecord(consumer, metadataPartition, tpId, 1);
+        consumerTask.ingestRecords();
 
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
 "Couldn't read record");
+        assertEquals(Optional.of(1L), 
consumerTask.readOffsetForMetadataPartition(metadataPartition));
         assertEquals(2, handler.metadataCounter);
     }
 
     @Test
-    public void testConsumerShouldCloseOnNonRetriableError() throws 
InterruptedException {
+    public void testConsumerShouldCloseOnNonRetriableError() {
         final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
         final int metadataPartition = partitioner.metadataPartition(tpId);
         
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 1L));
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
-        thread.start();
+        consumerTask.ingestRecords();
 
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be 
assigned");
+        assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + 
tpId + " has not been assigned");
         
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
 
         consumer.setPollException(new AuthorizationException("Unauthorized to 
read the topic!"));
-        TestUtils.waitForCondition(() -> consumer.closed(), "Should close the 
consume on non-retriable error");
+        // Because of the poll exception set above, run() hits a non-retriable error
+        // and closes the consumer instead of looping forever. Calling run() here
+        // validates that ConsumerTask shuts down on its own.
+        consumerTask.run();
+        assertTrue(consumer.closed(), "Should close the consumer on non-retriable error");
     }
 
     private void addRecord(final MockConsumer<byte[], byte[]> consumer,
@@ -471,4 +477,4 @@ public class ConsumerTaskTest {
             isPartitionLoaded.put(partition, true);
         }
     }
-}
+}
\ No newline at end of file

Reply via email to