NIFI-413: Formatted code to fix checkstyle failures

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/f2f90560
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/f2f90560
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/f2f90560

Branch: refs/heads/NIFI-680
Commit: f2f90560557de70ef4404672ce53c4593995a5f1
Parents: f5226ad
Author: Mark Payne <marka...@hotmail.com>
Authored: Mon Jun 22 12:05:02 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Mon Jun 22 12:05:02 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/PutKafka.java  | 319 ++++++++++---------
 .../nifi/processors/kafka/TestPutKafka.java     |  79 ++---
 2 files changed, 203 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
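Judging from the diff below, the checkstyle rules being satisfied are mechanical ones: wrapped builder chains drop from a 12-space to an 8-space continuation indent, array initializers gain spaces inside the braces, tabs and trailing whitespace are removed, and local variables become final. A minimal before/after sketch of the convention (the FOO descriptor is a hypothetical example, not code from this commit):

    // Before (fails checkstyle): 12-space continuation indent, unspaced braces
    @Tags({"Example"})
    public static final PropertyDescriptor FOO = new PropertyDescriptor.Builder()
            .name("Foo")
            .build();

    // After: 8-space continuation indent, spaced array-initializer braces
    @Tags({ "Example" })
    public static final PropertyDescriptor FOO = new PropertyDescriptor.Builder()
        .name("Foo")
        .build();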


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f2f90560/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index e572622..d83c7bf 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -61,7 +61,7 @@ import org.apache.nifi.util.LongHolder;
 import scala.actors.threadpool.Arrays;
 
 @SupportsBatching
-@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
+@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
 @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka")
 public class PutKafka extends AbstractProcessor {
 
@@ -69,13 +69,13 @@ public class PutKafka extends AbstractProcessor {
     private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
 
     public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to"
-            + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
+        + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
     public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed"
-            + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than"
-            + " <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
+        + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than"
+        + " <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
     public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after"
-            + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result"
-            + " in data loss.");
+        + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result"
+        + " in data loss.");
 
     /**
      * AllowableValue for a Producer Type that synchronously sends messages to Kafka
@@ -86,7 +86,7 @@ public class PutKafka extends AbstractProcessor {
      * AllowableValue for a Producer Type that asynchronously sends messages to Kafka
      */
     public static final AllowableValue PRODUCTER_TYPE_ASYNCHRONOUS = new AllowableValue("async", "Asynchronous", "Batch messages before sending them to Kafka."
-           + " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data.");
+        + " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data.");
 
     /**
      * AllowableValue for sending messages to Kafka without compression
@@ -103,150 +103,156 @@ public class PutKafka extends AbstractProcessor {
      */
     public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy");
 
+
     public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder()
-            .name("Known Brokers")
-            .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
-            .required(true)
-            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
-            .expressionLanguageSupported(false)
-            .build();
+        .name("Known Brokers")
+        .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
+        .required(true)
+        .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
+        .expressionLanguageSupported(false)
+        .build();
     public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
-            .name("Topic Name")
-            .description("The Kafka Topic of interest")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Topic Name")
+        .description("The Kafka Topic of interest")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
-            .name("Kafka Key")
-            .description("The Key to use for the Message")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Kafka Key")
+        .description("The Key to use for the Message")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
-            .name("Delivery Guarantee")
-            .description("Specifies the requirement for guaranteeing that a message is sent to Kafka")
-            .required(true)
-            .expressionLanguageSupported(false)
-            .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
-            .defaultValue(DELIVERY_BEST_EFFORT.getValue())
-            .build();
+        .name("Delivery Guarantee")
+        .description("Specifies the requirement for guaranteeing that a message is sent to Kafka")
+        .required(true)
+        .expressionLanguageSupported(false)
+        .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
+        .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+        .build();
     public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
-            .name("Message Delimiter")
-            .description("Specifies the delimiter to use for splitting apart 
multiple messages within a single FlowFile. "
-                    + "If not specified, the entire content of the FlowFile 
will be used as a single message. "
-                    + "If specified, the contents of the FlowFile will be 
split on this delimiter and each section "
-                    + "sent as a separate Kafka message.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Message Delimiter")
+        .description("Specifies the delimiter to use for splitting apart 
multiple messages within a single FlowFile. "
+            + "If not specified, the entire content of the FlowFile will be 
used as a single message. "
+            + "If specified, the contents of the FlowFile will be split on 
this delimiter and each section "
+            + "sent as a separate Kafka message.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor MAX_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Max Buffer Size")
-            .description("The maximum amount of data to buffer in memory 
before sending to Kafka")
-            .required(true)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .defaultValue("1 MB")
-            .build();
+        .name("Max Buffer Size")
+        .description("The maximum amount of data to buffer in memory before 
sending to Kafka")
+        .required(true)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("1 MB")
+        .build();
     public static final PropertyDescriptor TIMEOUT = new 
PropertyDescriptor.Builder()
-            .name("Communications Timeout")
-            .description("The amount of time to wait for a response from Kafka 
before determining that there is a communications error")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .defaultValue("30 secs")
-            .build();
+        .name("Communications Timeout")
+        .description("The amount of time to wait for a response from Kafka 
before determining that there is a communications error")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("30 secs")
+        .build();
     public static final PropertyDescriptor CLIENT_NAME = new 
PropertyDescriptor.Builder()
-            .name("Client Name")
-            .description("Client Name to use when communicating with Kafka")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .build();
+        .name("Client Name")
+        .description("Client Name to use when communicating with Kafka")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
     public static final PropertyDescriptor PRODUCER_TYPE = new 
PropertyDescriptor.Builder()
-            .name("Producer Type")
-            .description("This parameter specifies whether the messages are 
sent asynchronously in a background thread.")
-            .required(true)
-            .allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, 
PRODUCTER_TYPE_ASYNCHRONOUS)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue())
-            .build();
-        public static final PropertyDescriptor BATCH_NUM_MESSAGES = new 
PropertyDescriptor.Builder()
-            .name("Async Batch Size")
-            .description("Used only if Producer Type is set to \"" + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
-                       + " The number of messages to send in one batch when 
using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode."
-                    + " The producer will wait until either this number of 
messages are ready"
-                    + " to send or \"Queue Buffering Max Time\" is reached.")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .defaultValue("200").build();
-        public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new 
PropertyDescriptor.Builder()
-            .name("Queue Buffering Max Time")
-            .description("Used only if Producer Type is set to \"" + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
-                       + " Maximum time to buffer data when using " + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 
100 ms"
-                    + " will try to batch together 100ms of messages to send 
at once. This will improve"
-                    + " throughput but adds message delivery latency due to 
the buffering.")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .defaultValue("5 secs").build();
-        public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = 
new PropertyDescriptor.Builder()
-            .name("Queue Buffer Max Count")
-            .description("Used only if Producer Type is set to \"" + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
-                       + " The maximum number of unsent messages that can be 
queued up in the producer when"
-                    + " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() 
+ " mode before either the producer must be blocked or data must be dropped.")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .defaultValue("10000").build();
-        public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new 
PropertyDescriptor.Builder()
-            .name("Queue Enqueue Timeout")
-            .description("Used only if Producer Type is set to \"" + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
-                       + " The amount of time to block before dropping 
messages when running in "
-                       + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode"
-                    + " and the buffer has reached the \"Queue Buffer Max 
Count\". If set to 0, events will"
-                    + " be enqueued immediately or dropped if the queue is 
full (the producer send call will"
-                    + " never block). If not set, the producer will block 
indefinitely and never willingly"
-                    + " drop a send.")
-            .required(false)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .build();
-        public static final PropertyDescriptor COMPRESSION_CODEC = new 
PropertyDescriptor.Builder()
-            .name("Compression Codec")
-            .description("This parameter allows you to specify the compression 
codec for all"
-                    + " data generated by this producer.")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, 
COMPRESSION_CODEC_SNAPPY)
-            .defaultValue(COMPRESSION_CODEC_NONE.getValue()).build();
-        public static final PropertyDescriptor COMPRESSED_TOPICS = new 
PropertyDescriptor.Builder()
-            .name("Compressed Topics")
-            .description("This parameter allows you to set whether compression 
should be turned on"
-                    + " for particular topics. If the compression codec is 
anything other than"
-                    + " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", 
enable compression only for specified topics if any."
-                    + " If the list of compressed topics is empty, then enable 
the specified"
-                    + " compression codec for all topics. If the compression 
codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + ","
-                    + " compression is disabled for all topics")
-            .required(false).build();
+        .name("Producer Type")
+        .description("This parameter specifies whether the messages are sent 
asynchronously in a background thread.")
+        .required(true)
+        .allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, 
PRODUCTER_TYPE_ASYNCHRONOUS)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue())
+        .build();
+    public static final PropertyDescriptor BATCH_NUM_MESSAGES = new 
PropertyDescriptor.Builder()
+        .name("Async Batch Size")
+        .description("Used only if Producer Type is set to \"" + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+            + " The number of messages to send in one batch when using " + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode."
+            + " The producer will wait until either this number of messages 
are ready"
+            + " to send or \"Queue Buffering Max Time\" is reached.")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("200")
+        .build();
+    public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new 
PropertyDescriptor.Builder()
+        .name("Queue Buffering Max Time")
+        .description("Used only if Producer Type is set to \"" + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+            + " Maximum time to buffer data when using " + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 
100 ms"
+            + " will try to batch together 100ms of messages to send at once. 
This will improve"
+            + " throughput but adds message delivery latency due to the 
buffering.")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .defaultValue("5 secs")
+        .build();
+    public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new 
PropertyDescriptor.Builder()
+        .name("Queue Buffer Max Count")
+        .description("Used only if Producer Type is set to \"" + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+            + " The maximum number of unsent messages that can be queued up in 
the producer when"
+            + " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " 
mode before either the producer must be blocked or data must be dropped.")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("10000")
+        .build();
+    public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new 
PropertyDescriptor.Builder()
+        .name("Queue Enqueue Timeout")
+        .description("Used only if Producer Type is set to \"" + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+            + " The amount of time to block before dropping messages when 
running in "
+            + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode"
+            + " and the buffer has reached the \"Queue Buffer Max Count\". If 
set to 0, events will"
+            + " be enqueued immediately or dropped if the queue is full (the 
producer send call will"
+            + " never block). If not set, the producer will block indefinitely 
and never willingly"
+            + " drop a send.")
+        .required(false)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .build();
+    public static final PropertyDescriptor COMPRESSION_CODEC = new 
PropertyDescriptor.Builder()
+        .name("Compression Codec")
+        .description("This parameter allows you to specify the compression 
codec for all"
+            + " data generated by this producer.")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, 
COMPRESSION_CODEC_SNAPPY)
+        .defaultValue(COMPRESSION_CODEC_NONE.getValue())
+        .build();
+    public static final PropertyDescriptor COMPRESSED_TOPICS = new 
PropertyDescriptor.Builder()
+        .name("Compressed Topics")
+        .description("This parameter allows you to set whether compression 
should be turned on"
+            + " for particular topics. If the compression codec is anything 
other than"
+            + " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable 
compression only for specified topics if any."
+            + " If the list of compressed topics is empty, then enable the 
specified"
+            + " compression codec for all topics. If the compression codec is 
" + COMPRESSION_CODEC_NONE.getDisplayName() + ","
+            + " compression is disabled for all topics")
+        .required(false)
+        .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship")
-            .build();
+        .name("success")
+        .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship")
+        .build();
     public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
-            .build();
+        .name("failure")
+        .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
+        .build();
 
     private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final PropertyDescriptor clientName = new PropertyDescriptor.Builder()
-                .fromPropertyDescriptor(CLIENT_NAME)
-                .defaultValue("NiFi-" + getIdentifier())
-                .build();
+            .fromPropertyDescriptor(CLIENT_NAME)
+            .defaultValue("NiFi-" + getIdentifier())
+            .build();
 
         final List<PropertyDescriptor> props = new ArrayList<>();
         props.add(SEED_BROKERS);
@@ -269,13 +275,14 @@ public class PutKafka extends AbstractProcessor {
 
     @Override
     public Collection<ValidationResult> customValidate(final ValidationContext context) {
-       final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));
+        final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));
 
         final Integer batchMessages = context.getProperty(BATCH_NUM_MESSAGES).asInteger();
         final Integer bufferMaxMessages = context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).asInteger();
 
         if (batchMessages > bufferMaxMessages) {
-            errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false).explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build());
+            errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false)
+                .explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build());
         }
 
         return errors;
@@ -311,23 +318,23 @@ public class PutKafka extends AbstractProcessor {
         properties.setProperty("producer.type", 
context.getProperty(PRODUCER_TYPE).getValue());
         properties.setProperty("batch.num.messages", 
context.getProperty(BATCH_NUM_MESSAGES).getValue());
 
-        Long queueBufferingMillis = 
context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
-        if(queueBufferingMillis != null) {
-               properties.setProperty("queue.buffering.max.ms", 
String.valueOf(queueBufferingMillis));
+        final Long queueBufferingMillis = 
context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
+        if (queueBufferingMillis != null) {
+            properties.setProperty("queue.buffering.max.ms", 
String.valueOf(queueBufferingMillis));
         }
         properties.setProperty("queue.buffering.max.messages", 
context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue());
 
-        Long queueEnqueueTimeoutMillis = 
context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
-        if(queueEnqueueTimeoutMillis != null) {
-               properties.setProperty("queue.enqueue.timeout.ms", 
String.valueOf(queueEnqueueTimeoutMillis));
+        final Long queueEnqueueTimeoutMillis = 
context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+        if (queueEnqueueTimeoutMillis != null) {
+            properties.setProperty("queue.enqueue.timeout.ms", 
String.valueOf(queueEnqueueTimeoutMillis));
         }
 
-        String compressionCodec = 
context.getProperty(COMPRESSION_CODEC).getValue();
+        final String compressionCodec = 
context.getProperty(COMPRESSION_CODEC).getValue();
         properties.setProperty("compression.codec", compressionCodec);
-        
-        String compressedTopics = 
context.getProperty(COMPRESSED_TOPICS).getValue();
-        if(compressedTopics != null) {
-               properties.setProperty("compressed.topics", compressedTopics);
+
+        final String compressedTopics = 
context.getProperty(COMPRESSED_TOPICS).getValue();
+        if (compressedTopics != null) {
+            properties.setProperty("compressed.topics", compressedTopics);
         }
 
         return new ProducerConfig(properties);
@@ -338,7 +345,7 @@ public class PutKafka extends AbstractProcessor {
     }
 
     private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) {
-        Producer<byte[], byte[]> producer = producers.poll();
+        final Producer<byte[], byte[]> producer = producers.poll();
         return producer == null ? createProducer(context) : producer;
     }
 
@@ -348,7 +355,7 @@ public class PutKafka extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        FlowFile flowFile = session.get();
+        final FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
         }
@@ -356,7 +363,7 @@ public class PutKafka extends AbstractProcessor {
         final long start = System.nanoTime();
         final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
         final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-        final byte[] keyBytes = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
+        final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
         String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
         if (delimiter != null) {
             delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
@@ -389,9 +396,9 @@ public class PutKafka extends AbstractProcessor {
 
                 session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
                 session.transfer(flowFile, REL_SUCCESS);
-                getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[]{flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+                getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] { flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) });
             } catch (final Exception e) {
-                getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[]{flowFile, e});
+                getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] { flowFile, e });
                 session.transfer(flowFile, REL_FAILURE);
                 error = true;
             } finally {
@@ -426,7 +433,7 @@ public class PutKafka extends AbstractProcessor {
 
                         int nextByte;
                         try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
-                                final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
+                            final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
 
                             // read until we're out of data.
                             while (!streamFinished) {
@@ -514,7 +521,7 @@ public class PutKafka extends AbstractProcessor {
                 final long nanos = System.nanoTime() - start;
                 session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages");
                 session.transfer(flowFile, REL_SUCCESS);
-                getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[]{messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+                getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] { messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) });
             } catch (final ProcessException pe) {
                 error = true;
 
@@ -524,7 +531,7 @@ public class PutKafka extends AbstractProcessor {
                 final long offset = lastMessageOffset.get();
                 if (offset == 0L) {
                     // all of the messages failed to send. Route FlowFile to failure
-                    getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[]{flowFile, pe.getCause()});
+                    getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] { flowFile, pe.getCause() });
                     session.transfer(flowFile, REL_FAILURE);
                 } else {
                     // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages.
                     final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset);
 
                     getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into"
-                            + " two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[]{
-                        messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause()});
+                        + " two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[] {
+                        messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() });
 
                     session.transfer(successfulMessages, REL_SUCCESS);
                     session.transfer(failedMessages, REL_FAILURE);
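Beyond whitespace, the reformatted createConfig() above also highlights the pattern the processor uses for optional settings: a NiFi property is copied into the Kafka 0.8 producer configuration only when the user actually set it, so Kafka's own default applies otherwise. A condensed sketch of that mapping, using names taken from the diff (not the complete method):

    // Only set queue.buffering.max.ms when the user configured a value;
    // otherwise the Kafka 0.8 producer default is left in place.
    final Properties properties = new Properties();
    final Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
    if (queueBufferingMillis != null) {
        properties.setProperty("queue.buffering.max.ms", String.valueOf(queueBufferingMillis));
    }
    return new ProducerConfig(properties);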

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f2f90560/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 5d1eacf..750d406 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -212,7 +212,7 @@ public class TestPutKafka {
     }
 
     private void keyValuePutExecute(final TestRunner runner) {
-               final Map<String, String> attributes = new HashMap<>();
+        final Map<String, String> attributes = new HashMap<>();
         attributes.put("kafka.topic", "test");
         attributes.put("kafka.key", "key3");
 
@@ -229,32 +229,32 @@ public class TestPutKafka {
         final MockFlowFile mff = mffs.get(0);
 
         assertTrue(Arrays.equals(data, mff.toByteArray()));
-       }
+    }
 
     @Test
     public void testProducerConfigDefault() {
 
-       final TestableProcessor processor = new TestableProcessor();
-       TestRunner runner = TestRunners.newTestRunner(processor);
+        final TestableProcessor processor = new TestableProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
 
-       runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.TOPIC, "topic1");
         runner.setProperty(PutKafka.KEY, "key1");
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
 
-        ProcessContext context = runner.getProcessContext();
-        ProducerConfig config = processor.createConfig(context);
+        final ProcessContext context = runner.getProcessContext();
+        final ProducerConfig config = processor.createConfig(context);
 
         // Check the codec
-        CompressionCodec codec = config.compressionCodec();
+        final CompressionCodec codec = config.compressionCodec();
         assertTrue(codec instanceof kafka.message.NoCompressionCodec$);
 
         // Check compressed topics
-        Seq<String> compressedTopics = config.compressedTopics();
+        final Seq<String> compressedTopics = config.compressedTopics();
         assertEquals(0, compressedTopics.size());
 
         // Check the producer type
-        String actualProducerType = config.producerType();
+        final String actualProducerType = config.producerType();
         assertEquals(PutKafka.PRODUCER_TYPE.getDefaultValue(), actualProducerType);
 
     }
@@ -262,10 +262,10 @@ public class TestPutKafka {
     @Test
     public void testProducerConfigAsyncWithCompression() {
 
-       final TestableProcessor processor = new TestableProcessor();
-       TestRunner runner = TestRunners.newTestRunner(processor);
+        final TestableProcessor processor = new TestableProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
 
-       runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.TOPIC, "topic1");
         runner.setProperty(PutKafka.KEY, "key1");
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
@@ -273,33 +273,33 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.COMPRESSION_CODEC, PutKafka.COMPRESSION_CODEC_SNAPPY.getValue());
         runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03");
 
-        ProcessContext context = runner.getProcessContext();
-        ProducerConfig config = processor.createConfig(context);
+        final ProcessContext context = runner.getProcessContext();
+        final ProducerConfig config = processor.createConfig(context);
 
         // Check that the codec is snappy
-        CompressionCodec codec = config.compressionCodec();
+        final CompressionCodec codec = config.compressionCodec();
         assertTrue(codec instanceof kafka.message.SnappyCompressionCodec$);
 
         // Check compressed topics
-        Seq<String> compressedTopics = config.compressedTopics();
+        final Seq<String> compressedTopics = config.compressedTopics();
         assertEquals(3, compressedTopics.size());
         assertTrue(compressedTopics.contains("topic01"));
         assertTrue(compressedTopics.contains("topic02"));
         assertTrue(compressedTopics.contains("topic03"));
 
         // Check the producer type
-        String actualProducerType = config.producerType();
+        final String actualProducerType = config.producerType();
         assertEquals("async", actualProducerType);
 
     }
-    
+
     @Test
     public void testProducerConfigAsyncQueueThresholds() {
 
-       final TestableProcessor processor = new TestableProcessor();
-       TestRunner runner = TestRunners.newTestRunner(processor);
+        final TestableProcessor processor = new TestableProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
 
-       runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.TOPIC, "topic1");
         runner.setProperty(PutKafka.KEY, "key1");
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
@@ -308,27 +308,27 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "535");
         runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT, "200 ms");
 
-        ProcessContext context = runner.getProcessContext();
-        ProducerConfig config = processor.createConfig(context);
+        final ProcessContext context = runner.getProcessContext();
+        final ProducerConfig config = processor.createConfig(context);
 
         // Check that the queue thresholds were properly translated
         assertEquals(7000, config.queueBufferingMaxMs());
         assertEquals(535, config.queueBufferingMaxMessages());
         assertEquals(200, config.queueEnqueueTimeoutMs());
-        
+
         // Check the producer type
-        String actualProducerType = config.producerType();
+        final String actualProducerType = config.producerType();
         assertEquals("async", actualProducerType);
 
     }
-    
+
     @Test
     public void testProducerConfigInvalidBatchSize() {
 
-       final TestableProcessor processor = new TestableProcessor();
-       TestRunner runner = TestRunners.newTestRunner(processor);
+        final TestableProcessor processor = new TestableProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
 
-       runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.TOPIC, "topic1");
         runner.setProperty(PutKafka.KEY, "key1");
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
@@ -339,28 +339,28 @@ public class TestPutKafka {
         runner.assertNotValid();
 
     }
-    
+
     @Test
     public void testProducerConfigAsyncDefaultEnqueueTimeout() {
 
-       final TestableProcessor processor = new TestableProcessor();
-       TestRunner runner = TestRunners.newTestRunner(processor);
+        final TestableProcessor processor = new TestableProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
 
-       runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.TOPIC, "topic1");
         runner.setProperty(PutKafka.KEY, "key1");
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
         runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
         // Do not set QUEUE_ENQUEUE_TIMEOUT
 
-        ProcessContext context = runner.getProcessContext();
-        ProducerConfig config = processor.createConfig(context);
+        final ProcessContext context = runner.getProcessContext();
+        final ProducerConfig config = processor.createConfig(context);
 
         // Check that the enqueue timeout defaults to -1
         assertEquals(-1, config.queueEnqueueTimeoutMs());
 
         // Check the producer type
-        String actualProducerType = config.producerType();
+        final String actualProducerType = config.producerType();
         assertEquals("async", actualProducerType);
 
     }
@@ -391,12 +391,13 @@ public class TestPutKafka {
         public MockProducer getProducer() {
             return producer;
         }
-        
+
         /**
          * Exposed for test verification
          */
+        @Override
         public ProducerConfig createConfig(final ProcessContext context) {
-               return super.createConfig(context);
+            return super.createConfig(context);
         }
     }
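The test changes are the same story: locals become final and tab indentation becomes spaces, with behavior untouched. As these tests demonstrate, the translated producer settings can be verified without a live broker by asking the processor for its ProducerConfig directly. A minimal sketch of that pattern (the broker address and property values are placeholders, just as in the tests themselves):

    // Configure the processor through a TestRunner, then inspect the
    // ProducerConfig it would hand to the Kafka 0.8 client.
    final TestableProcessor processor = new TestableProcessor();
    final TestRunner runner = TestRunners.newTestRunner(processor);
    runner.setProperty(PutKafka.TOPIC, "topic1");
    runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
    runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX, "7 secs");

    final ProducerConfig config = processor.createConfig(runner.getProcessContext());
    assertEquals(7000, config.queueBufferingMaxMs()); // "7 secs" becomes 7000 ms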
 
