Repository: nifi
Updated Branches:
  refs/heads/master 282e1a7b1 -> fd92999da


NIFI-3189: ConsumeKafka 0.9 and 0.10 with downstream backpressure

Currently, NiFi Kafka consumer processors have the following issue.

While downstream connections are full, ConsumeKafka is not scheduled to run
onTrigger. It stops executing poll, which is what tells the Kafka server
that this client is alive. After a while in that situation, the Kafka
server rebalances the consumer group and drops the client. When downstream
connections return to normal, ConsumeKafka is scheduled again, but the
client is no longer a part of the consumer group.

When this happens, the Kafka client succeeds in polling messages once the
ConsumeKafka processor resumes, but fails to commit offsets. The received
messages have already been committed into the NiFi flow, but since the
consumer offset is not updated, they will be consumed again, producing
duplicates.

In order to address the above issue:

- For ConsumeKafka_0_10, use the latest client library

    The above issue has been addressed by KIP-62.
    The latest Kafka consumer poll checks whether the client instance is
    still valid, and rejoins the group if it is not, before consuming
    messages.

- For ConsumeKafka (0.9), add manual retention logic using pause/resume

    The Kafka 0.9 client does not have a background heartbeat thread, so a
    similar mechanism is added manually.
    The Kafka pause/resume consumer API is used to tell the Kafka server
    that the client has stopped consuming messages but is still alive.
    A separate internal thread performs a paused poll periodically, based
    on the time elapsed since the last onTrigger (poll) was executed.
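The retention logic above can be sketched with plain JDK scheduling. This is
a minimal illustration, not the NiFi implementation: the names
(ConnectionRetainer, markTriggered, retainCallback) are hypothetical, and the
callback stands in for the paused Kafka poll that the real processor performs.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of a "connection retainer": a background task that fires a
 * heartbeat-style callback whenever the real poll (onTrigger) has not
 * run for longer than the configured interval. Names are illustrative.
 */
class ConnectionRetainer {
    private final long intervalMillis;
    private final Runnable retainCallback; // stands in for a paused Kafka poll
    private volatile long lastTriggeredTimestamp = -1L;
    private volatile ScheduledExecutorService executor;

    ConnectionRetainer(long intervalMillis, Runnable retainCallback) {
        this.intervalMillis = intervalMillis;
        this.retainCallback = retainCallback;
    }

    /** Called whenever a real poll happens, to reset the idle clock. */
    void markTriggered() {
        lastTriggeredTimestamp = System.currentTimeMillis();
    }

    void start() {
        markTriggered(); // avoid firing immediately after (re)start
        executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            final long now = System.currentTimeMillis();
            // Only retain when the last real poll is older than the interval.
            if (now - lastTriggeredTimestamp >= intervalMillis) {
                retainCallback.run();
            }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        if (executor != null) {
            executor.shutdownNow();
            executor = null;
        }
    }
}
```

Once backpressure releases and markTriggered() is called again on each
trigger, the idle condition stops holding and the callback goes quiet.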

This closes #1527.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fd92999d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fd92999d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fd92999d

Branch: refs/heads/master
Commit: fd92999dafc040940011c87bb2ee2c8edf5f96a2
Parents: 282e1a7
Author: Koji Kawamura <[email protected]>
Authored: Tue Feb 28 12:57:04 2017 +0900
Committer: Bryan Bende <[email protected]>
Committed: Thu Mar 30 16:29:40 2017 -0400

----------------------------------------------------------------------
 .../processors/kafka/pubsub/ConsumerLease.java  | 16 ++---
 .../processors/kafka/pubsub/ConsumeKafka.java   | 72 ++++++++++++++++++++
 .../processors/kafka/pubsub/ConsumerLease.java  | 35 ++++++++++
 .../processors/kafka/pubsub/ConsumerPool.java   |  6 ++
 .../kafka/pubsub/ConsumeKafkaTest.java          | 36 ++++++++++
 nifi-nar-bundles/nifi-kafka-bundle/pom.xml      |  2 +-
 6 files changed, 157 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fd92999d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 97ebfc6..7ea180d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -130,15 +130,13 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
      */
     void poll() {
         /**
-         * Implementation note: If we take too long (30 secs?) between kafka
-         * poll calls and our own record processing to any subsequent poll calls
-         * or the commit we can run into a situation where the commit will
-         * succeed to the session but fail on committing offsets. This is
-         * apparently different than the Kafka scenario of electing to rebalance
-         * for other reasons but in this case is due a session timeout. It
-         * appears Kafka KIP-62 aims to offer more control over the meaning of
-         * various timeouts. If we do run into this case it could result in
-         * duplicates.
+         * Implementation note:
+         * Even if ConsumeKafka is not scheduled to poll for longer than session.timeout.ms (defaults to 10 secs)
+         * because downstream connection back-pressure is engaged, the Kafka consumer sends heartbeats from a background thread.
+         * If this situation lasts longer than max.poll.interval.ms (defaults to 5 min), the Kafka consumer sends a
+         * Leave Group request to the Group Coordinator. When the ConsumeKafka processor is scheduled again, the Kafka client
+         * checks whether this client instance is still a part of the consumer group. If not, it rejoins before polling messages.
+         * This behavior was fixed via Kafka KIP-62 and is available from Kafka client 0.10.1.0.
          */
         try {
             final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd92999d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index 3e01e51..ff01746 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -25,6 +25,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.KafkaException;
@@ -36,10 +38,12 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.logging.ComponentLog;
@@ -162,6 +166,10 @@ public class ConsumeKafka extends AbstractProcessor {
     private volatile ConsumerPool consumerPool = null;
     private final Set<ConsumerLease> activeLeases = Collections.synchronizedSet(new HashSet<>());
 
+    private int heartbeatCheckIntervalMillis;
+    private volatile long lastTriggeredTimestamp = -1L;
+    private volatile ScheduledExecutorService connectionRetainer;
+
     static {
         List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
@@ -275,8 +283,72 @@ public class ConsumeKafka extends AbstractProcessor {
         activeLeases.clear();
     }
 
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+
+        final PropertyValue heartbeatIntervalMsConfig = context.getProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+        if (heartbeatIntervalMsConfig != null && heartbeatIntervalMsConfig.isSet()) {
+            heartbeatCheckIntervalMillis = heartbeatIntervalMsConfig.asInteger();
+        } else {
+            // Derived from org.apache.kafka.clients.consumer.ConsumerConfig.
+            heartbeatCheckIntervalMillis = 3_000;
+        }
+
+        // Without this, it would remain -1 if downstream connections are full when this processor is first scheduled after a restart.
+        lastTriggeredTimestamp = System.currentTimeMillis();
+
+        // Stop the previous connectionRetainer if any, just in case; this shouldn't happen, though.
+        final ComponentLog logger = getLogger();
+        if (connectionRetainer != null) {
+            logger.warn("Connection retainer {} is still running, indicating something had happened.",
+                    new Object[]{connectionRetainer});
+            stopConnectionRetainer();
+        }
+        connectionRetainer = Executors.newSingleThreadScheduledExecutor();
+        connectionRetainer.scheduleAtFixedRate(() -> {
+            final long now = System.currentTimeMillis();
+            if (lastTriggeredTimestamp < 0
+                    || lastTriggeredTimestamp > now - heartbeatCheckIntervalMillis) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("No need to retain connection. Triggered at {}, {} millis ago.",
+                            new Object[]{lastTriggeredTimestamp, now - lastTriggeredTimestamp});
+                }
+                return;
+            }
+            try {
+                final ConsumerPool pool = getConsumerPool(context);
+                if (logger.isDebugEnabled()) {
+                    final ConsumerPool.PoolStats stats = pool.getPoolStats();
+                    logger.debug("Trying to retain connection. Obtained pool={}," +
+                                    " leaseObtainedCount={}, consumerCreatedCount={}, consumerClosedCount={}",
+                            new Object[]{pool, stats.leasesObtainedCount, stats.consumerCreatedCount, stats.consumerClosedCount});
+                }
+                pool.retainConsumers();
+            } catch (final Exception e) {
+                logger.warn("Failed to retain connection due to {}", new Object[]{e}, e);
+            }
+        }, heartbeatCheckIntervalMillis, heartbeatCheckIntervalMillis, TimeUnit.MILLISECONDS);
+    }
+
+    @OnUnscheduled
+    public void stopConnectionRetainer() {
+        if (connectionRetainer != null) {
+            final ComponentLog logger = getLogger();
+            logger.debug("Canceling connectionRetainer... {}", new Object[]{connectionRetainer});
+            try {
+                connectionRetainer.shutdownNow();
+            } catch (final Exception e) {
+                logger.warn("Failed to shutdown connection retainer {} due to {}", new Object[]{connectionRetainer, e}, e);
+            }
+            connectionRetainer = null;
+        }
+    }
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+        lastTriggeredTimestamp = System.currentTimeMillis();
+
         final ConsumerPool pool = getConsumerPool(context);
         if (pool == null) {
             context.yield();

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd92999d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index cd9365d..53e7a23 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -23,7 +23,9 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.xml.bind.DatatypeConverter;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -64,6 +66,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
     private long leaseStartNanos = -1;
     private boolean lastPollEmpty = false;
     private int totalFlowFiles = 0;
+    private ReentrantLock pollingLock = new ReentrantLock();
 
     ConsumerLease(
             final long maxWaitMillis,
@@ -139,7 +142,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
          * appears Kafka KIP-62 aims to offer more control over the meaning of
          * various timeouts. If we do run into this case it could result in
          * duplicates.
+         * This can be avoided by calling retainConnection() periodically.
          */
+        pollingLock.lock();
         try {
             final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
             lastPollEmpty = records.count() == 0;
@@ -147,6 +152,36 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
         } catch (final Throwable t) {
             this.poison();
             throw t;
+        } finally {
+            pollingLock.unlock();
+        }
+    }
+
+    /**
+     * Execute a poll using the pause API just to send a heartbeat, without polling messages.
+     */
+    void retainConnection() {
+        pollingLock.lock();
+        TopicPartition[] assignments = null;
+        try {
+            final Set<TopicPartition> assignmentSet = kafkaConsumer.assignment();
+            if (assignmentSet.isEmpty()) {
+                return;
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("Pausing " + assignmentSet);
+            }
+            assignments = assignmentSet.toArray(new TopicPartition[assignmentSet.size()]);
+            kafkaConsumer.pause(assignments);
+            kafkaConsumer.poll(0);
+            if (logger.isDebugEnabled()) {
+                logger.debug("Resuming " + assignments);
+            }
+        } finally {
+            if (assignments != null) {
+                kafkaConsumer.resume(assignments);
+            }
+            pollingLock.unlock();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd92999d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index e13a8c3..faa4643 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -22,6 +22,7 @@ import org.apache.nifi.logging.ComponentLog;
 
 import java.io.Closeable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -127,6 +128,11 @@ public class ConsumerPool implements Closeable {
         return lease;
     }
 
+    public void retainConsumers() {
+        Arrays.stream(pooledLeases.toArray(new ConsumerLease[]{}))
+                .forEach(ConsumerLease::retainConnection);
+    }
+
     /**
      * Exposed as protected method for easier unit testing
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd92999d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index f6febd2..ce10923 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -20,8 +20,10 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
@@ -29,6 +31,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import org.junit.Before;
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -164,4 +167,37 @@ public class ConsumeKafkaTest {
         verifyNoMoreInteractions(mockLease);
     }
 
+    @Test
+    public void validateConsumerRetainer() throws Exception {
+        final ConsumerPool consumerPool = mock(ConsumerPool.class);
+
+        final ConsumeKafka processor = new ConsumeKafka() {
+            @Override
+            protected ConsumerPool createConsumerPool(ProcessContext context, ComponentLog log) {
+                return consumerPool;
+            }
+        };
+
+        final ComponentLog logger = mock(ComponentLog.class);
+        final ProcessorInitializationContext initializationContext = mock(ProcessorInitializationContext.class);
+        when(initializationContext.getLogger()).thenReturn(logger);
+        processor.initialize(initializationContext);
+
+        final ProcessContext processContext = mock(ProcessContext.class);
+        final PropertyValue heartbeatInternalMsConfig = mock(PropertyValue.class);
+        when(heartbeatInternalMsConfig.isSet()).thenReturn(true);
+        when(heartbeatInternalMsConfig.asInteger()).thenReturn(100);
+        when(processContext.getProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).thenReturn(heartbeatInternalMsConfig);
+        processor.onScheduled(processContext);
+
+        // retainConsumers should be called at least once after the heartbeat interval has elapsed.
+        Thread.sleep(200);
+        verify(consumerPool, atLeast(1)).retainConsumers();
+
+        processor.stopConnectionRetainer();
+
+        // After stopping the connection retainer, it shouldn't interact with consumerPool.
+        Thread.sleep(200);
+        verifyNoMoreInteractions(consumerPool);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd92999d/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
index 99d8046..130609d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
@@ -25,7 +25,7 @@
     <properties>
       <kafka8.version>0.8.2.2</kafka8.version>
       <kafka9.version>0.9.0.1</kafka9.version>
-      <kafka10.version>0.10.0.1</kafka10.version>
+      <kafka10.version>0.10.2.0</kafka10.version>
     </properties>
 
     <modules>
