This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 783633c  NIFI-8021: Fixed bug in ConsumeKafka_2_6 and 
ConsumeKafkaRecord_2_6 where explicit partition assignment causes issues with 
more than 1 concurrent task. Also fixed bug that prevented more nifi nodes than 
partitions because it didn't properly handle empty string for the list of 
partitions
783633c is described below

commit 783633cac5e02e96086ab2d36c92d8057df93c01
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Wed Jan 6 09:39:17 2021 -0500

    NIFI-8021: Fixed bug in ConsumeKafka_2_6 and ConsumeKafkaRecord_2_6 where 
explicit partition assignment causes issues with more than 1 concurrent task. 
Also fixed bug that prevented more nifi nodes than partitions because it didn't 
properly handle empty string for the list of partitions
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #4744.
---
 .../kafka/pubsub/ConsumeKafkaRecord_2_6.java       |  6 ++-
 .../processors/kafka/pubsub/ConsumeKafka_2_6.java  |  6 ++-
 .../nifi/processors/kafka/pubsub/ConsumerPool.java | 52 +++++++++++++++-------
 3 files changed, 45 insertions(+), 19 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index f6dda5b..b51c644 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -344,8 +344,10 @@ public class ConsumeKafkaRecord_2_6 extends 
AbstractProcessor {
 
         final ConsumerPool consumerPool = createConsumerPool(context, 
getLogger());
 
-        final int numAssignedPartitions = 
ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
-        if (numAssignedPartitions > 0) {
+        final boolean explicitAssignment = 
ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties());
+        if (explicitAssignment) {
+            final int numAssignedPartitions = 
ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
+
             // Request from Kafka the number of partitions for the topics that 
we are consuming from. Then ensure that we have
             // all of the partitions assigned.
             final int partitionCount = consumerPool.getPartitionCount();
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index 5e432b3..1f54c70 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -318,8 +318,10 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
 
         final ConsumerPool consumerPool = createConsumerPool(context, 
getLogger());
 
-        final int numAssignedPartitions = 
ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
-        if (numAssignedPartitions > 0) {
+        final boolean explicitAssignment = 
ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties());
+        if (explicitAssignment) {
+            final int numAssignedPartitions = 
ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
+
             // Request from Kafka the number of partitions for the topics that 
we are consuming from. Then ensure that we have
             // all of the partitions assigned.
             final int partitionCount = consumerPool.getPartitionCount();
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 46bf97a..591480a 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -34,8 +34,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 
@@ -102,7 +102,7 @@ public class ConsumerPool implements Closeable {
             final Charset headerCharacterSet,
             final Pattern headerNamePattern,
             final int[] partitionsToConsume) {
-        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.pooledLeases = new LinkedBlockingQueue<>();
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
         this.demarcatorBytes = demarcator;
@@ -119,6 +119,7 @@ public class ConsumerPool implements Closeable {
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
         this.partitionsToConsume = partitionsToConsume;
+        enqueueLeases(partitionsToConsume);
     }
 
     public ConsumerPool(
@@ -136,7 +137,7 @@ public class ConsumerPool implements Closeable {
             final Charset headerCharacterSet,
             final Pattern headerNamePattern,
             final int[] partitionsToConsume) {
-        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.pooledLeases = new LinkedBlockingQueue<>();
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
         this.demarcatorBytes = demarcator;
@@ -153,6 +154,7 @@ public class ConsumerPool implements Closeable {
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
         this.partitionsToConsume = partitionsToConsume;
+        enqueueLeases(partitionsToConsume);
     }
 
     public ConsumerPool(
@@ -171,7 +173,7 @@ public class ConsumerPool implements Closeable {
             final boolean separateByKey,
             final String keyEncoding,
             final int[] partitionsToConsume) {
-        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.pooledLeases = new LinkedBlockingQueue<>();
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
         this.demarcatorBytes = null;
@@ -188,6 +190,7 @@ public class ConsumerPool implements Closeable {
         this.separateByKey = separateByKey;
         this.keyEncoding = keyEncoding;
         this.partitionsToConsume = partitionsToConsume;
+        enqueueLeases(partitionsToConsume);
     }
 
     public ConsumerPool(
@@ -206,7 +209,7 @@ public class ConsumerPool implements Closeable {
             final boolean separateByKey,
             final String keyEncoding,
             final int[] partitionsToConsume) {
-        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.pooledLeases = new LinkedBlockingQueue<>();
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
         this.demarcatorBytes = null;
@@ -223,6 +226,7 @@ public class ConsumerPool implements Closeable {
         this.separateByKey = separateByKey;
         this.keyEncoding = keyEncoding;
         this.partitionsToConsume = partitionsToConsume;
+        enqueueLeases(partitionsToConsume);
     }
 
     public int getPartitionCount() {
@@ -282,16 +286,8 @@ public class ConsumerPool implements Closeable {
                     consumer.subscribe(topicPattern, lease);
                 }
             } else {
-                final List<TopicPartition> topicPartitions = new ArrayList<>();
-
-                for (final String topic : topics) {
-                    for (final int partition : partitionsToConsume) {
-                        final TopicPartition topicPartition = new 
TopicPartition(topic, partition);
-                        topicPartitions.add(topicPartition);
-                    }
-                }
-
-                consumer.assign(topicPartitions);
+                logger.debug("Cannot obtain lease to communicate with Kafka. 
Since partitions are explicitly assigned, cannot create a new lease.");
+                return null;
             }
         }
         lease.setProcessSession(session, processContext);
@@ -300,6 +296,32 @@ public class ConsumerPool implements Closeable {
         return lease;
     }
 
+    private SimpleConsumerLease createConsumerLease(final int partition) {
+        final List<TopicPartition> topicPartitions = new ArrayList<>();
+        for (final String topic : topics) {
+            final TopicPartition topicPartition = new TopicPartition(topic, 
partition);
+            topicPartitions.add(topicPartition);
+        }
+
+        final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
+        consumerCreatedCountRef.incrementAndGet();
+        consumer.assign(topicPartitions);
+
+        final SimpleConsumerLease lease = new SimpleConsumerLease(consumer);
+        return lease;
+    }
+
+    private void enqueueLeases(final int[] partitionsToConsume) {
+        if (partitionsToConsume == null) {
+            return;
+        }
+
+        for (final int partition : partitionsToConsume) {
+            final SimpleConsumerLease lease = createConsumerLease(partition);
+            pooledLeases.add(lease);
+        }
+    }
+
     /**
      * Exposed as protected method for easier unit testing
      *

Reply via email to