This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1442862bbd7 KAFKA-16009: Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation (#15383)
1442862bbd7 is described below

commit 1442862bbd7195b4dde76f9076cf94fcd500d3b9
Author: Lucas Brutschy <lbruts...@confluent.io>
AuthorDate: Mon Feb 19 15:33:37 2024 +0100

    KAFKA-16009: Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation (#15383)
    
    The wake-up mechanism in the new consumer prevents committing from within a rebalance listener callback. The reason is that we try to register two wake-uppable actions at the same time.
    
    The fix is to register the wake-uppable action closer to where we actually block on it, so that the action is not yet registered while we execute rebalance listener callbacks.
    
    Reviewers: Bruno Cadonna <cado...@apache.org>
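
    To make the wake-up interaction easier to follow, here is a minimal, self-contained Java sketch of the pattern described above. It is illustrative only: SingleActionTrigger, pollBeforeFix, pollAfterFix and commitSync are made-up names, not the AsyncKafkaConsumer/WakeupTrigger API (only setFetchAction and clearTask appear in the diff below).

import java.util.concurrent.atomic.AtomicReference;

public final class WakeupRegistrationSketch {

    // Hypothetical stand-in for the consumer's wakeup trigger: at most one
    // wake-uppable action may be registered at a time.
    static final class SingleActionTrigger {
        private final AtomicReference<String> active = new AtomicReference<>();

        void register(String action) {
            // Mirrors the failure described above: registering a second action
            // while another one is still active is rejected.
            if (!active.compareAndSet(null, action))
                throw new IllegalStateException("action '" + active.get() + "' is still registered");
        }

        void clear() {
            active.set(null);
        }
    }

    private final SingleActionTrigger trigger = new SingleActionTrigger();

    // Before the fix (conceptually): poll() registered the fetch action up front,
    // so a commitSync() issued from a rebalance listener callback could not
    // register its own wake-uppable action.
    void pollBeforeFix(Runnable rebalanceListenerCallback) {
        trigger.register("fetch");
        try {
            rebalanceListenerCallback.run(); // a commitSync() in here throws IllegalStateException
            // ... block on the fetch buffer ...
        } finally {
            trigger.clear();
        }
    }

    // After the fix: the fetch action is registered only around the actual blocking
    // wait, after the callbacks have run, so the two registrations never overlap.
    void pollAfterFix(Runnable rebalanceListenerCallback) {
        rebalanceListenerCallback.run(); // a commitSync() in here registers and clears its own action
        trigger.register("fetch");
        try {
            // ... block on the fetch buffer ...
        } finally {
            trigger.clear();
        }
    }

    // The commit path registers its own wake-uppable action for the duration of its blocking wait.
    void commitSync() {
        trigger.register("commit");
        try {
            // ... wait for the commit result ...
        } finally {
            trigger.clear();
        }
    }

    public static void main(String[] args) {
        WakeupRegistrationSketch sketch = new WakeupRegistrationSketch();
        sketch.pollAfterFix(sketch::commitSync);  // succeeds
        sketch.pollBeforeFix(sketch::commitSync); // fails: "action 'fetch' is still registered"
    }
}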
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  4 +--
 .../consumer/internals/AsyncKafkaConsumerTest.java | 34 +++++++++++++++++++++-
 .../kafka/api/PlaintextConsumerTest.scala          |  4 +--
 3 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 31481079cc9..28d26a83be6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -692,7 +692,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
         acquireAndEnsureOpen();
         try {
-            wakeupTrigger.setFetchAction(fetchBuffer);
             kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
 
             if (subscriptions.hasNoSubscriptionOrUserAssignment()) {
@@ -724,7 +723,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             return ConsumerRecords.empty();
         } finally {
             kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
-            wakeupTrigger.clearTask();
             release();
         }
     }
@@ -1511,6 +1509,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         log.trace("Polling for fetches with timeout {}", pollTimeout);
 
         Timer pollTimer = time.timer(pollTimeout);
+        wakeupTrigger.setFetchAction(fetchBuffer);
 
         // Wait a bit for some fetched data to arrive, as there may not be anything immediately available. Note the
         // use of a shorter, dedicated "pollTimer" here which updates "timer" so that calling method (poll) will
@@ -1521,6 +1520,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             log.trace("Timeout during fetch", e);
         } finally {
             timer.update(pollTimer.currentTimeMs());
+            wakeupTrigger.clearTask();
         }
 
         return collectFetch();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 691467d29fb..57546951982 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -81,6 +81,7 @@ import org.mockito.Mockito;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -90,6 +91,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
@@ -107,6 +109,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListe
 import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST;
 import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
+import static org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -384,7 +387,7 @@ public class AsyncKafkaConsumerTest {
         doAnswer(invocation -> {
             consumer.wakeup();
             return Fetch.empty();
-        }).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        }).doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
         completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
@@ -419,6 +422,35 @@ public class AsyncKafkaConsumerTest {
         assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
     }
 
+    @Test
+    public void testCommitInRebalanceCallback() {
+        consumer = newConsumer();
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
+        SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+        sortedPartitions.add(tp);
+        CompletableBackgroundEvent<Void> e = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, sortedPartitions);
+        backgroundEventQueue.add(e);
+        completeCommitApplicationEventSuccessfully();
+
+        ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
+                assertDoesNotThrow(() -> consumer.commitSync(mkMap(mkEntry(tp, new OffsetAndMetadata(0)))));
+            }
+
+            @Override
+            public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
+                // no-op
+            }
+        };
+
+        consumer.subscribe(Collections.singletonList(topicName), listener);
+        consumer.poll(Duration.ZERO);
+    }
+
     @Test
     public void testClearWakeupTriggerAfterPoll() {
         consumer = newConsumer();
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index fbbd7c42a9c..553188900cd 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -197,10 +197,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(1, listener.callsToRevoked)
   }
 
-  // TODO: Enable this test for both protocols when the Jira tracking its failure (KAFKA-16009) is fixed. This
-  //       is done by setting the @MethodSource value to "getTestQuorumAndGroupProtocolParametersAll"
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testMaxPollIntervalMsDelayInRevocation(quorum: String, groupProtocol: String): Unit = {
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000.toString)
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
