Repository: nifi
Updated Branches:
  refs/heads/master 52d97f966 -> 54549891e


NIFI-2444 NIFI-2445 fixed PublishKafka
- fixed the logging issue (NIFI-2444) by passing the processor's ComponentLog into KafkaPublisher through its constructor
- fixed KafkaPublisher's isAllAcked operation (NIFI-2445) so that it correctly reports when a flow file has failed
- added an additional test covering the resend-after-failure case
This closes #758.
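
For context on the NIFI-2445 part of the change, the sketch below (a
simplified, standalone class with assumed names, not the committed code)
illustrates the boundary case the new guard in isAllAcked() addresses: when
nothing has been acked, lastMessageAcked stays at -1, and the old expression
"messagesSent - 1 == lastMessageAcked" still evaluates to true when
messagesSent is 0, so a failed flow file could be reported as fully acked
instead of being routed to failure.

    // Simplified sketch of the old vs. new ack check (assumed names, not the committed class).
    class AckCheckSketch {

        static boolean oldIsAllAcked(int messagesSent, int lastMessageAcked) {
            // pre-fix check: also true when nothing was sent and nothing was acked
            return messagesSent - 1 == lastMessageAcked;
        }

        static boolean newIsAllAcked(int messagesSent, int lastMessageAcked) {
            // guard added by this commit: at least one message must have been acked
            return lastMessageAcked > -1 && messagesSent - 1 == lastMessageAcked;
        }

        public static void main(String[] args) {
            System.out.println(oldIsAllAcked(0, -1)); // true  -> failure masked as success
            System.out.println(newIsAllAcked(0, -1)); // false -> flow file reported as failed
            System.out.println(newIsAllAcked(3, 2));  // true  -> all three messages were acked
        }
    }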


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/54549891
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/54549891
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/54549891

Branch: refs/heads/master
Commit: 54549891e3c6fd6a3a625ec24facc0bd5de6d556
Parents: 52d97f9
Author: Oleg Zhurakousky <o...@suitcase.io>
Authored: Mon Aug 1 10:56:46 2016 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Tue Aug 2 20:30:34 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/kafka/KafkaPublisher.java   | 33 +++++------------
 .../apache/nifi/processors/kafka/PutKafka.java  |  3 +-
 .../processors/kafka/KafkaPublisherTest.java    | 12 ++++---
 .../processors/kafka/pubsub/KafkaPublisher.java | 34 +++++-------------
 .../processors/kafka/pubsub/PublishKafka.java   |  3 +-
 .../kafka/pubsub/KafkaPublisherTest.java        | 14 ++++----
 .../kafka/pubsub/PublishKafkaTest.java          | 32 ++++++++++++++++-
 .../kafka/pubsub/StubPublishKafka.java          | 38 ++++++++++++++++----
 8 files changed, 96 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/54549891/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
index 561f36b..5bc0e0e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
@@ -35,8 +35,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.stream.io.util.StreamDemarcator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import kafka.producer.Partitioner;
 
@@ -46,20 +44,18 @@ import kafka.producer.Partitioner;
  */
 class KafkaPublisher implements Closeable {
 
-    private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);
-
     private final Producer<byte[], byte[]> kafkaProducer;
 
     private long ackWaitTime = 30000;
 
-    private ComponentLog processLog;
+    private final ComponentLog componentLog;
 
     private final Partitioner partitioner;
 
     private final int ackCheckSize;
 
-    KafkaPublisher(Properties kafkaProperties) {
-        this(kafkaProperties, 100);
+    KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
+        this(kafkaProperties, 100, componentLog);
     }
 
     /**
@@ -71,7 +67,7 @@ class KafkaPublisher implements Closeable {
      *            instance of {@link Properties} used to bootstrap
      *            {@link KafkaProducer}
      */
-    KafkaPublisher(Properties kafkaProperties, int ackCheckSize) {
+    KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) {
         kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
@@ -85,6 +81,7 @@ class KafkaPublisher implements Closeable {
         } catch (Exception e) {
             throw new IllegalStateException("Failed to create partitioner", e);
         }
+        this.componentLog = componentLog;
     }
 
     /**
@@ -221,27 +218,13 @@ class KafkaPublisher implements Closeable {
     }
 
     /**
-     * Will set {@link ComponentLog} as an additional logger to forward log
-     * messages to NiFi bulletin
-     */
-    void setProcessLog(ComponentLog processLog) {
-        this.processLog = processLog;
-    }
-
-    /**
      *
      */
     private void warnOrError(String message, Exception e) {
         if (e == null) {
-            logger.warn(message);
-            if (this.processLog != null) {
-                this.processLog.warn(message);
-            }
+            this.componentLog.warn(message);
         } else {
-            logger.error(message, e);
-            if (this.processLog != null) {
-                this.processLog.error(message, e);
-            }
+            this.componentLog.error(message, e);
         }
     }
 
@@ -262,7 +245,7 @@ class KafkaPublisher implements Closeable {
         }
 
         public boolean isAllAcked() {
-            return this.messagesSent - 1 == this.lastMessageAcked;
+            return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/54549891/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index dc0889a..4dc8d18 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -388,8 +388,7 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
     @Override
     protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
             throws ProcessException {
-        KafkaPublisher kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context));
-        kafkaPublisher.setProcessLog(this.getLogger());
+        KafkaPublisher kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context), this.getLogger());
         return kafkaPublisher;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/54549891/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
index c4fc9a8..5bb7c3c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.kafka;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
@@ -29,6 +30,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult;
 import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
 import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
@@ -70,7 +72,7 @@ public class KafkaPublisherTest {
         String topicName = "validateSuccessfulSendAsWhole";
 
         Properties kafkaProperties = this.buildProducerProperties();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
 
         PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
         KafkaPublisherResult result = publisher.publish(publishingContext);
@@ -96,7 +98,7 @@ public class KafkaPublisherTest {
         String topicName = "validateSuccessfulSendAsDelimited";
 
         Properties kafkaProperties = this.buildProducerProperties();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
 
         PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
         publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
@@ -132,7 +134,7 @@ public class KafkaPublisherTest {
 
         Properties kafkaProperties = this.buildProducerProperties();
 
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
 
         // simulates the first re-try
         int lastAckedMessageIndex = 1;
@@ -179,7 +181,7 @@ public class KafkaPublisherTest {
 
         Properties kafkaProperties = this.buildProducerProperties();
 
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
 
         // simulates the first re-try
         int lastAckedMessageIndex = 3;
@@ -221,7 +223,7 @@ public class KafkaPublisherTest {
 
         Properties kafkaProperties = this.buildProducerProperties();
 
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
         PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
 
         publisher.publish(publishingContext);

http://git-wip-us.apache.org/repos/asf/nifi/blob/54549891/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
index 366efef..f684bfa 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
@@ -33,27 +33,22 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.stream.io.util.StreamDemarcator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor
  * with sending contents of the {@link FlowFile}s to Kafka.
  */
 class KafkaPublisher implements Closeable {
-
-    private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);
-
     private final Producer<byte[], byte[]> kafkaProducer;
 
     private volatile long ackWaitTime = 30000;
 
-    private volatile ComponentLog processLog;
+    private final ComponentLog componentLog;
 
     private final int ackCheckSize;
 
-    KafkaPublisher(Properties kafkaProperties) {
-        this(kafkaProperties, 100);
+    KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
+        this(kafkaProperties, 100, componentLog);
     }
 
     /**
@@ -65,9 +60,10 @@ class KafkaPublisher implements Closeable {
      *            instance of {@link Properties} used to bootstrap
      *            {@link KafkaProducer}
      */
-    KafkaPublisher(Properties kafkaProperties, int ackCheckSize) {
+    KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) {
         this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
         this.ackCheckSize = ackCheckSize;
+        this.componentLog = componentLog;
     }
 
     /**
@@ -200,27 +196,13 @@ class KafkaPublisher implements Closeable {
     }
 
     /**
-     * Will set {@link ComponentLog} as an additional logger to forward log
-     * messages to NiFi bulletin
-     */
-    void setProcessLog(ComponentLog processLog) {
-        this.processLog = processLog;
-    }
-
-    /**
      *
      */
     private void warnOrError(String message, Exception e) {
         if (e == null) {
-            logger.warn(message);
-            if (this.processLog != null) {
-                this.processLog.warn(message);
-            }
+            this.componentLog.warn(message);
         } else {
-            logger.error(message, e);
-            if (this.processLog != null) {
-                this.processLog.error(message, e);
-            }
+            this.componentLog.error(message, e);
         }
     }
 
@@ -244,7 +226,7 @@ class KafkaPublisher implements Closeable {
         }
 
         public boolean isAllAcked() {
-            return this.messagesSent - 1 == this.lastMessageAcked;
+            return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/54549891/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
index 6235f0b..6703c04 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
@@ -228,7 +228,8 @@ public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> {
         kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
-        return new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, this.getLogger());
+        return publisher;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/54549891/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
index 2c45d37..6b8b042 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
@@ -32,6 +33,7 @@ import java.util.Properties;
 import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
 import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
 import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
@@ -71,7 +73,7 @@ public class KafkaPublisherTest {
         String topicName = "validateSuccessfulSendAsWhole";
 
         Properties kafkaProperties = this.buildProducerProperties();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
 
         PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
         KafkaPublisherResult result = publisher.publish(publishingContext);
@@ -97,7 +99,7 @@ public class KafkaPublisherTest {
         String topicName = "validateSuccessfulSendAsDelimited";
 
         Properties kafkaProperties = this.buildProducerProperties();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
 
         PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
         publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
@@ -133,7 +135,7 @@ public class KafkaPublisherTest {
 
         Properties kafkaProperties = this.buildProducerProperties();
 
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
 
         // simulates the first re-try
         int lastAckedMessageIndex = 1;
@@ -180,7 +182,7 @@ public class KafkaPublisherTest {
 
         Properties kafkaProperties = this.buildProducerProperties();
 
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
 
         // simulates the first re-try
         int lastAckedMessageIndex = 3;
@@ -221,7 +223,7 @@ public class KafkaPublisherTest {
 
         Properties kafkaProperties = this.buildProducerProperties();
 
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
         PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
 
         publisher.publish(publishingContext);
@@ -240,7 +242,7 @@ public class KafkaPublisherTest {
 
         Properties kafkaProperties = this.buildProducerProperties();
         kafkaProperties.setProperty("partitioner.class", 
TestPartitioner.class.getName());
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
         PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
         
publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8));
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/54549891/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
index af550b4..be97578 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -153,7 +153,7 @@ public class PublishKafkaTest {
         runner.setProperty(PublishKafka.KEY, "key1");
         runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
         runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
+        runner.setProperty(PublishKafka.META_WAIT_TIME, "3000 millis");
 
         final String text = "Hello World\nGoodbye\nfail\n2";
         runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
@@ -164,6 +164,7 @@ public class PublishKafkaTest {
         Producer<byte[], byte[]> producer = putKafka.getProducer();
         verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
         runner.shutdown();
+        putKafka.destroy();
     }
 
     @SuppressWarnings("unchecked")
@@ -193,6 +194,35 @@ public class PublishKafkaTest {
 
     @SuppressWarnings("unchecked")
     @Test
+    public void validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws Exception {
+        String topicName = "validateSendFailureAndThenResendSuccess";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.KEY, "key1");
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
+        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
+
+        final String text = "futurefail\nHello World\nGoodbye\n2";
+        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0);
+        assertNotNull(ff);
+        runner.enqueue(ff);
+
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        // 5 sends due to duplication of the failed first message
+        verify(producer, times(5)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
     public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception {
         String topicName = "validateSendFailureAndThenResendSuccess";
         StubPublishKafka putKafka = new StubPublishKafka(100);

http://git-wip-us.apache.org/repos/asf/nifi/blob/54549891/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
index 2236f30..3189356 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
@@ -22,14 +22,19 @@ import static org.mockito.Mockito.when;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -45,6 +50,8 @@ public class StubPublishKafka extends PublishKafka {
 
     private final int ackCheckSize;
 
+    private final ExecutorService executor = Executors.newCachedThreadPool();
+
     StubPublishKafka(int ackCheckSize) {
         this.ackCheckSize = ackCheckSize;
     }
@@ -53,6 +60,10 @@ public class StubPublishKafka extends PublishKafka {
         return producer;
     }
 
+    public void destroy() {
+        this.executor.shutdownNow();
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
@@ -66,12 +77,17 @@ public class StubPublishKafka extends PublishKafka {
             f.setAccessible(true);
             f.set(this, context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue());
             publisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class);
+            publisher.setAckWaitTime(15000);
             producer = mock(Producer.class);
             this.instrumentProducer(producer, false);
             Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
             kf.setAccessible(true);
             kf.set(publisher, producer);
 
+            Field componentLogF = KafkaPublisher.class.getDeclaredField("componentLog");
+            componentLogF.setAccessible(true);
+            componentLogF.set(publisher, mock(ComponentLog.class));
+
             Field ackCheckSizeField = KafkaPublisher.class.getDeclaredField("ackCheckSize");
             ackCheckSizeField.setAccessible(true);
             ackCheckSizeField.set(publisher, this.ackCheckSize);
@@ -84,8 +100,8 @@ public class StubPublishKafka extends PublishKafka {
 
     @SuppressWarnings("unchecked")
     private void instrumentProducer(Producer<byte[], byte[]> producer, boolean failRandomly) {
+
         when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() {
-            @SuppressWarnings("rawtypes")
             @Override
             public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
                 ProducerRecord<byte[], byte[]> record = (ProducerRecord<byte[], byte[]>) invocation.getArguments()[0];
@@ -94,11 +110,19 @@ public class StubPublishKafka extends PublishKafka {
                     StubPublishKafka.this.failed = true;
                     throw new RuntimeException("intentional");
                 }
-                Future future = mock(Future.class);
-                if ("futurefail".equals(value) && 
!StubPublishKafka.this.failed) {
-                    StubPublishKafka.this.failed = true;
-                    when(future.get(Mockito.anyLong(), 
Mockito.any())).thenThrow(ExecutionException.class);
-                }
+                Future<RecordMetadata> future = executor.submit(new 
Callable<RecordMetadata>() {
+                    @Override
+                    public RecordMetadata call() throws Exception {
+                        if ("futurefail".equals(value) && 
!StubPublishKafka.this.failed) {
+                            StubPublishKafka.this.failed = true;
+                            throw new 
TopicAuthorizationException("Unauthorized");
+                        } else {
+                            TopicPartition partition = new 
TopicPartition("foo", 0);
+                            RecordMetadata meta = new 
RecordMetadata(partition, 0, 0);
+                            return meta;
+                        }
+                    }
+                });
                 return future;
             }
         });
