This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b368c281e8 NIFI-12851 - ConsumeKafka, remove limitation on count of subscribed topics
b368c281e8 is described below

commit b368c281e8e6c25e040b083db6ebaaf95ec04c9a
Author: Paul Grey <gre...@yahoo.com>
AuthorDate: Wed Feb 28 16:39:10 2024 -0500

    NIFI-12851 - ConsumeKafka, remove limitation on count of subscribed topics
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #8460.
---
 .../kafka/pubsub/ConsumeKafkaRecord_2_6.java       |  2 +-
 .../processors/kafka/pubsub/ConsumeKafka_2_6.java  |  2 +-
 .../kafka/pubsub/TestConsumeKafka_2_6.java         | 38 ++++++++++++++++++++++
 3 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index 782d43de24..5e263f0599 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -451,7 +451,7 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements KafkaCl
         }
 
         if (topicType.equals(TOPIC_NAME.getValue())) {
-            for (final String topic : topicListing.split(",", 100)) {
+            for (final String topic : topicListing.split(",")) {
                 final String trimmedName = topic.trim();
                 if (!trimmedName.isEmpty()) {
                     topics.add(trimmedName);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index 043e81a313..784121e89e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -397,7 +397,7 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo
         }
 
         if (topicType.equals(TOPIC_NAME.getValue())) {
-            for (final String topic : topicListing.split(",", 100)) {
+            for (final String topic : topicListing.split(",")) {
                 final String trimmedName = topic.trim();
                 if (!trimmedName.isEmpty()) {
                     topics.add(trimmedName);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java
index 872df85cb9..42ab9d82ad 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java
@@ -22,12 +22,20 @@ import org.apache.nifi.kafka.shared.property.SaslMechanism;
 import org.apache.nifi.kafka.shared.property.SecurityProtocol;
 import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -44,6 +52,36 @@ public class TestConsumeKafka_2_6 {
         mockConsumerPool = mock(ConsumerPool.class);
     }
 
+    @Test
+    public void validateNoLimitToTopicCount() {
+        final int expectedCount = 101;
+        final String topics = String.join(",", Collections.nCopies(expectedCount, "foo"));
+        final ConsumeKafka_2_6 consumeKafka = new ConsumeKafka_2_6() {
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                final ConsumerPool consumerPool = super.createConsumerPool(context, log);
+                try {
+                    final Field topicsField = ConsumerPool.class.getDeclaredField("topics");
+                    topicsField.setAccessible(true);
+                    final Object o = topicsField.get(consumerPool);
+                    final List<?> list = assertInstanceOf(List.class, o);
+                    assertEquals(expectedCount, list.size());
+                } catch (NoSuchFieldException | IllegalAccessException e) {
+                    throw new RuntimeException(e);
+                }
+                return consumerPool;
+            }
+        };
+
+        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(ConsumeKafka_2_6.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(ConsumeKafka_2_6.TOPICS, topics);
+        runner.setProperty(ConsumeKafka_2_6.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka_2_6.AUTO_OFFSET_RESET, ConsumeKafka_2_6.OFFSET_EARLIEST);
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        runner.run();
+    }
+
     @Test
     public void validateCustomValidatorSettings() {
         ConsumeKafka_2_6 consumeKafka = new ConsumeKafka_2_6();

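For context on the one-line change in both processors: java.lang.String.split(regex, limit) with a positive limit applies the delimiter pattern at most limit - 1 times, so the previous split(",", 100) call returned at most 100 entries and folded everything past the 99th comma into a single trailing element, which the processor would then have trimmed and registered as one bogus topic name. The single-argument split(",") has no such cap. The following standalone sketch (illustration only, not part of this commit; the class name SplitLimitDemo is made up) shows the difference:

    // Illustration only: demonstrates the String.split limit behavior that
    // the commit removes from the topic-listing parsing.
    public class SplitLimitDemo {
        public static void main(String[] args) {
            // Build a listing of 150 comma-separated topic names.
            final StringBuilder listing = new StringBuilder();
            for (int i = 0; i < 150; i++) {
                if (i > 0) {
                    listing.append(',');
                }
                listing.append("topic-").append(i);
            }
            final String topicListing = listing.toString();

            // Old behavior: at most 100 tokens; the last token keeps the rest
            // of the listing, commas included.
            final String[] capped = topicListing.split(",", 100);
            System.out.println(capped.length);   // 100
            System.out.println(capped[99]);      // topic-99,topic-100,...,topic-149

            // New behavior: one token per configured topic, no upper bound.
            final String[] unbounded = topicListing.split(",");
            System.out.println(unbounded.length);   // 150
        }
    }

Dropping the limit lets every configured topic through, which is what the new validateNoLimitToTopicCount test asserts by checking that the ConsumerPool's internal topics list holds all 101 configured names.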