This is an automated email from the ASF dual-hosted git repository. pvillard pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new 50a99b291f NIFI-12851 - ConsumeKafka, remove limitation on count of subscribed topics 50a99b291f is described below commit 50a99b291f35ecd39bdb81697bce4dbeea37f854 Author: Paul Grey <gre...@yahoo.com> AuthorDate: Wed Feb 28 16:39:10 2024 -0500 NIFI-12851 - ConsumeKafka, remove limitation on count of subscribed topics Signed-off-by: Pierre Villard <pierre.villard...@gmail.com> This closes #8460. --- .../kafka/pubsub/ConsumeKafkaRecord_2_6.java | 2 +- .../processors/kafka/pubsub/ConsumeKafka_2_6.java | 2 +- .../kafka/pubsub/TestConsumeKafka_2_6.java | 38 ++++++++++++++++++++++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java index bdf42b8c02..a7b5bc746b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java @@ -467,7 +467,7 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements KafkaCl } if (topicType.equals(TOPIC_NAME.getValue())) { - for (final String topic : topicListing.split(",", 100)) { + for (final String topic : topicListing.split(",")) { final String trimmedName = topic.trim(); if (!trimmedName.isEmpty()) { topics.add(trimmedName); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java index 019f7daa55..9c4b8e8235 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java @@ -413,7 +413,7 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo } if (topicType.equals(TOPIC_NAME.getValue())) { - for (final String topic : topicListing.split(",", 100)) { + for (final String topic : topicListing.split(",")) { final String trimmedName = topic.trim(); if (!trimmedName.isEmpty()) { topics.add(trimmedName); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java index 31d2d10aaa..14028f623f 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java @@ -23,12 +23,20 @@ import org.apache.nifi.kafka.shared.property.SecurityProtocol; import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.kerberos.KerberosUserService; import org.apache.nifi.kerberos.SelfContainedKerberosUserService; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; @@ -45,6 +53,36 @@ public class TestConsumeKafka_2_6 { mockConsumerPool = mock(ConsumerPool.class); } + @Test + public void validateNoLimitToTopicCount() { + final int expectedCount = 101; + final String topics = String.join(",", Collections.nCopies(expectedCount, "foo")); + final ConsumeKafka_2_6 consumeKafka = new ConsumeKafka_2_6() { + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + final ConsumerPool consumerPool = super.createConsumerPool(context, log); + try { + final Field topicsField = ConsumerPool.class.getDeclaredField("topics"); + topicsField.setAccessible(true); + final Object o = topicsField.get(consumerPool); + final List<?> list = assertInstanceOf(List.class, o); + assertEquals(expectedCount, list.size()); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } + return consumerPool; + } + }; + + TestRunner runner = TestRunners.newTestRunner(consumeKafka); + runner.setValidateExpressionUsage(false); + runner.setProperty(ConsumeKafka_2_6.BOOTSTRAP_SERVERS, "localhost:1234"); + runner.setProperty(ConsumeKafka_2_6.TOPICS, topics); + runner.setProperty(ConsumeKafka_2_6.GROUP_ID, "foo"); + runner.setProperty(ConsumeKafka_2_6.AUTO_OFFSET_RESET, ConsumeKafka_2_6.OFFSET_EARLIEST); + runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + runner.run(); + } + @Test public void validateCustomValidatorSettings() { ConsumeKafka_2_6 consumeKafka = new ConsumeKafka_2_6();