This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push: new 0b2829bfc20 KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged (#15869) 0b2829bfc20 is described below commit 0b2829bfc20b1dae34f1f07947b164658051b3ae Author: Phuc-Hong-Tran <44060007+phuc-hong-t...@users.noreply.github.com> AuthorDate: Sat Jun 8 03:30:07 2024 +1000 KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged (#15869) This PR includes changes for AsyncKafkaConsumer to avoid evaluating the subscription regex on every poll if metadata hasn't changed. The metadataVersionSnapshot was introduced to identify whether metadata has changed or not, if yes then the current subscription regex will be evaluated. This is the same mechanism used by the LegacyConsumer. Reviewers: Lianet Magrans <liane...@gmail.com>, Matthias J. Sax <matth...@confluent.io> --- .../consumer/internals/AsyncKafkaConsumer.java | 26 ++++++++------ .../consumer/internals/AsyncKafkaConsumerTest.java | 41 ++++++++++++++++++++++ 2 files changed, 57 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 248073e8b48..94af40c27aa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -234,6 +234,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { private final SubscriptionState subscriptions; private final ConsumerMetadata metadata; + private int metadataVersionSnapshot; private final Metrics metrics; private final long retryBackoffMs; private final int defaultApiTimeoutMs; @@ -311,6 +312,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { this.metadata = metadataFactory.build(config, subscriptions, logContext, clusterResourceListeners); final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config); metadata.bootstrap(addresses); + this.metadataVersionSnapshot = metadata.updateVersion(); FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics); FetchConfig fetchConfig = new FetchConfig(config); @@ -434,6 +436,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { this.metrics = metrics; this.groupMetadata.set(initializeGroupMetadata(groupId, Optional.empty())); this.metadata = metadata; + this.metadataVersionSnapshot = metadata.updateVersion(); this.retryBackoffMs = retryBackoffMs; this.defaultApiTimeoutMs = defaultApiTimeoutMs; this.deserializers = deserializers; @@ -463,6 +466,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { this.time = time; this.metrics = new Metrics(time); this.metadata = metadata; + this.metadataVersionSnapshot = metadata.updateVersion(); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer); @@ -1468,12 +1472,11 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { } /** - * TODO: remove this when we implement the KIP-848 protocol. - * * <p> - * The contents of this method are shamelessly stolen from - * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access - * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? + * + * This function evaluates the regex that the consumer subscribed to + * against the list of topic names from metadata, and updates + * the list of topics in subscription state accordingly * * @param cluster Cluster from which we get the topics */ @@ -1483,7 +1486,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { .collect(Collectors.toSet()); if (subscriptions.subscribeFromPattern(topicsToSubscribe)) { applicationEventHandler.add(new SubscriptionChangeEvent()); - metadata.requestUpdateForNewTopics(); + this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics(); } } @@ -1799,7 +1802,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { fetchBuffer.retainAll(currentTopicPartitions); log.info("Subscribed to topic(s): {}", String.join(", ", topics)); if (subscriptions.subscribe(new HashSet<>(topics), listener)) - metadata.requestUpdateForNewTopics(); + this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics(); // Trigger subscribe event to effectively join the group if not already part of it, // or just send the new subscription to the broker. @@ -1978,9 +1981,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { } private void maybeUpdateSubscriptionMetadata() { - if (subscriptions.hasPatternSubscription()) { - updatePatternSubscription(metadata.fetch()); + if (this.metadataVersionSnapshot < metadata.updateVersion()) { + this.metadataVersionSnapshot = metadata.updateVersion(); + if (subscriptions.hasPatternSubscription()) { + updatePatternSubscription(metadata.fetch()); + } } } - } + diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index c32d2e5e5c0..69e51961f65 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -54,6 +54,7 @@ import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.Node; +import org.apache.kafka.common.Cluster; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.GroupAuthorizationException; @@ -103,6 +104,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; import java.util.stream.Stream; import static java.util.Arrays.asList; @@ -139,6 +141,7 @@ import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.clearInvocations; @SuppressWarnings("unchecked") public class AsyncKafkaConsumerTest { @@ -476,6 +479,44 @@ public class AsyncKafkaConsumerTest { assertTrue(callbackExecuted.get()); } + @Test + public void testSubscriptionRegexEvalOnPollOnlyIfMetadataChanges() { + SubscriptionState subscriptions = mock(SubscriptionState.class); + Cluster cluster = mock(Cluster.class); + + consumer = newConsumer( + mock(FetchBuffer.class), + mock(ConsumerInterceptors.class), + mock(ConsumerRebalanceListenerInvoker.class), + subscriptions, + "group-id", + "client-id"); + + final String topicName = "foo"; + final int partition = 3; + final TopicPartition tp = new TopicPartition(topicName, partition); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(1)); + completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + doReturn(cluster).when(metadata).fetch(); + doReturn(Collections.singleton(topicName)).when(cluster).topics(); + + consumer.subscribe(Pattern.compile("f*")); + verify(metadata).requestUpdateForNewTopics(); + verify(subscriptions).matchesSubscribedPattern(topicName); + clearInvocations(subscriptions); + + when(subscriptions.hasPatternSubscription()).thenReturn(true); + consumer.poll(Duration.ZERO); + verify(subscriptions, never()).matchesSubscribedPattern(topicName); + + when(metadata.updateVersion()).thenReturn(2); + when(subscriptions.hasPatternSubscription()).thenReturn(true); + consumer.poll(Duration.ZERO); + verify(subscriptions).matchesSubscribedPattern(topicName); + } + @Test public void testClearWakeupTriggerAfterPoll() { consumer = newConsumer();