This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
     new 491a079cfa9 KAFKA-16786: Remove old assignment strategy usage in new consumer (#16214)
491a079cfa9 is described below

commit 491a079cfa9d4fd1a3f8f49c4c3bd37568a0b29a
Author: Lianet Magrans <98415067+lian...@users.noreply.github.com>
AuthorDate: Thu Jun 6 09:45:36 2024 +0200

    KAFKA-16786: Remove old assignment strategy usage in new consumer (#16214)

    Remove usage of the partition.assignment.strategy config in the new
    consumer. This config is deprecated with the new consumer protocol, so
    the AsyncKafkaConsumer should not use or validate the property.

    Reviewers: Lucas Brutschy <lbruts...@confluent.io>
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 21 +--------------------
 .../internals/ConsumerDelegateCreator.java         |  3 +--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  4 ++--
 .../consumer/internals/AsyncKafkaConsumerTest.java | 22 ++++++++++++----------
 4 files changed, 16 insertions(+), 34 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 6930cd02955..248073e8b48 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -26,7 +26,6 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerInterceptor;
-import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.GroupProtocol;
@@ -240,7 +239,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     private final int defaultApiTimeoutMs;
     private final boolean autoCommitEnabled;
     private volatile boolean closed = false;
-    private final List<ConsumerPartitionAssignor> assignors;
     private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
 
     // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
@@ -373,10 +371,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             rebalanceListenerInvoker
         );
         this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext);
-        this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
-            config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
-            config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
-        );
 
         // The FetchCollector is only used on the application thread.
         this.fetchCollector = fetchCollectorFactory.build(logContext,
@@ -424,7 +418,6 @@
                        ConsumerMetadata metadata,
                        long retryBackoffMs,
                        int defaultApiTimeoutMs,
-                       List<ConsumerPartitionAssignor> assignors,
                        String groupId,
                        boolean autoCommitEnabled) {
         this.log = logContext.logger(getClass());
@@ -445,7 +438,6 @@
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.deserializers = deserializers;
         this.applicationEventHandler = applicationEventHandler;
-        this.assignors = assignors;
         this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
         this.clientTelemetryReporter = Optional.empty();
         this.autoCommitEnabled = autoCommitEnabled;
@@ -460,8 +452,7 @@
                        Deserializer<V> valueDeserializer,
                        KafkaClient client,
                        SubscriptionState subscriptions,
-                       ConsumerMetadata metadata,
-                       List<ConsumerPartitionAssignor> assignors) {
+                       ConsumerMetadata metadata) {
         this.log = logContext.logger(getClass());
         this.subscriptions = subscriptions;
         this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
@@ -475,7 +466,6 @@
         this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
         this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
-        this.assignors = assignors;
         this.clientTelemetryReporter = Optional.empty();
 
         ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
@@ -1687,12 +1677,6 @@
         }
     }
 
-    private void throwIfNoAssignorsConfigured() {
-        if (assignors.isEmpty())
-            throw new IllegalStateException("Must configure at least one partition assigner class name to " +
-                ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
-    }
-
     private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
         if (offsetAndMetadata != null)
             offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
@@ -1780,7 +1764,6 @@
         if (pattern == null || pattern.toString().isEmpty())
             throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " +
                 (pattern == null ? "null" : "empty"));
-        throwIfNoAssignorsConfigured();
         log.info("Subscribed to pattern: '{}'", pattern);
         subscriptions.subscribe(pattern, listener);
         metadata.requestUpdateForNewTopics();
@@ -1805,8 +1788,6 @@
                 throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
             }
 
-            throwIfNoAssignorsConfigured();
-
             // Clear the buffered data which are not a part of newly assigned topics
             final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
index bd95e06c864..81c45aba69c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
@@ -91,8 +91,7 @@ public class ConsumerDelegateCreator {
                 valueDeserializer,
                 client,
                 subscriptions,
-                metadata,
-                assignors
+                metadata
             );
         else
             return new LegacyKafkaConsumer<>(
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 850ac2bd8f9..d34d09cd8a9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -495,7 +495,7 @@ public class KafkaConsumerTest {
     }
 
     @ParameterizedTest
-    @EnumSource(GroupProtocol.class)
+    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testSubscriptionWithEmptyPartitionAssignment(GroupProtocol groupProtocol) {
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
@@ -3227,7 +3227,7 @@ public void testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
     }
 
     @ParameterizedTest
-    @EnumSource(GroupProtocol.class)
+    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testAssignorNameConflict(GroupProtocol groupProtocol) {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 66ee724a0e5..c32d2e5e5c0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.Metadata.LeaderAndEpoch;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
-import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -30,7 +29,6 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
-import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
@@ -205,7 +203,6 @@ public class AsyncKafkaConsumerTest {
                                 ConsumerInterceptors<String, String> interceptors,
                                 ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
                                 SubscriptionState subscriptions,
-                                List<ConsumerPartitionAssignor> assignors,
                                 String groupId,
                                 String clientId) {
         long retryBackoffMs = 100L;
@@ -228,7 +225,6 @@
             metadata,
             retryBackoffMs,
             defaultApiTimeoutMs,
-            assignors,
             groupId,
             autoCommitEnabled);
     }
@@ -564,7 +560,6 @@
             new ConsumerInterceptors<>(Collections.emptyList()),
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
-            singletonList(new RoundRobinAssignor()),
             "group-id",
             "client-id");
         completeCommitSyncApplicationEventSuccessfully();
@@ -784,7 +779,6 @@
             mock(ConsumerInterceptors.class),
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
-            singletonList(new RoundRobinAssignor()),
             "group-id",
             "client-id");
 
@@ -806,7 +800,6 @@
             new ConsumerInterceptors<>(Collections.emptyList()),
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
-            singletonList(new RoundRobinAssignor()),
             "group-id",
             "client-id");
         subscriptions.subscribe(singleton("topic"), Optional.of(listener));
@@ -844,7 +837,6 @@
             mock(ConsumerInterceptors.class),
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
-            singletonList(new RoundRobinAssignor()),
             "group-id",
             "client-id");
         consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
@@ -862,7 +854,6 @@
             mock(ConsumerInterceptors.class),
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
-            singletonList(new RoundRobinAssignor()),
             "group-id",
             "client-id");
         consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
@@ -1624,6 +1615,18 @@
         assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
     }
 
+    @Test
+    public void testPartitionAssignmentStrategyUnusedInAsyncConsumer() {
+        final Properties props = requiredConsumerConfig();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup1");
+        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
+        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "CooperativeStickyAssignor");
+        final ConsumerConfig config = new ConsumerConfig(props);
+        consumer = newConsumer(config);
+
+        assertTrue(config.unused().contains(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
+    }
+
     @Test
     public void testGroupIdNull() {
         final Properties props = requiredConsumerConfig();
@@ -1666,7 +1669,6 @@
             new ConsumerInterceptors<>(Collections.emptyList()),
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
-            singletonList(new RoundRobinAssignor()),
             "group-id",
             "client-id");
         final TopicPartition tp = new TopicPartition("topic", 0);
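
Note for readers following the change: after this patch, a client-side assignment strategy only matters for the classic protocol; under the new group protocol the AsyncKafkaConsumer ignores partition.assignment.strategy entirely, and the new test above asserts it is merely reported as an unused config. The snippet below is not part of the commit; it is a minimal sketch of configuring a consumer that opts into the new protocol. The bootstrap address, group id, topic name, and the "uniform" value for group.remote.assignor are illustrative assumptions.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NewProtocolConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");             // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Opt into the new group protocol (KIP-848). Assignment is then driven by the
        // group coordinator, so partition.assignment.strategy is not read by the
        // AsyncKafkaConsumer; if set, it only shows up in ConsumerConfig.unused().
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");

        // Optionally name a broker-side assignor instead of a client-side
        // ConsumerPartitionAssignor ("uniform" is an assumed example value).
        props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "uniform");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));     // assumed topic
            // poll(...) as usual; no client-side assignor instances are created.
        }
    }
}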