NIFI-2444-0.x addressed PR comments
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7a9cbe13 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7a9cbe13 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7a9cbe13 Branch: refs/heads/0.x Commit: 7a9cbe13af348b07e84ff61b29f711848561680d Parents: 7040b4b Author: Oleg Zhurakousky <o...@suitcase.io> Authored: Tue Aug 2 18:27:55 2016 -0400 Committer: Oleg Zhurakousky <o...@suitcase.io> Committed: Tue Aug 2 18:46:36 2016 -0400 ---------------------------------------------------------------------- .../nifi/processors/kafka/KafkaPublisher.java | 26 ++++++-------------- .../apache/nifi/processors/kafka/PutKafka.java | 3 +-- .../processors/kafka/KafkaPublisherTest.java | 12 +++++---- .../processors/kafka/pubsub/KafkaPublisher.java | 26 ++++++-------------- .../processors/kafka/pubsub/PublishKafka.java | 3 +-- .../kafka/pubsub/KafkaPublisherTest.java | 14 ++++++----- .../kafka/pubsub/StubPublishKafka.java | 6 ++++- 7 files changed, 36 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/7a9cbe13/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java index 2934799..b83edde 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java @@ -34,7 +34,6 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.stream.io.util.StreamDemarcator; import kafka.producer.Partitioner; @@ -49,14 +48,14 @@ class KafkaPublisher implements Closeable { private long ackWaitTime = 30000; - private ProcessorLog processLog; + private final ComponentLog componentLog; private final Partitioner partitioner; private final int ackCheckSize; - KafkaPublisher(Properties kafkaProperties) { - this(kafkaProperties, 100); + KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) { + this(kafkaProperties, 100, componentLog); } /** @@ -68,7 +67,7 @@ class KafkaPublisher implements Closeable { * instance of {@link Properties} used to bootstrap * {@link KafkaProducer} */ - KafkaPublisher(Properties kafkaProperties, int ackCheckSize) { + KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) { kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); this.kafkaProducer = new KafkaProducer<>(kafkaProperties); @@ -82,6 +81,7 @@ class KafkaPublisher implements Closeable { } catch (Exception e) { throw new IllegalStateException("Failed to create partitioner", e); } + this.componentLog = componentLog; } /** @@ -218,25 +218,13 @@ class KafkaPublisher implements Closeable { } /** - * Will set {@link ProcessorLog} as an additional logger to forward log - * messages to NiFi bulletin - */ - void setProcessLog(ProcessorLog processLog) { - this.processLog = processLog; - } - - /** * */ private void warnOrError(String message, Exception e) { if (e == null) { - if (this.processLog != null) { - this.processLog.warn(message); - } + this.componentLog.warn(message); } else { - if (this.processLog != null) { - this.processLog.error(message, e); - } + this.componentLog.error(message); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/7a9cbe13/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 3f6aec4..12a9b89 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -390,8 +390,7 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { @Override protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException { - KafkaPublisher kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context)); - kafkaPublisher.setProcessLog(this.getLogger()); + KafkaPublisher kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context), this.getLogger()); return kafkaPublisher; } http://git-wip-us.apache.org/repos/asf/nifi/blob/7a9cbe13/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java index c4fc9a8..5bb7c3c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.kafka; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -29,6 +30,7 @@ import java.util.Map; import java.util.Properties; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult; import org.apache.nifi.processors.kafka.test.EmbeddedKafka; import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper; @@ -70,7 +72,7 @@ public class KafkaPublisherTest { String topicName = "validateSuccessfulSendAsWhole"; Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); KafkaPublisherResult result = publisher.publish(publishingContext); @@ -96,7 +98,7 @@ public class KafkaPublisherTest { String topicName = "validateSuccessfulSendAsDelimited"; Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); @@ -132,7 +134,7 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); // simulates the first re-try int lastAckedMessageIndex = 1; @@ -179,7 +181,7 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); // simulates the first re-try int lastAckedMessageIndex = 3; @@ -221,7 +223,7 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); publisher.publish(publishingContext); http://git-wip-us.apache.org/repos/asf/nifi/blob/7a9cbe13/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java index 3c9d278..0ba381a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java @@ -32,7 +32,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.stream.io.util.StreamDemarcator; /** @@ -44,12 +43,12 @@ class KafkaPublisher implements Closeable { private volatile long ackWaitTime = 30000; - private volatile ComponentLog processLog; + private final ComponentLog componentLog; private final int ackCheckSize; - KafkaPublisher(Properties kafkaProperties) { - this(kafkaProperties, 100); + KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) { + this(kafkaProperties, 100, componentLog); } /** @@ -61,9 +60,10 @@ class KafkaPublisher implements Closeable { * instance of {@link Properties} used to bootstrap * {@link KafkaProducer} */ - KafkaPublisher(Properties kafkaProperties, int ackCheckSize) { + KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) { this.kafkaProducer = new KafkaProducer<>(kafkaProperties); this.ackCheckSize = ackCheckSize; + this.componentLog = componentLog; } /** @@ -196,25 +196,13 @@ class KafkaPublisher implements Closeable { } /** - * Will set {@link ProcessorLog} as an additional logger to forward log - * messages to NiFi bulletin - */ - void setProcessLog(ProcessorLog processLog) { - this.processLog = processLog; - } - - /** * */ private void warnOrError(String message, Exception e) { if (e == null) { - if (this.processLog != null) { - this.processLog.warn(message); - } + this.componentLog.warn(message); } else { - if (this.processLog != null) { - this.processLog.error(message, e); - } + this.componentLog.error(message); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/7a9cbe13/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java index ddd6d3f..6703c04 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java @@ -228,8 +228,7 @@ public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> { kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); - publisher.setProcessLog(this.getLogger()); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, this.getLogger()); return publisher; } http://git-wip-us.apache.org/repos/asf/nifi/blob/7a9cbe13/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java index 2c45d37..6b8b042 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -32,6 +33,7 @@ import java.util.Properties; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult; import org.apache.nifi.processors.kafka.test.EmbeddedKafka; import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper; @@ -71,7 +73,7 @@ public class KafkaPublisherTest { String topicName = "validateSuccessfulSendAsWhole"; Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); KafkaPublisherResult result = publisher.publish(publishingContext); @@ -97,7 +99,7 @@ public class KafkaPublisherTest { String topicName = "validateSuccessfulSendAsDelimited"; Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); @@ -133,7 +135,7 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); // simulates the first re-try int lastAckedMessageIndex = 1; @@ -180,7 +182,7 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); // simulates the first re-try int lastAckedMessageIndex = 3; @@ -221,7 +223,7 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); publisher.publish(publishingContext); @@ -240,7 +242,7 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); kafkaProperties.setProperty("partitioner.class", TestPartitioner.class.getName()); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8)); http://git-wip-us.apache.org/repos/asf/nifi/blob/7a9cbe13/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java index 0893614..780ea4a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java @@ -34,6 +34,7 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; @@ -83,6 +84,10 @@ public class StubPublishKafka extends PublishKafka { kf.setAccessible(true); kf.set(publisher, producer); + Field componentLogF = KafkaPublisher.class.getDeclaredField("componentLog"); + componentLogF.setAccessible(true); + componentLogF.set(publisher, mock(ComponentLog.class)); + Field ackCheckSizeField = KafkaPublisher.class.getDeclaredField("ackCheckSize"); ackCheckSizeField.setAccessible(true); ackCheckSizeField.set(publisher, this.ackCheckSize); @@ -110,7 +115,6 @@ public class StubPublishKafka extends PublishKafka { @Override public RecordMetadata call() throws Exception { if ("futurefail".equals(value) && !StubPublishKafka.this.failed) { - // System.out.println("FAIL"); StubPublishKafka.this.failed = true; throw new TopicAuthorizationException("Unauthorized"); } else {