Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-18 Thread via GitHub


gharris1727 commented on PR #16303:
URL: https://github.com/apache/kafka/pull/16303#issuecomment-2176717352

   Thanks @xiaoqingwanga, this looks good. Can you fix up the build errors? I think it just requires a whitespace change.



Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-17 Thread via GitHub


xiaoqingwanga commented on PR #16303:
URL: https://github.com/apache/kafka/pull/16303#issuecomment-2174489650

   Hey @gharris1727, the code has been committed. Please review it again.



Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-17 Thread via GitHub


xiaoqingwanga commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1643456559


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -309,49 +300,52 @@ public void testCanReprocessSkippedRecords() throws InterruptedException {
         // explicitly re-adding the records since MockConsumer drops them on poll.
         addRecord(consumer, metadataPartition, tpId1, 0);
         addRecord(consumer, metadataPartition, tpId0, 1);
+        consumerTask.ingestRecords();
         // Waiting for all metadata records to be re-read from the first metadata partition number
-        TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record");
+        assertEquals(Optional.of(1L), consumerTask.readOffsetForMetadataPartition(metadataPartition));
         // Verifying that all the metadata records from the first metadata partition were processed properly.
-        TestUtils.waitForCondition(() -> handler.metadataCounter == 2, "Couldn't read record");
+        assertEquals(2, handler.metadataCounter);
     }

     @Test
-    public void testMaybeMarkUserPartitionsAsReady() throws InterruptedException {
+    public void testMaybeMarkUserPartitionsAsReady() {
         final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0);
         final int metadataPartition = partitioner.metadataPartition(tpId);
         consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 2L));
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
-        thread.start();
+        consumerTask.ingestRecords();

-        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned");
+        assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + tpId + " has not been assigned");
         assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
         assertFalse(handler.isPartitionInitialized.containsKey(tpId));
         IntStream.range(0, 5).forEach(offset -> addRecord(consumer, metadataPartition, tpId, offset));
-        TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)), "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertEquals(Optional.of(4L), consumerTask.readOffsetForMetadataPartition(metadataPartition));
         assertTrue(handler.isPartitionInitialized.get(tpId));
     }

     @ParameterizedTest
     @CsvSource(value = {"0, 0", "500, 500"})
-    public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long beginOffset,
-                                                                  long endOffset) throws InterruptedException {
+    public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long beginOffset, long endOffset) {
         final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
         final int metadataPartition = partitioner.metadataPartition(tpId);
         consumer.updateBeginningOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), beginOffset));
         consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), endOffset));
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
-        thread.start();
+        consumerTask.ingestRecords();

-        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned");
+        assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + tpId + " has not been assigned");
         assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
-        TestUtils.waitForCondition(() -> handler.isPartitionInitialized.containsKey(tpId),
-                "should have initialized the partition");
+        assertTrue(handler.isPartitionInitialized.containsKey(tpId), "Should have initialized the partition");
         assertFalse(consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent());
     }

     @Test
     public void testConcurrentAccess() throws InterruptedException {
-        thread.start();
+        // Here we need to test concurrent access. When ConsumerTask is ingesting records,
+        // we need to concurrently add partitions and perform close()
+        new Thread(consumerTask).start();

Review Comment:
   I don't think it necessarily needs join(), because the thread will be shut down by close(). Even if, for some reason, it doesn't get closed, the resources will be freed when the test ends anyway.
   Just to be extra careful, I'll give it a try.




Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-17 Thread via GitHub


gharris1727 commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1643026116


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -309,49 +300,52 @@ public void testCanReprocessSkippedRecords() throws InterruptedException {
         // explicitly re-adding the records since MockConsumer drops them on poll.
         addRecord(consumer, metadataPartition, tpId1, 0);
         addRecord(consumer, metadataPartition, tpId0, 1);
+        consumerTask.ingestRecords();
         // Waiting for all metadata records to be re-read from the first metadata partition number
-        TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record");
+        assertEquals(Optional.of(1L), consumerTask.readOffsetForMetadataPartition(metadataPartition));
         // Verifying that all the metadata records from the first metadata partition were processed properly.
-        TestUtils.waitForCondition(() -> handler.metadataCounter == 2, "Couldn't read record");
+        assertEquals(2, handler.metadataCounter);
     }

     @Test
-    public void testMaybeMarkUserPartitionsAsReady() throws InterruptedException {
+    public void testMaybeMarkUserPartitionsAsReady() {
         final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0);
         final int metadataPartition = partitioner.metadataPartition(tpId);
         consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 2L));
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
-        thread.start();
+        consumerTask.ingestRecords();

-        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned");
+        assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + tpId + " has not been assigned");
         assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
         assertFalse(handler.isPartitionInitialized.containsKey(tpId));
         IntStream.range(0, 5).forEach(offset -> addRecord(consumer, metadataPartition, tpId, offset));
-        TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)), "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertEquals(Optional.of(4L), consumerTask.readOffsetForMetadataPartition(metadataPartition));
         assertTrue(handler.isPartitionInitialized.get(tpId));
     }

     @ParameterizedTest
     @CsvSource(value = {"0, 0", "500, 500"})
-    public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long beginOffset,
-                                                                  long endOffset) throws InterruptedException {
+    public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long beginOffset, long endOffset) {
         final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
         final int metadataPartition = partitioner.metadataPartition(tpId);
         consumer.updateBeginningOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), beginOffset));
         consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), endOffset));
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
-        thread.start();
+        consumerTask.ingestRecords();

-        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned");
+        assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " + tpId + " has not been assigned");
         assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
-        TestUtils.waitForCondition(() -> handler.isPartitionInitialized.containsKey(tpId),
-                "should have initialized the partition");
+        assertTrue(handler.isPartitionInitialized.containsKey(tpId), "Should have initialized the partition");
         assertFalse(consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent());
     }

     @Test
     public void testConcurrentAccess() throws InterruptedException {
-        thread.start();
+        // Here we need to test concurrent access. When ConsumerTask is ingesting records,
+        // we need to concurrently add partitions and perform close()
+        new Thread(consumerTask).start();

Review Comment:
   Please store this thread in a variable and join it like the others to make 
sure it is stopped before the end of the test.
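
   For reference, a minimal sketch of the pattern being requested, mirroring the old afterEach teardown (the variable name accessThread is illustrative, not from the PR):

   ```java
   // Keep a handle on the thread so the test can deterministically stop it.
   Thread accessThread = new Thread(consumerTask);
   accessThread.start();

   // ... concurrently add partitions and call close() ...

   consumerTask.close();          // signals the run() loop to exit
   accessThread.join(10_000);     // wait up to 10 seconds for the thread to finish
   assertFalse(accessThread.isAlive(), "Consumer task thread is still alive");
   ```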




Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-15 Thread via GitHub


xiaoqingwanga commented on PR #16303:
URL: https://github.com/apache/kafka/pull/16303#issuecomment-2169733225

   Restoring all of the isUserPartitionAssigned() assertions was a sizable change, but they are all done now. Please take another look, thank you.





Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-14 Thread via GitHub


gharris1727 commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1640209104


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -84,42 +78,41 @@ public class ConsumerTaskTest {

     private ConsumerTask consumerTask;
     private MockConsumer<byte[], byte[]> consumer;
-    private Thread thread;

     @BeforeEach
     public void beforeEach() {
         final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
                 .collect(Collectors.toMap(Function.identity(), e -> 0L));
-        consumer = spy(new MockConsumer<>(OffsetResetStrategy.EARLIEST));
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
         consumer.updateBeginningOffsets(offsets);
         consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 300_000L, Time.SYSTEM);
-        thread = new Thread(consumerTask);
     }

     @AfterEach
-    public void afterEach() throws InterruptedException {
-        if (thread != null) {
-            assertDoesNotThrow(() -> consumerTask.close(), "Close method threw exception");
-            thread.join(10_000);
-            assertFalse(thread.isAlive(), "Consumer task thread is still alive");
-        }
+    public void afterEach() {
+        assertDoesNotThrow(() -> consumerTask.close(), "Close method threw exception");
+        assertDoesNotThrow(() -> consumerTask.closeConsumer(), "CloseConsumer method threw exception");
+        assertTrue(consumer.closed());
     }

     /**
      * Tests that the consumer task shuts down gracefully when there were no assignments.
      */
     @Test
-    public void testCloseOnNoAssignment() throws InterruptedException {
-        thread.start();
-        Thread.sleep(10);
+    public void testCloseOnNoAssignment() {
         assertDoesNotThrow(() -> consumerTask.close(), "Close method threw exception");
+        assertDoesNotThrow(() -> consumerTask.closeConsumer(), "CloseConsumer method threw exception");
     }

     @Test
     public void testIdempotentClose() {
-        thread.start();
+        // Go through the closure process
         consumerTask.close();
+        consumerTask.closeConsumer();
+
+        // Go through the closure process again
         consumerTask.close();
+        consumerTask.closeConsumer();

Review Comment:
   nit: closeConsumer is normally only called once during thread exit, so it's 
probably not a good idea to call it again here.
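
   A minimal sketch of the test shape this nit points toward, assuming the PR's close()/closeConsumer() split:

   ```java
   @Test
   public void testIdempotentClose() {
       // close() may be invoked repeatedly and must remain safe.
       consumerTask.close();
       consumerTask.close();
       // closeConsumer() mirrors the single call made on thread exit.
       consumerTask.closeConsumer();
   }
   ```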



##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -136,46 +129,46 @@ public void testUserTopicIdPartitionEquals() {
     }

     @Test
-    public void testAddAssignmentsForPartitions() throws InterruptedException {
+    public void testAddAssignmentsForPartitions() {
         final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 3);
         final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
                 .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp)))
                 .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
         for (final TopicIdPartition idPartition : idPartitions) {
-            TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + idPartition + " to be assigned");

Review Comment:
   Can you find all of the `waitForCondition(() -> consumerTask.isUserPartitionAssigned` assertions that were removed, and re-add them as assertTrue calls?
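
   For context, the conversion being requested looks roughly like this; because ingestRecords() now runs synchronously, the polling wait can become a plain assertion:

   ```java
   // Before: the task ran on a background thread, so the test had to poll.
   TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(idPartition),
           "Timed out waiting for " + idPartition + " to be assigned");

   // After: ingestRecords() has already run synchronously, so assert directly.
   assertTrue(consumerTask.isUserPartitionAssigned(idPartition),
           "Partition " + idPartition + " has not been assigned");
   ```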



##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -308,63 +294,62 @@ public void testCanReprocessSkippedRecords() throws InterruptedException {
         // explicitly re-adding the records since MockConsumer drops them on poll.
         addRecord(consumer, metadataPartition, tpId1, 0);
         addRecord(consumer, metadataPartition, tpId0, 1);
+        consumerTask.ingestRecords();
         // Waiting for all metadata records to be re-read from the first metadata partition number
-        TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record");
+        assertEquals(Optional.of(1L), consumerTask.readOffsetForMetadataPartition(metadataPartition));
         // Verifying that all the metadata records from the first metadata partition were processed properly.
-        TestUtils.waitForCondition(() -> handler.metadataCounter == 2, "Couldn't read record");
+        assertEquals(2, handler.metadataCounter);
     }

     @Test
-    public void testMaybeMarkUserPartitionsAsReady() throws InterruptedException {
+    public void testMaybeMarkUserPartitionsAsReady() {
         final TopicIdPartition tpId = 

Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-13 Thread via GitHub


xiaoqingwanga commented on PR #16303:
URL: https://github.com/apache/kafka/pull/16303#issuecomment-2167234187

   Hey @gharris1727, I have made all the changes. Please feel free to review the code again.





Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-13 Thread via GitHub


xiaoqingwanga commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1639075718


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -115,40 +118,54 @@ public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataE
         this.pollTimeoutMs = pollTimeoutMs;
         this.offsetFetchRetryIntervalMs = offsetFetchRetryIntervalMs;
         this.time = Objects.requireNonNull(time);
+        this.isInternalConsumerClosed = new AtomicBoolean(false);
         this.uninitializedAt = time.milliseconds();
     }

     @Override
     public void run() {
         log.info("Starting consumer task thread.");
         while (!isClosed) {
-            try {
-                if (hasAssignmentChanged) {
-                    maybeWaitForPartitionAssignments();
-                }
+            ingestRecords();
+        }
+        closeConsumer();
+        log.info("Exited from consumer task thread");
+    }

-                log.trace("Polling consumer to receive remote log metadata topic records");
-                final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs));
-                for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
-                    processConsumerRecord(record);
-                }
-                maybeMarkUserPartitionsAsReady();
-            } catch (final WakeupException ex) {
-                // ignore logging the error
-                isClosed = true;
-            } catch (final RetriableException ex) {
-                log.warn("Retriable error occurred while processing the records. Retrying...", ex);
-            } catch (final Exception ex) {
-                isClosed = true;
-                log.error("Error occurred while processing the records", ex);
+    // public for testing
+    public void ingestRecords() {
+        try {
+            if (hasAssignmentChanged) {
+                maybeWaitForPartitionAssignments();
             }
+
+            log.trace("Polling consumer to receive remote log metadata topic records");
+            final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs));
+            for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+                processConsumerRecord(record);
+            }
+            maybeMarkUserPartitionsAsReady();
+        } catch (final WakeupException ex) {
+            // ignore logging the error
+            isClosed = true;
+            closeConsumer();
+        } catch (final RetriableException ex) {
+            log.warn("Retriable error occurred while processing the records. Retrying...", ex);
+        } catch (final Exception ex) {
+            isClosed = true;
+            log.error("Error occurred while processing the records", ex);
+            closeConsumer();
         }
-        try {
-            consumer.close(Duration.ofSeconds(30));
-        } catch (final Exception e) {
-            log.error("Error encountered while closing the consumer", e);
+    }
+
+    private void closeConsumer() {
+        if (isInternalConsumerClosed.compareAndSet(false, true)) {

Review Comment:
   The ConsumerTask is an important component, and making fewer changes to it is safer. If ingestRecords does not have the ability to close the consumer, it may seem a bit incomplete, but it is, after all, an internal method.
   I think keeping it simple is indeed better.
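
   For context, the compareAndSet guard under discussion makes the close idempotent no matter which path reaches it first; roughly, as reconstructed from the diff above:

   ```java
   private void closeConsumer() {
       // compareAndSet flips the flag exactly once, so later callers
       // (error path, wakeup path, thread exit, or tests) become no-ops.
       if (isInternalConsumerClosed.compareAndSet(false, true)) {
           try {
               consumer.close(Duration.ofSeconds(30));
           } catch (final Exception e) {
               log.error("Error encountered while closing the consumer", e);
           }
       }
   }
   ```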






Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-13 Thread via GitHub


xiaoqingwanga commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1639071233


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -136,46 +128,46 @@ public void testUserTopicIdPartitionEquals() {
     }

     @Test
-    public void testAddAssignmentsForPartitions() throws InterruptedException {
+    public void testAddAssignmentsForPartitions() {
         final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 3);
         final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
                 .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp)))
                 .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
         for (final TopicIdPartition idPartition : idPartitions) {
-            TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + idPartition + " to be assigned");
             assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
             assertTrue(handler.isPartitionLoaded.get(idPartition));
         }
     }

     @Test
-    public void testRemoveAssignmentsForPartitions() throws InterruptedException {
+    public void testRemoveAssignmentsForPartitions() {
         final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 3);
         final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
                 .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp)))
                 .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
-        thread.start();
+        consumerTask.ingestRecords();

         final TopicIdPartition tpId = allPartitions.get(0);
-        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + " to be assigned");
         addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
-        TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
-                "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertTrue(() -> consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),

Review Comment:
   I see, there was a bit of misunderstanding before.






Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-13 Thread via GitHub


gharris1727 commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1638488900


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -136,46 +128,46 @@ public void testUserTopicIdPartitionEquals() {
     }

     @Test
-    public void testAddAssignmentsForPartitions() throws InterruptedException {
+    public void testAddAssignmentsForPartitions() {
         final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 3);
         final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
                 .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp)))
                 .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
         for (final TopicIdPartition idPartition : idPartitions) {
-            TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + idPartition + " to be assigned");
             assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
             assertTrue(handler.isPartitionLoaded.get(idPartition));
         }
     }

     @Test
-    public void testRemoveAssignmentsForPartitions() throws InterruptedException {
+    public void testRemoveAssignmentsForPartitions() {
         final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 3);
         final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
                 .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp)))
                 .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
-        thread.start();
+        consumerTask.ingestRecords();

         final TopicIdPartition tpId = allPartitions.get(0);
-        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + " to be assigned");
         addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
-        TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
-                "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertTrue(() -> consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),

Review Comment:
   Hey, this wasn't addressed. Can you remove the `() ->` where it isn't necessary?
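
   The distinction at play: JUnit 5 provides both assertTrue(boolean, String) and assertTrue(BooleanSupplier, String). The supplier overload only pays off when lazy evaluation matters; for an already-computable value, the plain boolean form reads better:

   ```java
   // Unnecessary: wraps an eagerly computable boolean in a BooleanSupplier.
   assertTrue(() -> consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
           "Couldn't read record");

   // Preferred: evaluate directly and pass the boolean.
   assertTrue(consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
           "Couldn't read record");
   ```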



##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -115,40 +118,54 @@ public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataE
         this.pollTimeoutMs = pollTimeoutMs;
         this.offsetFetchRetryIntervalMs = offsetFetchRetryIntervalMs;
         this.time = Objects.requireNonNull(time);
+        this.isInternalConsumerClosed = new AtomicBoolean(false);
         this.uninitializedAt = time.milliseconds();
     }

     @Override
     public void run() {
         log.info("Starting consumer task thread.");
         while (!isClosed) {
-            try {
-                if (hasAssignmentChanged) {
-                    maybeWaitForPartitionAssignments();
-                }
+            ingestRecords();
+        }
+        closeConsumer();
+        log.info("Exited from consumer task thread");
+    }

-                log.trace("Polling consumer to receive remote log metadata topic records");
-                final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs));
-                for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
-                    processConsumerRecord(record);
-                }
-                maybeMarkUserPartitionsAsReady();
-            } catch (final WakeupException ex) {
-                // ignore logging the error
-                isClosed = true;
-            } catch (final RetriableException ex) {
-                log.warn("Retriable error occurred while processing the records. Retrying...", ex);
-            } catch (final Exception ex) {
-                isClosed = true;
-                log.error("Error occurred while processing the records", ex);
+    // public for testing
+    public void ingestRecords() {
+        try {
+            if (hasAssignmentChanged) {
+                maybeWaitForPartitionAssignments();
             }
+
+            log.trace("Polling consumer to receive remote log metadata topic records");
+            final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs));
+            for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+                processConsumerRecord(record);
+            }
+

Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-12 Thread via GitHub


xiaoqingwanga commented on PR #16303:
URL: https://github.com/apache/kafka/pull/16303#issuecomment-2164374450

   @gharris1727 Thank you very much for your suggestion! I have made another 
commit, resolving the issue mentioned above.





Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-12 Thread via GitHub


xiaoqingwanga commented on PR #16303:
URL: https://github.com/apache/kafka/pull/16303#issuecomment-2164138080

   @gharris1727 Thanks for the tip! I'll look into it and tweak the code a bit.





Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-12 Thread via GitHub


gharris1727 commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1636921387


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -136,46 +128,46 @@ public void testUserTopicIdPartitionEquals() {
     }

     @Test
-    public void testAddAssignmentsForPartitions() throws InterruptedException {
+    public void testAddAssignmentsForPartitions() {
         final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 3);
         final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
                 .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp)))
                 .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
         for (final TopicIdPartition idPartition : idPartitions) {
-            TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + idPartition + " to be assigned");
             assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
             assertTrue(handler.isPartitionLoaded.get(idPartition));
         }
     }

     @Test
-    public void testRemoveAssignmentsForPartitions() throws InterruptedException {
+    public void testRemoveAssignmentsForPartitions() {
         final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 3);
         final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
                 .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp)))
                 .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
-        thread.start();
+        consumerTask.ingestRecords();

         final TopicIdPartition tpId = allPartitions.get(0);
-        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + " to be assigned");
         addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
-        TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
-                "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertTrue(() -> consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
+                "Couldn't read record");

         final Set<TopicIdPartition> removePartitions = Collections.singleton(tpId);
         consumerTask.removeAssignmentsForPartitions(removePartitions);
+        consumerTask.ingestRecords();
         for (final TopicIdPartition idPartition : allPartitions) {
-            final TestCondition condition = () -> removePartitions.contains(idPartition) == !consumerTask.isUserPartitionAssigned(idPartition);
-            TestUtils.waitForCondition(condition, "Timed out waiting for " + idPartition + " to be removed");
+            final BooleanSupplier condition = () -> removePartitions.contains(idPartition) == !consumerTask.isUserPartitionAssigned(idPartition);
+            assertTrue(condition, "Partition " + idPartition + " has not been removed");

Review Comment:
   This can be an assertEquals now.
   ```suggestion
            assertEquals(!removePartitions.contains(idPartition), consumerTask.isUserPartitionAssigned(idPartition), "Partition " + idPartition + " has not been removed");
   ```



##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -136,46 +128,46 @@ public void testUserTopicIdPartitionEquals() {
     }

     @Test
-    public void testAddAssignmentsForPartitions() throws InterruptedException {
+    public void testAddAssignmentsForPartitions() {
         final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 3);
         final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
                 .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp)))
                 .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
         for (final TopicIdPartition idPartition : idPartitions) {
-            TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + idPartition + " to be assigned");
             assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
             assertTrue(handler.isPartitionLoaded.get(idPartition));
         }
     }

     @Test
-    public void testRemoveAssignmentsForPartitions() throws 

[PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-12 Thread via GitHub


xiaoqingwanga opened a new pull request, #16303:
URL: https://github.com/apache/kafka/pull/16303

   This PR refactors ConsumerTaskTest. It extracts the body of a single loop iteration inside ConsumerTask's run method into a separate method, ingestRecords. The refactored ConsumerTaskTest no longer drives ConsumerTask asynchronously through a background thread; instead, it invokes ingestRecords directly at the appropriate points (sketched below).

   All test cases in ConsumerTaskTest pass.
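
   In outline, the refactor splits the loop body out of run() so tests can drive one iteration at a time (condensed from the PR diff):

   ```java
   @Override
   public void run() {
       log.info("Starting consumer task thread.");
       while (!isClosed) {
           ingestRecords();    // one poll-and-process pass, now directly testable
       }
       closeConsumer();
       log.info("Exited from consumer task thread");
   }

   // public for testing: a single iteration of the former run() loop
   public void ingestRecords() {
       // poll the consumer, process records, mark user partitions as ready
   }
   ```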
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

