This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 0b2829bfc20 KAFKA-16493: Avoid unneeded subscription regex check if 
metadata version unchanged (#15869)
0b2829bfc20 is described below

commit 0b2829bfc20b1dae34f1f07947b164658051b3ae
Author: Phuc-Hong-Tran <44060007+phuc-hong-t...@users.noreply.github.com>
AuthorDate: Sat Jun 8 03:30:07 2024 +1000

    KAFKA-16493: Avoid unneeded subscription regex check if metadata version 
unchanged (#15869)
    
    This PR changes AsyncKafkaConsumer to avoid evaluating the subscription 
regex on every poll when the metadata hasn't changed. The 
metadataVersionSnapshot field was introduced to detect whether the metadata 
has changed; if it has, the current subscription regex is evaluated.
    
    This is the same mechanism used by the LegacyConsumer.
    
    Reviewers: Lianet Magrans <liane...@gmail.com>, Matthias J. Sax 
<matth...@confluent.io>
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 26 ++++++++------
 .../consumer/internals/AsyncKafkaConsumerTest.java | 41 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 10 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 248073e8b48..94af40c27aa 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -234,6 +234,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     private final SubscriptionState subscriptions;
     private final ConsumerMetadata metadata;
+    private int metadataVersionSnapshot;
     private final Metrics metrics;
     private final long retryBackoffMs;
     private final int defaultApiTimeoutMs;
@@ -311,6 +312,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             this.metadata = metadataFactory.build(config, subscriptions, 
logContext, clusterResourceListeners);
             final List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config);
             metadata.bootstrap(addresses);
+            this.metadataVersionSnapshot = metadata.updateVersion();
 
             FetchMetricsManager fetchMetricsManager = 
createFetchMetricsManager(metrics);
             FetchConfig fetchConfig = new FetchConfig(config);
@@ -434,6 +436,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.metrics = metrics;
         this.groupMetadata.set(initializeGroupMetadata(groupId, 
Optional.empty()));
         this.metadata = metadata;
+        this.metadataVersionSnapshot = metadata.updateVersion();
         this.retryBackoffMs = retryBackoffMs;
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.deserializers = deserializers;
@@ -463,6 +466,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.time = time;
         this.metrics = new Metrics(time);
         this.metadata = metadata;
+        this.metadataVersionSnapshot = metadata.updateVersion();
         this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
         this.deserializers = new Deserializers<>(keyDeserializer, 
valueDeserializer);
@@ -1468,12 +1472,11 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     }
 
     /**
-     * TODO: remove this when we implement the KIP-848 protocol.
-     *
      * <p>
-     * The contents of this method are shamelessly stolen from
-     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are 
used here because we won't have access
-     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be 
moved to a ConsumerUtils class?
+     *
+     * This function evaluates the regex that the consumer subscribed to
+     * against the list of topic names from metadata, and updates
+     * the list of topics in subscription state accordingly
      *
      * @param cluster Cluster from which we get the topics
      */
@@ -1483,7 +1486,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 .collect(Collectors.toSet());
         if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
             applicationEventHandler.add(new SubscriptionChangeEvent());
-            metadata.requestUpdateForNewTopics();
+            this.metadataVersionSnapshot = 
metadata.requestUpdateForNewTopics();
         }
     }
 
@@ -1799,7 +1802,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 fetchBuffer.retainAll(currentTopicPartitions);
                 log.info("Subscribed to topic(s): {}", String.join(", ", 
topics));
                 if (subscriptions.subscribe(new HashSet<>(topics), listener))
-                    metadata.requestUpdateForNewTopics();
+                    this.metadataVersionSnapshot = 
metadata.requestUpdateForNewTopics();
 
                 // Trigger subscribe event to effectively join the group if 
not already part of it,
                 // or just send the new subscription to the broker.
@@ -1978,9 +1981,12 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     }
 
     private void maybeUpdateSubscriptionMetadata() {
-        if (subscriptions.hasPatternSubscription()) {
-            updatePatternSubscription(metadata.fetch());
+        if (this.metadataVersionSnapshot < metadata.updateVersion()) {
+            this.metadataVersionSnapshot = metadata.updateVersion();
+            if (subscriptions.hasPatternSubscription()) {
+                updatePatternSubscription(metadata.fetch());
+            }
         }
     }
-
 }
+
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index c32d2e5e5c0..69e51961f65 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -54,6 +54,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -103,6 +104,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 import static java.util.Arrays.asList;
@@ -139,6 +141,7 @@ import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.clearInvocations;
 
 @SuppressWarnings("unchecked")
 public class AsyncKafkaConsumerTest {
@@ -476,6 +479,44 @@ public class AsyncKafkaConsumerTest {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testSubscriptionRegexEvalOnPollOnlyIfMetadataChanges() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+                mock(FetchBuffer.class),
+                mock(ConsumerInterceptors.class),
+                mock(ConsumerRebalanceListenerInvoker.class),
+                subscriptions,
+                "group-id",
+                "client-id");
+
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(tp, new OffsetAndMetadata(1));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        doReturn(cluster).when(metadata).fetch();
+        doReturn(Collections.singleton(topicName)).when(cluster).topics();
+
+        consumer.subscribe(Pattern.compile("f*"));
+        verify(metadata).requestUpdateForNewTopics();
+        verify(subscriptions).matchesSubscribedPattern(topicName);
+        clearInvocations(subscriptions);
+
+        when(subscriptions.hasPatternSubscription()).thenReturn(true);
+        consumer.poll(Duration.ZERO);
+        verify(subscriptions, never()).matchesSubscribedPattern(topicName);
+
+        when(metadata.updateVersion()).thenReturn(2);
+        when(subscriptions.hasPatternSubscription()).thenReturn(true);
+        consumer.poll(Duration.ZERO);
+        verify(subscriptions).matchesSubscribedPattern(topicName);
+    }
+
     @Test
     public void testClearWakeupTriggerAfterPoll() {
         consumer = newConsumer();

Reply via email to