This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 64702bcf6f8 KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops (#16303) 64702bcf6f8 is described below commit 64702bcf6f883d266ccffcec458b4c3c0706ad75 Author: Wang Xiaoqing <w...@outlook.com> AuthorDate: Fri Jun 21 03:35:14 2024 +0800 KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops (#16303) Reviewers: Greg Harris <greg.har...@aiven.io> --- .../log/remote/metadata/storage/ConsumerTask.java | 50 ++++--- .../remote/metadata/storage/ConsumerTaskTest.java | 154 +++++++++++---------- 2 files changed, 110 insertions(+), 94 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java index 593d84f7273..a0376823f34 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java @@ -123,33 +123,43 @@ class ConsumerTask implements Runnable, Closeable { public void run() { log.info("Starting consumer task thread."); while (!isClosed) { - try { - if (hasAssignmentChanged) { - maybeWaitForPartitionAssignments(); - } + ingestRecords(); + } + closeConsumer(); + log.info("Exited from consumer task thread"); + } - log.trace("Polling consumer to receive remote log metadata topic records"); - final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs)); - for (ConsumerRecord<byte[], byte[]> record : consumerRecords) { - processConsumerRecord(record); - } - maybeMarkUserPartitionsAsReady(); - } catch (final WakeupException ex) { - // ignore logging the error - isClosed = true; - } catch (final RetriableException ex) { - log.warn("Retriable error occurred while processing the records. 
Retrying...", ex); - } catch (final Exception ex) { - isClosed = true; - log.error("Error occurred while processing the records", ex); + // public for testing + public void ingestRecords() { + try { + if (hasAssignmentChanged) { + maybeWaitForPartitionAssignments(); + } + + log.trace("Polling consumer to receive remote log metadata topic records"); + final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs)); + for (ConsumerRecord<byte[], byte[]> record : consumerRecords) { + processConsumerRecord(record); } + maybeMarkUserPartitionsAsReady(); + } catch (final WakeupException ex) { + // ignore logging the error + isClosed = true; + } catch (final RetriableException ex) { + log.warn("Retriable error occurred while processing the records. Retrying...", ex); + } catch (final Exception ex) { + isClosed = true; + log.error("Error occurred while processing the records", ex); } + } + + // public for testing + public void closeConsumer() { try { consumer.close(Duration.ofSeconds(30)); } catch (final Exception e) { log.error("Error encountered while closing the consumer", e); } - log.info("Exited from consumer task thread"); } private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) { @@ -469,4 +479,4 @@ class ConsumerTask implements Runnable, Closeable { '}'; } } -} +} \ No newline at end of file diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java index a29316d8da3..66176c68477 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.server.log.remote.metadata.storage; import org.apache.kafka.clients.consumer.ConsumerRecord; -import 
org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicIdPartition; @@ -33,8 +32,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; -import org.apache.kafka.test.TestCondition; -import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -69,9 +66,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.spy; public class ConsumerTaskTest { @@ -85,41 +79,40 @@ public class ConsumerTaskTest { private ConsumerTask consumerTask; private MockConsumer<byte[], byte[]> consumer; - private Thread thread; @BeforeEach public void beforeEach() { final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream() .collect(Collectors.toMap(Function.identity(), e -> 0L)); - consumer = spy(new MockConsumer<>(OffsetResetStrategy.EARLIEST)); + consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); consumer.updateBeginningOffsets(offsets); consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 300_000L, Time.SYSTEM); - thread = new Thread(consumerTask); } @AfterEach - public void afterEach() throws InterruptedException { - if (thread != null) { - assertDoesNotThrow(() -> consumerTask.close(), "Close method threw exception"); - thread.join(10_000); - assertFalse(thread.isAlive(), 
"Consumer task thread is still alive"); - } + public void afterEach() { + assertDoesNotThrow(() -> consumerTask.close(), "Close method threw exception"); + assertDoesNotThrow(() -> consumerTask.closeConsumer(), "CloseConsumer method threw exception"); + assertTrue(consumer.closed()); } /** * Tests that the consumer task shuts down gracefully when there were no assignments. */ @Test - public void testCloseOnNoAssignment() throws InterruptedException { - thread.start(); - Thread.sleep(10); + public void testCloseOnNoAssignment() { assertDoesNotThrow(() -> consumerTask.close(), "Close method threw exception"); + assertDoesNotThrow(() -> consumerTask.closeConsumer(), "CloseConsumer method threw exception"); } @Test public void testIdempotentClose() { - thread.start(); + // Go through the closure process consumerTask.close(); + consumerTask.closeConsumer(); + + // Go through the closure process again + // Note: After ConsumerTask is closed, the second close() normally does not call closeConsumer() again consumerTask.close(); } @@ -137,46 +130,47 @@ public class ConsumerTaskTest { } @Test - public void testAddAssignmentsForPartitions() throws InterruptedException { + public void testAddAssignmentsForPartitions() { final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 3); final Map<TopicPartition, Long> endOffsets = idPartitions.stream() .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp))) .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b)); consumer.updateEndOffsets(endOffsets); consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions)); - thread.start(); + consumerTask.ingestRecords(); for (final TopicIdPartition idPartition : idPartitions) { - TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + idPartition + " to be assigned"); + assertTrue(consumerTask.isUserPartitionAssigned(idPartition), "Partition " + idPartition + " has not been assigned"); 
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition))); assertTrue(handler.isPartitionLoaded.get(idPartition)); } } @Test - public void testRemoveAssignmentsForPartitions() throws InterruptedException { + public void testRemoveAssignmentsForPartitions() { final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 3); final Map<TopicPartition, Long> endOffsets = allPartitions.stream() .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp))) .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b)); consumer.updateEndOffsets(endOffsets); consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions)); - thread.start(); + consumerTask.ingestRecords(); final TopicIdPartition tpId = allPartitions.get(0); - TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + " to be assigned"); + assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + tpId + " has not been assigned"); addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0); - TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(), - "Couldn't read record"); + consumerTask.ingestRecords(); + assertTrue(consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent()); final Set<TopicIdPartition> removePartitions = Collections.singleton(tpId); consumerTask.removeAssignmentsForPartitions(removePartitions); + consumerTask.ingestRecords(); for (final TopicIdPartition idPartition : allPartitions) { - final TestCondition condition = () -> removePartitions.contains(idPartition) == !consumerTask.isUserPartitionAssigned(idPartition); - TestUtils.waitForCondition(condition, "Timed out waiting for " + idPartition + " to be removed"); + assertEquals(!removePartitions.contains(idPartition), consumerTask.isUserPartitionAssigned(idPartition), + "Partition " + idPartition + 
" has not been removed"); } for (TopicIdPartition removePartition : removePartitions) { - TestUtils.waitForCondition(() -> handler.isPartitionCleared.containsKey(removePartition), - "Timed out waiting for " + removePartition + " to be cleared"); + assertTrue(handler.isPartitionCleared.containsKey(removePartition), + "Partition " + removePartition + " has not been cleared"); } } @@ -227,7 +221,7 @@ public class ConsumerTaskTest { } @Test - public void testCanProcessRecord() throws InterruptedException { + public void testCanProcessRecord() { final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg"); final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0)); final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1)); @@ -239,29 +233,34 @@ public class ConsumerTaskTest { consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 0L)); final Set<TopicIdPartition> assignments = Collections.singleton(tpId0); consumerTask.addAssignmentsForPartitions(assignments); - thread.start(); - TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 + " to be assigned"); + consumerTask.ingestRecords(); + assertTrue(consumerTask.isUserPartitionAssigned(tpId0), "Partition " + tpId0 + " has not been assigned"); addRecord(consumer, metadataPartition, tpId0, 0); addRecord(consumer, metadataPartition, tpId0, 1); - TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record"); + consumerTask.ingestRecords(); + assertEquals(Optional.of(1L), consumerTask.readOffsetForMetadataPartition(metadataPartition), "Partition offset did not reach expected value of 1"); assertEquals(2, handler.metadataCounter); // should only read the tpId1 records consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1)); - TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + " to be assigned"); + consumerTask.ingestRecords(); + assertTrue(consumerTask.isUserPartitionAssigned(tpId1), "Partition " + tpId1 + " has not been assigned"); + addRecord(consumer, metadataPartition, tpId1, 2); - TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(2L)), "Couldn't read record"); + consumerTask.ingestRecords(); + assertEquals(Optional.of(2L), consumerTask.readOffsetForMetadataPartition(metadataPartition), "Partition offset did not reach expected value of 2"); assertEquals(3, handler.metadataCounter); // shouldn't read tpId2 records because it's not assigned addRecord(consumer, metadataPartition, tpId2, 3); - TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(3L)), "Couldn't read record"); + consumerTask.ingestRecords(); + assertEquals(Optional.of(3L), consumerTask.readOffsetForMetadataPartition(metadataPartition), "Partition offset did not reach expected value of 3"); assertEquals(3, handler.metadataCounter); } @Test - public void testCanReprocessSkippedRecords() throws InterruptedException { + public void testCanReprocessSkippedRecords() { final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg"); final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0)); final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1)); @@ -272,33 +271,26 @@ public class ConsumerTaskTest { final int metadataPartition = partitioner.metadataPartition(tpId0); final int anotherMetadataPartition = partitioner.metadataPartition(tpId3); - // Mocking the consumer to be able to wait for the second reassignment - doAnswer(invocation -> { - if (consumerTask.isUserPartitionAssigned(tpId1) && !consumerTask.isUserPartitionAssigned(tpId3)) { - return ConsumerRecords.empty(); - } else { - return 
invocation.callRealMethod(); - } - }).when(consumer).poll(any()); - consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 0L)); consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(anotherMetadataPartition), 0L)); final Set<TopicIdPartition> assignments = Collections.singleton(tpId0); consumerTask.addAssignmentsForPartitions(assignments); - thread.start(); - TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 + " to be assigned"); + consumerTask.ingestRecords(); + assertTrue(consumerTask.isUserPartitionAssigned(tpId0), "Partition " + tpId0 + " has not been assigned"); // Adding metadata records in the order opposite to the order of assignments addRecord(consumer, metadataPartition, tpId1, 0); addRecord(consumer, metadataPartition, tpId0, 1); - TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record"); + consumerTask.ingestRecords(); + assertEquals(Optional.of(1L), consumerTask.readOffsetForMetadataPartition(metadataPartition)); // Only one record is processed, tpId1 record is skipped as unassigned // but read offset is 1 e.g., record for tpId1 has been read by consumer assertEquals(1, handler.metadataCounter); // Adding assignment for tpId1 after related metadata records have already been read consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1)); - TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + " to be assigned"); + consumerTask.ingestRecords(); + assertTrue(consumerTask.isUserPartitionAssigned(tpId1), "Partition " + tpId1 + " has not been assigned"); // Adding assignment for tpId0 to trigger the reset to last read offset // and assignment for tpId3 that has different metadata partition to trigger the update of metadata snapshot @@ -309,49 +301,53 @@ public class ConsumerTaskTest { 
// explicitly re-adding the records since MockConsumer drops them on poll. addRecord(consumer, metadataPartition, tpId1, 0); addRecord(consumer, metadataPartition, tpId0, 1); + consumerTask.ingestRecords(); // Waiting for all metadata records to be re-read from the first metadata partition number - TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record"); + assertEquals(Optional.of(1L), consumerTask.readOffsetForMetadataPartition(metadataPartition)); // Verifying that all the metadata records from the first metadata partition were processed properly. - TestUtils.waitForCondition(() -> handler.metadataCounter == 2, "Couldn't read record"); + assertEquals(2, handler.metadataCounter); } @Test - public void testMaybeMarkUserPartitionsAsReady() throws InterruptedException { + public void testMaybeMarkUserPartitionsAsReady() { final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0); final int metadataPartition = partitioner.metadataPartition(tpId); consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 2L)); consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId)); - thread.start(); + consumerTask.ingestRecords(); - TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned"); + assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + tpId + " has not been assigned"); assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition)); assertFalse(handler.isPartitionInitialized.containsKey(tpId)); IntStream.range(0, 5).forEach(offset -> addRecord(consumer, metadataPartition, tpId, offset)); - TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)), "Couldn't read record"); + consumerTask.ingestRecords(); + assertEquals(Optional.of(4L), 
consumerTask.readOffsetForMetadataPartition(metadataPartition)); assertTrue(handler.isPartitionInitialized.get(tpId)); } @ParameterizedTest @CsvSource(value = {"0, 0", "500, 500"}) - public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long beginOffset, - long endOffset) throws InterruptedException { + public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long beginOffset, long endOffset) { final TopicIdPartition tpId = getIdPartitions("world", 1).get(0); final int metadataPartition = partitioner.metadataPartition(tpId); consumer.updateBeginningOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), beginOffset)); consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), endOffset)); consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId)); - thread.start(); + consumerTask.ingestRecords(); - TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned"); + assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + tpId + " has not been assigned"); assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition)); - TestUtils.waitForCondition(() -> handler.isPartitionInitialized.containsKey(tpId), - "should have initialized the partition"); + assertTrue(handler.isPartitionInitialized.containsKey(tpId), "Should have initialized the partition"); assertFalse(consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent()); } @Test public void testConcurrentAccess() throws InterruptedException { + // Here we need to test concurrent access. 
When ConsumerTask is ingesting records, + // we need to concurrently add partitions and perform close() + Thread thread = new Thread(consumerTask); thread.start(); + final CountDownLatch latch = new CountDownLatch(1); final TopicIdPartition tpId = getIdPartitions("concurrent", 1).get(0); consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(partitioner.metadataPartition(tpId)), 0L)); @@ -377,41 +373,51 @@ public class ConsumerTaskTest { latch.countDown(); assignmentThread.join(); closeThread.join(); + + thread.join(10_000); + assertFalse(thread.isAlive(), "Consumer task thread is still alive"); } @Test - public void testConsumerShouldNotCloseOnRetriableError() throws InterruptedException { + public void testConsumerShouldNotCloseOnRetriableError() { final TopicIdPartition tpId = getIdPartitions("world", 1).get(0); final int metadataPartition = partitioner.metadataPartition(tpId); consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 1L)); consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId)); - thread.start(); + consumerTask.ingestRecords(); - TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned"); + assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + tpId + " has not been assigned"); assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition)); - consumer.setPollException(new LeaderNotAvailableException("leader not available!")); + consumer.setPollException(new LeaderNotAvailableException("Leader not available!")); + consumerTask.ingestRecords(); addRecord(consumer, metadataPartition, tpId, 0); + consumerTask.ingestRecords(); consumer.setPollException(new TimeoutException("Not able to complete the operation within the timeout")); + consumerTask.ingestRecords(); addRecord(consumer, metadataPartition, tpId, 1); + consumerTask.ingestRecords(); - TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record"); + assertEquals(Optional.of(1L), consumerTask.readOffsetForMetadataPartition(metadataPartition)); assertEquals(2, handler.metadataCounter); } @Test - public void testConsumerShouldCloseOnNonRetriableError() throws InterruptedException { + public void testConsumerShouldCloseOnNonRetriableError() { final TopicIdPartition tpId = getIdPartitions("world", 1).get(0); final int metadataPartition = partitioner.metadataPartition(tpId); consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 1L)); consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId)); - thread.start(); + consumerTask.ingestRecords(); - TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned"); + assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + tpId + " has not been assigned"); assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition)); consumer.setPollException(new AuthorizationException("Unauthorized to read the topic!")); - TestUtils.waitForCondition(() -> consumer.closed(), "Should close the consume on non-retriable error"); + // Due to the exception set up earlier, calling run() will trigger an exception and close the Consumer, instead of resulting in an infinite loop + // The purpose of calling run() is to validate the capability of ConsumerTask to shut down automatically + consumerTask.run(); + assertTrue(consumer.closed(), "Should close the consume on non-retriable error"); } private void addRecord(final MockConsumer<byte[], byte[]> consumer, @@ -471,4 +477,4 @@ public class ConsumerTaskTest { isPartitionLoaded.put(partition, true); } } -} +} \ No newline at end of file