This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b74b182841a KAFKA-16786: Remove old assignment strategy usage in new 
consumer (#16214)
b74b182841a is described below

commit b74b182841a340b7b92d852142201cd5c1ee1b85
Author: Lianet Magrans <98415067+lian...@users.noreply.github.com>
AuthorDate: Thu Jun 6 09:45:36 2024 +0200

    KAFKA-16786: Remove old assignment strategy usage in new consumer (#16214)
    
    Remove usage of the partition.assignment.strategy config in the new 
consumer. This config is deprecated with the new consumer protocol, so the 
AsyncKafkaConsumer should not use or validate the property.
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 21 +--------------------
 .../internals/ConsumerDelegateCreator.java         |  3 +--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  4 ++--
 .../consumer/internals/AsyncKafkaConsumerTest.java | 22 ++++++++++++----------
 4 files changed, 16 insertions(+), 34 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 6930cd02955..248073e8b48 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -26,7 +26,6 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerInterceptor;
-import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.GroupProtocol;
@@ -240,7 +239,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     private final int defaultApiTimeoutMs;
     private final boolean autoCommitEnabled;
     private volatile boolean closed = false;
-    private final List<ConsumerPartitionAssignor> assignors;
     private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
 
     // to keep from repeatedly scanning subscriptions in poll(), cache the 
result during metadata updates
@@ -373,10 +371,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     rebalanceListenerInvoker
             );
             this.backgroundEventReaper = 
backgroundEventReaperFactory.build(logContext);
-            this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
-                    
config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
-                    
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, 
clientId))
-            );
 
             // The FetchCollector is only used on the application thread.
             this.fetchCollector = fetchCollectorFactory.build(logContext,
@@ -424,7 +418,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                        ConsumerMetadata metadata,
                        long retryBackoffMs,
                        int defaultApiTimeoutMs,
-                       List<ConsumerPartitionAssignor> assignors,
                        String groupId,
                        boolean autoCommitEnabled) {
         this.log = logContext.logger(getClass());
@@ -445,7 +438,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.deserializers = deserializers;
         this.applicationEventHandler = applicationEventHandler;
-        this.assignors = assignors;
         this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, 
"consumer");
         this.clientTelemetryReporter = Optional.empty();
         this.autoCommitEnabled = autoCommitEnabled;
@@ -460,8 +452,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                        Deserializer<V> valueDeserializer,
                        KafkaClient client,
                        SubscriptionState subscriptions,
-                       ConsumerMetadata metadata,
-                       List<ConsumerPartitionAssignor> assignors) {
+                       ConsumerMetadata metadata) {
         this.log = logContext.logger(getClass());
         this.subscriptions = subscriptions;
         this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
@@ -475,7 +466,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
         this.deserializers = new Deserializers<>(keyDeserializer, 
valueDeserializer);
-        this.assignors = assignors;
         this.clientTelemetryReporter = Optional.empty();
 
         ConsumerMetrics metricsRegistry = new 
ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
@@ -1687,12 +1677,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         }
     }
 
-    private void throwIfNoAssignorsConfigured() {
-        if (assignors.isEmpty())
-            throw new IllegalStateException("Must configure at least one 
partition assigner class name to " +
-                    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " 
configuration property");
-    }
-
     private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, 
OffsetAndMetadata offsetAndMetadata) {
         if (offsetAndMetadata != null)
             offsetAndMetadata.leaderEpoch().ifPresent(epoch -> 
metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
@@ -1780,7 +1764,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             if (pattern == null || pattern.toString().isEmpty())
                 throw new IllegalArgumentException("Topic pattern to subscribe 
to cannot be " + (pattern == null ?
                     "null" : "empty"));
-            throwIfNoAssignorsConfigured();
             log.info("Subscribed to pattern: '{}'", pattern);
             subscriptions.subscribe(pattern, listener);
             metadata.requestUpdateForNewTopics();
@@ -1805,8 +1788,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                         throw new IllegalArgumentException("Topic collection 
to subscribe to cannot contain null or empty topic");
                 }
 
-                throwIfNoAssignorsConfigured();
-
                 // Clear the buffered data which are not a part of newly 
assigned topics
                 final Set<TopicPartition> currentTopicPartitions = new 
HashSet<>();
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
index bd95e06c864..81c45aba69c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
@@ -91,8 +91,7 @@ public class ConsumerDelegateCreator {
                     valueDeserializer,
                     client,
                     subscriptions,
-                    metadata,
-                    assignors
+                    metadata
                 );
             else
                 return new LegacyKafkaConsumer<>(
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 850ac2bd8f9..d34d09cd8a9 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -495,7 +495,7 @@ public class KafkaConsumerTest {
     }
 
     @ParameterizedTest
-    @EnumSource(GroupProtocol.class)
+    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testSubscriptionWithEmptyPartitionAssignment(GroupProtocol 
groupProtocol) {
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name());
@@ -3227,7 +3227,7 @@ public void 
testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
     }
 
     @ParameterizedTest
-    @EnumSource(GroupProtocol.class)
+    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testAssignorNameConflict(GroupProtocol groupProtocol) {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 66ee724a0e5..c32d2e5e5c0 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.Metadata.LeaderAndEpoch;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
-import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -30,7 +29,6 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
-import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
@@ -205,7 +203,6 @@ public class AsyncKafkaConsumerTest {
         ConsumerInterceptors<String, String> interceptors,
         ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
         SubscriptionState subscriptions,
-        List<ConsumerPartitionAssignor> assignors,
         String groupId,
         String clientId) {
         long retryBackoffMs = 100L;
@@ -228,7 +225,6 @@ public class AsyncKafkaConsumerTest {
             metadata,
             retryBackoffMs,
             defaultApiTimeoutMs,
-            assignors,
             groupId,
             autoCommitEnabled);
     }
@@ -564,7 +560,6 @@ public class AsyncKafkaConsumerTest {
             new ConsumerInterceptors<>(Collections.emptyList()),
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
-            singletonList(new RoundRobinAssignor()),
             "group-id",
             "client-id");
         completeCommitSyncApplicationEventSuccessfully();
@@ -784,7 +779,6 @@ public class AsyncKafkaConsumerTest {
             mock(ConsumerInterceptors.class),
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
-            singletonList(new RoundRobinAssignor()),
             "group-id",
             "client-id");
 
@@ -806,7 +800,6 @@ public class AsyncKafkaConsumerTest {
             new ConsumerInterceptors<>(Collections.emptyList()),
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
-            singletonList(new RoundRobinAssignor()),
             "group-id",
             "client-id");
         subscriptions.subscribe(singleton("topic"), Optional.of(listener));
@@ -844,7 +837,6 @@ public class AsyncKafkaConsumerTest {
             mock(ConsumerInterceptors.class),
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
-            singletonList(new RoundRobinAssignor()),
             "group-id",
             "client-id");
         consumer.subscribe(singleton("topic"), 
mock(ConsumerRebalanceListener.class));
@@ -862,7 +854,6 @@ public class AsyncKafkaConsumerTest {
             mock(ConsumerInterceptors.class),
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
-            singletonList(new RoundRobinAssignor()),
             "group-id",
             "client-id");
         consumer.subscribe(singleton("topic"), 
mock(ConsumerRebalanceListener.class));
@@ -1624,6 +1615,18 @@ public class AsyncKafkaConsumerTest {
         
assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
     }
 
+    @Test
+    public void testPartitionAssignmentStrategyUnusedInAsyncConsumer() {
+        final Properties props = requiredConsumerConfig();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup1");
+        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
+        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
"CooperativeStickyAssignor");
+        final ConsumerConfig config = new ConsumerConfig(props);
+        consumer = newConsumer(config);
+
+        
assertTrue(config.unused().contains(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
+    }
+
     @Test
     public void testGroupIdNull() {
         final Properties props = requiredConsumerConfig();
@@ -1666,7 +1669,6 @@ public class AsyncKafkaConsumerTest {
                 new ConsumerInterceptors<>(Collections.emptyList()),
                 mock(ConsumerRebalanceListenerInvoker.class),
                 subscriptions,
-                singletonList(new RoundRobinAssignor()),
                 "group-id",
                 "client-id");
         final TopicPartition tp = new TopicPartition("topic", 0);

Reply via email to