NIFI-2444-0.x addressed PR comments

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7a9cbe13
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7a9cbe13
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7a9cbe13

Branch: refs/heads/0.x
Commit: 7a9cbe13af348b07e84ff61b29f711848561680d
Parents: 7040b4b
Author: Oleg Zhurakousky <o...@suitcase.io>
Authored: Tue Aug 2 18:27:55 2016 -0400
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Tue Aug 2 18:46:36 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/kafka/KafkaPublisher.java   | 26 ++++++--------------
 .../apache/nifi/processors/kafka/PutKafka.java  |  3 +--
 .../processors/kafka/KafkaPublisherTest.java    | 12 +++++----
 .../processors/kafka/pubsub/KafkaPublisher.java | 26 ++++++--------------
 .../processors/kafka/pubsub/PublishKafka.java   |  3 +--
 .../kafka/pubsub/KafkaPublisherTest.java        | 14 ++++++-----
 .../kafka/pubsub/StubPublishKafka.java          |  6 ++++-
 7 files changed, 36 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7a9cbe13/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
index 2934799..b83edde 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
@@ -34,7 +34,6 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.stream.io.util.StreamDemarcator;
 
 import kafka.producer.Partitioner;
@@ -49,14 +48,14 @@ class KafkaPublisher implements Closeable {
 
     private long ackWaitTime = 30000;
 
-    private ProcessorLog processLog;
+    private final ComponentLog componentLog;
 
     private final Partitioner partitioner;
 
     private final int ackCheckSize;
 
-    KafkaPublisher(Properties kafkaProperties) {
-        this(kafkaProperties, 100);
+    KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
+        this(kafkaProperties, 100, componentLog);
     }
 
     /**
@@ -68,7 +67,7 @@ class KafkaPublisher implements Closeable {
      *            instance of {@link Properties} used to bootstrap
      *            {@link KafkaProducer}
      */
-    KafkaPublisher(Properties kafkaProperties, int ackCheckSize) {
+    KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog 
componentLog) {
         kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
@@ -82,6 +81,7 @@ class KafkaPublisher implements Closeable {
         } catch (Exception e) {
             throw new IllegalStateException("Failed to create partitioner", e);
         }
+        this.componentLog = componentLog;
     }
 
     /**
@@ -218,25 +218,13 @@ class KafkaPublisher implements Closeable {
     }
 
     /**
-     * Will set {@link ProcessorLog} as an additional logger to forward log
-     * messages to NiFi bulletin
-     */
-    void setProcessLog(ProcessorLog processLog) {
-        this.processLog = processLog;
-    }
-
-    /**
      *
      */
     private void warnOrError(String message, Exception e) {
         if (e == null) {
-            if (this.processLog != null) {
-                this.processLog.warn(message);
-            }
+            this.componentLog.warn(message);
         } else {
-            if (this.processLog != null) {
-                this.processLog.error(message, e);
-            }
+            this.componentLog.error(message);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a9cbe13/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 3f6aec4..12a9b89 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -390,8 +390,7 @@ public class PutKafka extends 
AbstractKafkaProcessor<KafkaPublisher> {
     @Override
     protected KafkaPublisher buildKafkaResource(ProcessContext context, 
ProcessSession session)
             throws ProcessException {
-        KafkaPublisher kafkaPublisher = new 
KafkaPublisher(this.buildKafkaConfigProperties(context));
-        kafkaPublisher.setProcessLog(this.getLogger());
+        KafkaPublisher kafkaPublisher = new 
KafkaPublisher(this.buildKafkaConfigProperties(context), this.getLogger());
         return kafkaPublisher;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a9cbe13/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
index c4fc9a8..5bb7c3c 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.kafka;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
@@ -29,6 +30,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult;
 import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
 import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
@@ -70,7 +72,7 @@ public class KafkaPublisherTest {
         String topicName = "validateSuccessfulSendAsWhole";
 
         Properties kafkaProperties = this.buildProducerProperties();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
 
         PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
         KafkaPublisherResult result = publisher.publish(publishingContext);
@@ -96,7 +98,7 @@ public class KafkaPublisherTest {
         String topicName = "validateSuccessfulSendAsDelimited";
 
         Properties kafkaProperties = this.buildProducerProperties();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
 
         PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
         
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
@@ -132,7 +134,7 @@ public class KafkaPublisherTest {
 
         Properties kafkaProperties = this.buildProducerProperties();
 
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
 
         // simulates the first re-try
         int lastAckedMessageIndex = 1;
@@ -179,7 +181,7 @@ public class KafkaPublisherTest {
 
         Properties kafkaProperties = this.buildProducerProperties();
 
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
 
         // simulates the first re-try
         int lastAckedMessageIndex = 3;
@@ -221,7 +223,7 @@ public class KafkaPublisherTest {
 
         Properties kafkaProperties = this.buildProducerProperties();
 
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
         PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
 
         publisher.publish(publishingContext);

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a9cbe13/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
index 3c9d278..0ba381a 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
@@ -32,7 +32,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.stream.io.util.StreamDemarcator;
 
 /**
@@ -44,12 +43,12 @@ class KafkaPublisher implements Closeable {
 
     private volatile long ackWaitTime = 30000;
 
-    private volatile ComponentLog processLog;
+    private final ComponentLog componentLog;
 
     private final int ackCheckSize;
 
-    KafkaPublisher(Properties kafkaProperties) {
-        this(kafkaProperties, 100);
+    KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
+        this(kafkaProperties, 100, componentLog);
     }
 
     /**
@@ -61,9 +60,10 @@ class KafkaPublisher implements Closeable {
      *            instance of {@link Properties} used to bootstrap
      *            {@link KafkaProducer}
      */
-    KafkaPublisher(Properties kafkaProperties, int ackCheckSize) {
+    KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog 
componentLog) {
         this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
         this.ackCheckSize = ackCheckSize;
+        this.componentLog = componentLog;
     }
 
     /**
@@ -196,25 +196,13 @@ class KafkaPublisher implements Closeable {
     }
 
     /**
-     * Will set {@link ProcessorLog} as an additional logger to forward log
-     * messages to NiFi bulletin
-     */
-    void setProcessLog(ProcessorLog processLog) {
-        this.processLog = processLog;
-    }
-
-    /**
      *
      */
     private void warnOrError(String message, Exception e) {
         if (e == null) {
-            if (this.processLog != null) {
-                this.processLog.warn(message);
-            }
+            this.componentLog.warn(message);
         } else {
-            if (this.processLog != null) {
-                this.processLog.error(message, e);
-            }
+            this.componentLog.error(message);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a9cbe13/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
index ddd6d3f..6703c04 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
@@ -228,8 +228,7 @@ public class PublishKafka extends 
AbstractKafkaProcessor<KafkaPublisher> {
         kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         this.brokers = 
context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
-        publisher.setProcessLog(this.getLogger());
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
this.getLogger());
         return publisher;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a9cbe13/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
index 2c45d37..6b8b042 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
@@ -32,6 +33,7 @@ import java.util.Properties;
 import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.logging.ComponentLog;
 import 
org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
 import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
 import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
@@ -71,7 +73,7 @@ public class KafkaPublisherTest {
         String topicName = "validateSuccessfulSendAsWhole";
 
         Properties kafkaProperties = this.buildProducerProperties();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
 
         PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
         KafkaPublisherResult result = publisher.publish(publishingContext);
@@ -97,7 +99,7 @@ public class KafkaPublisherTest {
         String topicName = "validateSuccessfulSendAsDelimited";
 
         Properties kafkaProperties = this.buildProducerProperties();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
 
         PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
         
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
@@ -133,7 +135,7 @@ public class KafkaPublisherTest {
 
         Properties kafkaProperties = this.buildProducerProperties();
 
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
 
         // simulates the first re-try
         int lastAckedMessageIndex = 1;
@@ -180,7 +182,7 @@ public class KafkaPublisherTest {
 
         Properties kafkaProperties = this.buildProducerProperties();
 
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
 
         // simulates the first re-try
         int lastAckedMessageIndex = 3;
@@ -221,7 +223,7 @@ public class KafkaPublisherTest {
 
         Properties kafkaProperties = this.buildProducerProperties();
 
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
         PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
 
         publisher.publish(publishingContext);
@@ -240,7 +242,7 @@ public class KafkaPublisherTest {
 
         Properties kafkaProperties = this.buildProducerProperties();
         kafkaProperties.setProperty("partitioner.class", 
TestPartitioner.class.getName());
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
         PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
         
publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8));
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a9cbe13/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
index 0893614..780ea4a 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
@@ -34,6 +34,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -83,6 +84,10 @@ public class StubPublishKafka extends PublishKafka {
             kf.setAccessible(true);
             kf.set(publisher, producer);
 
+            Field componentLogF = 
KafkaPublisher.class.getDeclaredField("componentLog");
+            componentLogF.setAccessible(true);
+            componentLogF.set(publisher, mock(ComponentLog.class));
+
             Field ackCheckSizeField = 
KafkaPublisher.class.getDeclaredField("ackCheckSize");
             ackCheckSizeField.setAccessible(true);
             ackCheckSizeField.set(publisher, this.ackCheckSize);
@@ -110,7 +115,6 @@ public class StubPublishKafka extends PublishKafka {
                     @Override
                     public RecordMetadata call() throws Exception {
                         if ("futurefail".equals(value) && 
!StubPublishKafka.this.failed) {
-                            // System.out.println("FAIL");
                             StubPublishKafka.this.failed = true;
                             throw new 
TopicAuthorizationException("Unauthorized");
                         } else {

Reply via email to