[NIFI-413] Updating PutKafka properties to follow NiFi standards. Added 
validation for asynchronous properties.


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8af84f3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8af84f3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8af84f3f

Branch: refs/heads/develop
Commit: 8af84f3f73bd7890c44c1f3b7597330023e34d16
Parents: 5b0648c
Author: Brian Ghigiarelli <briang...@gmail.com>
Authored: Fri Jun 19 18:49:21 2015 -0400
Committer: Brian Ghigiarelli <briang...@gmail.com>
Committed: Fri Jun 19 18:49:21 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/PutKafka.java  | 129 +++++++++++++------
 .../nifi/processors/kafka/TestPutKafka.java     |  78 ++++++++++-
 2 files changed, 167 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8af84f3f/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
 
b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 5bd0d2b..e572622 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
@@ -39,6 +40,8 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
@@ -74,6 +77,32 @@ public class PutKafka extends AbstractProcessor {
             + " successfully writing the content to a Kafka node, without 
waiting for a response. This provides the best performance but may result"
             + " in data loss.");
 
+    /**
+     * AllowableValue for a Producer Type that synchronously sends messages to 
Kafka
+     */
+    public static final AllowableValue PRODUCER_TYPE_SYNCHRONOUS = new 
AllowableValue("sync", "Synchronous", "Send FlowFiles to Kafka immediately.");
+
+    /**
+     * AllowableValue for a Producer Type that asynchronously sends messages 
to Kafka
+     */
+    public static final AllowableValue PRODUCER_TYPE_ASYNCHRONOUS = new 
AllowableValue("async", "Asynchronous", "Batch messages before sending them to 
Kafka."
+           + " While this will improve throughput, it opens the possibility 
that a failure on the client machine will drop unsent data.");
+
+    /**
+     * AllowableValue for sending messages to Kafka without compression
+     */
+    public static final AllowableValue COMPRESSION_CODEC_NONE = new 
AllowableValue("none", "None", "Compression will not be used for any topic.");
+
+    /**
+     * AllowableValue for sending messages to Kafka with GZIP compression
+     */
+    public static final AllowableValue COMPRESSION_CODEC_GZIP = new 
AllowableValue("gzip", "GZIP", "Compress messages using GZIP");
+
+    /**
+     * AllowableValue for sending messages to Kafka with Snappy compression
+     */
+    public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new 
AllowableValue("snappy", "Snappy", "Compress messages using Snappy");
+
     public static final PropertyDescriptor SEED_BROKERS = new 
PropertyDescriptor.Builder()
             .name("Known Brokers")
             .description("A comma-separated list of known Kafka Brokers in the 
format <host>:<port>")
@@ -138,64 +167,66 @@ public class PutKafka extends AbstractProcessor {
             .build();
     public static final PropertyDescriptor PRODUCER_TYPE = new 
PropertyDescriptor.Builder()
             .name("Producer Type")
-            .description("This parameter specifies whether the messages are 
sent asynchronously in a background thread."
-                    + " Valid values are (1) async for asynchronous send and 
(2) sync for synchronous send."
-                    + " By setting the producer to async we allow batching 
together of requests (which is great for throughput)"
-                    + " but open the possibility of a failure of the client 
machine dropping unsent data.")
+            .description("This parameter specifies whether the messages are 
sent asynchronously in a background thread.")
             .required(true)
-            .allowableValues("sync", "async")
+            .allowableValues(PRODUCER_TYPE_SYNCHRONOUS, 
PRODUCER_TYPE_ASYNCHRONOUS)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(false)
-            .defaultValue("sync")
+            .defaultValue(PRODUCER_TYPE_SYNCHRONOUS.getValue())
             .build();
         public static final PropertyDescriptor BATCH_NUM_MESSAGES = new 
PropertyDescriptor.Builder()
-            .name("Async Message Batch Size (batch.num.messages)")
-            .description("Used only if Producer Type is set to \"async\". The 
number of messages to send in one batch when using async mode."
+            .name("Async Batch Size")
+            .description("Used only if Producer Type is set to \"" + 
PRODUCER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+                       + " The number of messages to send in one batch when 
using " + PRODUCER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode."
                     + " The producer will wait until either this number of 
messages are ready"
-                    + " to send or queue.buffer.max.ms is reached.")
+                    + " to send or \"Queue Buffering Max Time\" is reached.")
             .required(true)
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .defaultValue("200").build();
-        public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MS = new 
PropertyDescriptor.Builder()
-            .name("Queue Buffering Max Time (queue.buffering.max.ms)")
-            .description("Used only if Producer Type is set to \"async\". 
Maximum time to buffer data when using async mode. For example a setting of 100"
+        public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new 
PropertyDescriptor.Builder()
+            .name("Queue Buffering Max Time")
+            .description("Used only if Producer Type is set to \"" + 
PRODUCER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+                       + " Maximum time to buffer data when using " + 
PRODUCER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 
100 ms"
                     + " will try to batch together 100ms of messages to send 
at once. This will improve"
                     + " throughput but adds message delivery latency due to 
the buffering.")
             .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .defaultValue("5000").build();
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("5 secs").build();
         public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = 
new PropertyDescriptor.Builder()
-            .name("Queue Buffer Max Count (queue.buffering.max.messages)")
-            .description("Used only if Producer Type is set to \"async\". The 
maximum number of unsent messages that can be queued up the producer when"
-                    + " using async mode before either the producer must be 
blocked or data must be dropped.")
+            .name("Queue Buffer Max Count")
+            .description("Used only if Producer Type is set to \"" + 
PRODUCER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+                       + " The maximum number of unsent messages that can be 
queued up in the producer when"
+                    + " using " + PRODUCER_TYPE_ASYNCHRONOUS.getDisplayName() 
+ " mode before either the producer must be blocked or data must be dropped.")
             .required(true)
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .defaultValue("10000").build();
-        public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT_MS = new 
PropertyDescriptor.Builder()
-            .name("Queue Enqueue Timeout (queue.enqueue.timeout.ms)")
-            .description("Used only if Producer Type is set to \"async\". The 
amount of time to block before dropping messages when running in async mode"
-                    + " and the buffer has reached 
queue.buffering.max.messages. If set to 0 events will"
+        public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("Queue Enqueue Timeout")
+            .description("Used only if Producer Type is set to \"" + 
PRODUCER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+                       + " The amount of time to block before dropping 
messages when running in "
+                       + PRODUCER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode"
+                    + " and the buffer has reached the \"Queue Buffer Max 
Count\". If set to 0, events will"
                     + " be enqueued immediately or dropped if the queue is 
full (the producer send call will"
-                    + " never block). If set to -1 the producer will block 
indefinitely and never willingly"
+                    + " never block). If not set, the producer will block 
indefinitely and never willingly"
                     + " drop a send.")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .defaultValue("-1").build();
+            .required(false)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
         public static final PropertyDescriptor COMPRESSION_CODEC = new 
PropertyDescriptor.Builder()
-            .name("Compression Codec (compression.codec)")
+            .name("Compression Codec")
             .description("This parameter allows you to specify the compression 
codec for all"
-                    + " data generated by this producer. Valid values are 
\"none\", \"gzip\" and \"snappy\".")
+                    + " data generated by this producer.")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .allowableValues("none", "gzip", "snappy")
-            .defaultValue("none").build();
+            .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, 
COMPRESSION_CODEC_SNAPPY)
+            .defaultValue(COMPRESSION_CODEC_NONE.getValue()).build();
         public static final PropertyDescriptor COMPRESSED_TOPICS = new 
PropertyDescriptor.Builder()
-            .name("Compressed Topics (compressed.topics)")
+            .name("Compressed Topics")
             .description("This parameter allows you to set whether compression 
should be turned on"
                     + " for particular topics. If the compression codec is 
anything other than"
-                    + " NoCompressionCodec, enable compression only for 
specified topics if any."
+                    + " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", 
enable compression only for specified topics if any."
                     + " If the list of compressed topics is empty, then enable 
the specified"
-                    + " compression codec for all topics. If the compression 
codec is NoCompressionCodec,"
+                    + " compression codec for all topics. If the compression 
codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + ","
                     + " compression is disabled for all topics")
             .required(false).build();
 
@@ -228,8 +259,8 @@ public class PutKafka extends AbstractProcessor {
         props.add(PRODUCER_TYPE);
         props.add(BATCH_NUM_MESSAGES);
         props.add(QUEUE_BUFFERING_MAX_MESSAGES);
-        props.add(QUEUE_BUFFERING_MAX_MS);
-        props.add(QUEUE_ENQUEUE_TIMEOUT_MS);
+        props.add(QUEUE_BUFFERING_MAX);
+        props.add(QUEUE_ENQUEUE_TIMEOUT);
         props.add(COMPRESSION_CODEC);
         props.add(COMPRESSED_TOPICS);
         props.add(clientName);
@@ -237,6 +268,20 @@ public class PutKafka extends AbstractProcessor {
     }
 
     @Override
+    public Collection<ValidationResult> customValidate(final ValidationContext 
context) {
+       final List<ValidationResult> errors = new 
ArrayList<>(super.customValidate(context));
+
+        final Integer batchMessages = 
context.getProperty(BATCH_NUM_MESSAGES).asInteger();
+        final Integer bufferMaxMessages = 
context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).asInteger();
+
+        if (batchMessages > bufferMaxMessages) {
+            errors.add(new ValidationResult.Builder().subject("Batch Size, 
Queue Buffer").valid(false).explanation("Batch Size (" + batchMessages + ") 
must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages 
+ ")").build());
+        }
+
+        return errors;
+    }
+
+    @Override
     public Set<Relationship> getRelationships() {
         final Set<Relationship> relationships = new HashSet<>(1);
         relationships.add(REL_SUCCESS);
@@ -265,10 +310,20 @@ public class PutKafka extends AbstractProcessor {
         properties.setProperty("message.send.max.retries", "1");
         properties.setProperty("producer.type", 
context.getProperty(PRODUCER_TYPE).getValue());
         properties.setProperty("batch.num.messages", 
context.getProperty(BATCH_NUM_MESSAGES).getValue());
-        properties.setProperty("queue.buffering.max.ms", 
context.getProperty(QUEUE_BUFFERING_MAX_MS).getValue());
+
+        Long queueBufferingMillis = 
context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
+        if(queueBufferingMillis != null) {
+               properties.setProperty("queue.buffering.max.ms", 
String.valueOf(queueBufferingMillis));
+        }
         properties.setProperty("queue.buffering.max.messages", 
context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue());
-        properties.setProperty("queue.enqueue.timeout.ms", 
context.getProperty(QUEUE_ENQUEUE_TIMEOUT_MS).getValue());
-        properties.setProperty("compression.codec", 
context.getProperty(COMPRESSION_CODEC).getValue());
+
+        Long queueEnqueueTimeoutMillis = 
context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+        if(queueEnqueueTimeoutMillis != null) {
+               properties.setProperty("queue.enqueue.timeout.ms", 
String.valueOf(queueEnqueueTimeoutMillis));
+        }
+
+        String compressionCodec = 
context.getProperty(COMPRESSION_CODEC).getValue();
+        properties.setProperty("compression.codec", compressionCodec);
         
         String compressedTopics = 
context.getProperty(COMPRESSED_TOPICS).getValue();
         if(compressedTopics != null) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8af84f3f/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
 
b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index ded0afa..5d1eacf 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -205,7 +205,7 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
         runner.setProperty(PutKafka.KEY, "${kafka.key}");
         runner.setProperty(PutKafka.TIMEOUT, "3 secs");
-        runner.setProperty(PutKafka.PRODUCER_TYPE, "async");
+        runner.setProperty(PutKafka.PRODUCER_TYPE, 
PutKafka.PRODUCER_TYPE_ASYNCHRONOUS.getValue());
         runner.setProperty(PutKafka.DELIVERY_GUARANTEE, 
PutKafka.DELIVERY_REPLICATED.getValue());
 
         keyValuePutExecute(runner);
@@ -269,8 +269,8 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.KEY, "key1");
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-        runner.setProperty(PutKafka.PRODUCER_TYPE, "async");
-        runner.setProperty(PutKafka.COMPRESSION_CODEC, "snappy");
+        runner.setProperty(PutKafka.PRODUCER_TYPE, 
PutKafka.PRODUCER_TYPE_ASYNCHRONOUS.getValue());
+        runner.setProperty(PutKafka.COMPRESSION_CODEC, 
PutKafka.COMPRESSION_CODEC_SNAPPY.getValue());
         runner.setProperty(PutKafka.COMPRESSED_TOPICS, 
"topic01,topic02,topic03");
 
         ProcessContext context = runner.getProcessContext();
@@ -292,6 +292,78 @@ public class TestPutKafka {
         assertEquals("async", actualProducerType);
 
     }
+    
+    @Test
+    public void testProducerConfigAsyncQueueThresholds() {
+
+       final TestableProcessor processor = new TestableProcessor();
+       TestRunner runner = TestRunners.newTestRunner(processor);
+
+       runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        runner.setProperty(PutKafka.PRODUCER_TYPE, 
PutKafka.PRODUCER_TYPE_ASYNCHRONOUS.getValue());
+        runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX, "7 secs");
+        runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "535");
+        runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT, "200 ms");
+
+        ProcessContext context = runner.getProcessContext();
+        ProducerConfig config = processor.createConfig(context);
+
+        // Check that the queue thresholds were properly translated
+        assertEquals(7000, config.queueBufferingMaxMs());
+        assertEquals(535, config.queueBufferingMaxMessages());
+        assertEquals(200, config.queueEnqueueTimeoutMs());
+        
+        // Check the producer type
+        String actualProducerType = config.producerType();
+        assertEquals("async", actualProducerType);
+
+    }
+    
+    @Test
+    public void testProducerConfigInvalidBatchSize() {
+
+       final TestableProcessor processor = new TestableProcessor();
+       TestRunner runner = TestRunners.newTestRunner(processor);
+
+       runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        runner.setProperty(PutKafka.PRODUCER_TYPE, 
PutKafka.PRODUCER_TYPE_ASYNCHRONOUS.getValue());
+        runner.setProperty(PutKafka.BATCH_NUM_MESSAGES, "200");
+        runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "100");
+
+        runner.assertNotValid();
+
+    }
+    
+    @Test
+    public void testProducerConfigAsyncDefaultEnqueueTimeout() {
+
+       final TestableProcessor processor = new TestableProcessor();
+       TestRunner runner = TestRunners.newTestRunner(processor);
+
+       runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        runner.setProperty(PutKafka.PRODUCER_TYPE, 
PutKafka.PRODUCER_TYPE_ASYNCHRONOUS.getValue());
+        // Do not set QUEUE_ENQUEUE_TIMEOUT
+
+        ProcessContext context = runner.getProcessContext();
+        ProducerConfig config = processor.createConfig(context);
+
+        // Check that the enqueue timeout defaults to -1
+        assertEquals(-1, config.queueEnqueueTimeoutMs());
+
+        // Check the producer type
+        String actualProducerType = config.producerType();
+        assertEquals("async", actualProducerType);
+
+    }
 
     private static class TestableProcessor extends PutKafka {
 

Reply via email to