This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 1442862bbd7 KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation (#15383) 1442862bbd7 is described below commit 1442862bbd7195b4dde76f9076cf94fcd500d3b9 Author: Lucas Brutschy <lbruts...@confluent.io> AuthorDate: Mon Feb 19 15:33:37 2024 +0100 KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation (#15383) The wake-up mechanism in the new consumer is preventing from committing within a rebalance listener callback. The reason is that we are trying to register two wake-uppable actions at the same time. The fix is to register the wake-uppable action more closely to where we are in fact blocking on it, so that the action is not registered when we execute rebalance listeneners and callback listeners. Reviewers: Bruno Cadonna <cado...@apache.org> --- .../consumer/internals/AsyncKafkaConsumer.java | 4 +-- .../consumer/internals/AsyncKafkaConsumerTest.java | 34 +++++++++++++++++++++- .../kafka/api/PlaintextConsumerTest.scala | 4 +-- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 31481079cc9..28d26a83be6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -692,7 +692,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { acquireAndEnsureOpen(); try { - wakeupTrigger.setFetchAction(fetchBuffer); kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs()); if (subscriptions.hasNoSubscriptionOrUserAssignment()) { @@ -724,7 +723,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { return ConsumerRecords.empty(); } finally { kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs()); - wakeupTrigger.clearTask(); release(); } } @@ -1511,6 +1509,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { log.trace("Polling for fetches with timeout {}", pollTimeout); Timer pollTimer = time.timer(pollTimeout); + wakeupTrigger.setFetchAction(fetchBuffer); // Wait a bit for some fetched data to arrive, as there may not be anything immediately available. Note the // use of a shorter, dedicated "pollTimer" here which updates "timer" so that calling method (poll) will @@ -1521,6 +1520,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { log.trace("Timeout during fetch", e); } finally { timer.update(pollTimer.currentTimeMs()); + wakeupTrigger.clearTask(); } return collectFetch(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 691467d29fb..57546951982 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -81,6 +81,7 @@ import org.mockito.Mockito; import java.time.Duration; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -90,6 +91,7 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -107,6 +109,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListe import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST; import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED; +import static org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -384,7 +387,7 @@ public class AsyncKafkaConsumerTest { doAnswer(invocation -> { consumer.wakeup(); return Fetch.empty(); - }).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + }).doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); @@ -419,6 +422,35 @@ public class AsyncKafkaConsumerTest { assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO)); } + @Test + public void testCommitInRebalanceCallback() { + consumer = newConsumer(); + final String topicName = "foo"; + final int partition = 3; + final TopicPartition tp = new TopicPartition(topicName, partition); + doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class)); + SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + sortedPartitions.add(tp); + CompletableBackgroundEvent<Void> e = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, sortedPartitions); + backgroundEventQueue.add(e); + completeCommitApplicationEventSuccessfully(); + + ConsumerRebalanceListener listener = new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { + assertDoesNotThrow(() -> consumer.commitSync(mkMap(mkEntry(tp, new OffsetAndMetadata(0))))); + } + + @Override + public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { + // no-op + } + }; + + consumer.subscribe(Collections.singletonList(topicName), listener); + consumer.poll(Duration.ZERO); + } + @Test public void testClearWakeupTriggerAfterPoll() { consumer = newConsumer(); diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index fbbd7c42a9c..553188900cd 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -197,10 +197,8 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(1, listener.callsToRevoked) } - // TODO: Enable this test for both protocols when the Jira tracking its failure (KAFKA-16009) is fixed. This - // is done by setting the @MethodSource value to "getTestQuorumAndGroupProtocolParametersAll" @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testMaxPollIntervalMsDelayInRevocation(quorum: String, groupProtocol: String): Unit = { this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000.toString) this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)