[NIFI-413] Adding properties to PutKafka to support asynchronous production
with configurable batching, along with user-defined control over the
compression codec and compressed topics. The producer type remains
synchronous by default.


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/9653770a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/9653770a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/9653770a

Branch: refs/heads/develop
Commit: 9653770ac4a050f4439c07f7ae6e34658f08e5a0
Parents: 421ad8f
Author: Brian Ghigiarelli <briang...@gmail.com>
Authored: Thu May 14 08:55:04 2015 -0400
Committer: Brian Ghigiarelli <briang...@gmail.com>
Committed: Thu May 14 08:55:04 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/PutKafka.java  | 81 ++++++++++++++++-
 .../nifi/processors/kafka/TestPutKafka.java     | 93 +++++++++++++++++++-
 2 files changed, 171 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9653770a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 44b6584..5bd0d2b 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -136,6 +136,68 @@ public class PutKafka extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(false)
             .build();
+    public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder()
+            .name("Producer Type")
+            .description("This parameter specifies whether the messages are sent asynchronously in a background thread."
+                    + " Valid values are (1) async for asynchronous send and (2) sync for synchronous send."
+                    + " Setting the producer to async allows batching together of requests (which is great for throughput),"
+                    + " but opens the possibility of dropping unsent data if the client machine fails.")
+            .required(true)
+            .allowableValues("sync", "async")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("sync")
+            .build();
+    public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
+            .name("Async Message Batch Size (batch.num.messages)")
+            .description("Used only if Producer Type is set to \"async\". The number of messages to send in one batch when using async mode."
+                    + " The producer will wait until either this number of messages is ready"
+                    + " to send or queue.buffering.max.ms is reached.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("200").build();
+    public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MS = new PropertyDescriptor.Builder()
+            .name("Queue Buffering Max Time (queue.buffering.max.ms)")
+            .description("Used only if Producer Type is set to \"async\". Maximum time to buffer data when using async mode. For example, a setting of 100"
+                    + " will try to batch together 100ms of messages to send at once. This will improve"
+                    + " throughput but adds message delivery latency due to the buffering.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("5000").build();
+    public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder()
+            .name("Queue Buffer Max Count (queue.buffering.max.messages)")
+            .description("Used only if Producer Type is set to \"async\". The maximum number of unsent messages that can be queued up by the producer when"
+                    + " using async mode before either the producer must be blocked or data must be dropped.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10000").build();
+    public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT_MS = new PropertyDescriptor.Builder()
+            .name("Queue Enqueue Timeout (queue.enqueue.timeout.ms)")
+            .description("Used only if Producer Type is set to \"async\". The amount of time to block before dropping messages when running in async mode"
+                    + " and the buffer has reached queue.buffering.max.messages. If set to 0, events will"
+                    + " be enqueued immediately or dropped if the queue is full (the producer send call will"
+                    + " never block). If set to -1, the producer will block indefinitely and never willingly"
+                    + " drop a send.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("-1").build();
+    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
+            .name("Compression Codec (compression.codec)")
+            .description("This parameter allows you to specify the compression codec for all"
+                    + " data generated by this producer. Valid values are \"none\", \"gzip\" and \"snappy\".")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues("none", "gzip", "snappy")
+            .defaultValue("none").build();
+    public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder()
+            .name("Compressed Topics (compressed.topics)")
+            .description("This parameter allows you to set whether compression should be turned on"
+                    + " for particular topics. If the compression codec is anything other than"
+                    + " NoCompressionCodec, enable compression only for the specified topics, if any."
+                    + " If the list of compressed topics is empty, then enable the specified"
+                    + " compression codec for all topics. If the compression codec is NoCompressionCodec,"
+                    + " compression is disabled for all topics.")
+            .required(false).build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -163,6 +225,13 @@ public class PutKafka extends AbstractProcessor {
         props.add(MESSAGE_DELIMITER);
         props.add(MAX_BUFFER_SIZE);
         props.add(TIMEOUT);
+        props.add(PRODUCER_TYPE);
+        props.add(BATCH_NUM_MESSAGES);
+        props.add(QUEUE_BUFFERING_MAX_MESSAGES);
+        props.add(QUEUE_BUFFERING_MAX_MS);
+        props.add(QUEUE_ENQUEUE_TIMEOUT_MS);
+        props.add(COMPRESSION_CODEC);
+        props.add(COMPRESSED_TOPICS);
         props.add(clientName);
         return props;
     }
@@ -194,7 +263,17 @@ public class PutKafka extends AbstractProcessor {
         properties.setProperty("request.timeout.ms", 
String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()));
 
         properties.setProperty("message.send.max.retries", "1");
-        properties.setProperty("producer.type", "sync");
+        properties.setProperty("producer.type", 
context.getProperty(PRODUCER_TYPE).getValue());
+        properties.setProperty("batch.num.messages", 
context.getProperty(BATCH_NUM_MESSAGES).getValue());
+        properties.setProperty("queue.buffering.max.ms", 
context.getProperty(QUEUE_BUFFERING_MAX_MS).getValue());
+        properties.setProperty("queue.buffering.max.messages", 
context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue());
+        properties.setProperty("queue.enqueue.timeout.ms", 
context.getProperty(QUEUE_ENQUEUE_TIMEOUT_MS).getValue());
+        properties.setProperty("compression.codec", 
context.getProperty(COMPRESSION_CODEC).getValue());
+        
+        String compressedTopics = 
context.getProperty(COMPRESSED_TOPICS).getValue();
+        if(compressedTopics != null) {
+               properties.setProperty("compressed.topics", compressedTopics);
+        }
 
         return new ProducerConfig(properties);
     }
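
To make the effect of the new createConfig() branch concrete, the sketch below
assembles the same Kafka 0.8-style producer keys by hand, using the descriptor
defaults from the diff above (the ProducerDefaultsSketch class and the
omission of broker/serializer keys are illustrative, not part of the commit):

    import java.util.Properties;

    public class ProducerDefaultsSketch {
        // Producer properties as createConfig() would emit them when the new
        // descriptors are left at their defaults (values per the diff above).
        public static Properties defaults() {
            final Properties props = new Properties();
            props.setProperty("producer.type", "sync");                 // PRODUCER_TYPE default
            props.setProperty("batch.num.messages", "200");             // used only in async mode
            props.setProperty("queue.buffering.max.ms", "5000");        // async buffering window
            props.setProperty("queue.buffering.max.messages", "10000"); // async queue bound
            props.setProperty("queue.enqueue.timeout.ms", "-1");        // -1: block, never drop
            props.setProperty("compression.codec", "none");             // none | gzip | snappy
            // "compressed.topics" is set only when the optional property has a
            // value; broker list, serializers, etc. are omitted for brevity.
            return props;
        }
    }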

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9653770a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 9500e29..dd6b309 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -29,11 +29,11 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import kafka.common.FailedToSendMessageException;
 import kafka.javaapi.producer.Producer;
+import kafka.message.CompressionCodec;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.util.MockFlowFile;
@@ -49,6 +49,8 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 
+import scala.collection.Seq;
+
 public class TestPutKafka {
 
     @Test
@@ -217,7 +219,25 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.TIMEOUT, "3 secs");
         runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
 
-        final Map<String, String> attributes = new HashMap<>();
+        keyValuePutExecute(runner);
+    }
+
+    @Test
+    @Ignore("Intended only for local testing; requires an actual running 
instance of Kafka & ZooKeeper...")
+    public void testKeyValuePutAsync() {
+        final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
+        runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
+        runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
+        runner.setProperty(PutKafka.KEY, "${kafka.key}");
+        runner.setProperty(PutKafka.TIMEOUT, "3 secs");
+        runner.setProperty(PutKafka.PRODUCER_TYPE, "async");
+        runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
+
+        keyValuePutExecute(runner);
+    }
+
+    private void keyValuePutExecute(final TestRunner runner) {
+        final Map<String, String> attributes = new HashMap<>();
         attributes.put("kafka.topic", "test");
         attributes.put("kafka.key", "key3");
 
@@ -234,6 +254,68 @@ public class TestPutKafka {
         final MockFlowFile mff = mffs.get(0);
 
         assertTrue(Arrays.equals(data, mff.toByteArray()));
+    }
+
+    @Test
+    public void testProducerConfigDefault() {
+
+        final TestableProcessor processor = new TestableProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+
+        ProcessContext context = runner.getProcessContext();
+        ProducerConfig config = processor.createConfig(context);
+
+        // Check the codec
+        CompressionCodec codec = config.compressionCodec();
+        assertTrue(codec instanceof kafka.message.NoCompressionCodec$);
+
+        // Check compressed topics
+        Seq<String> compressedTopics = config.compressedTopics();
+        assertEquals(0, compressedTopics.size());
+
+        // Check the producer type
+        String actualProducerType = config.producerType();
+        assertEquals(PutKafka.PRODUCER_TYPE.getDefaultValue(), actualProducerType);
+
+    }
+
+    @Test
+    public void testProducerConfigAsyncWithCompression() {
+
+        final TestableProcessor processor = new TestableProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        runner.setProperty(PutKafka.PRODUCER_TYPE, "async");
+        runner.setProperty(PutKafka.COMPRESSION_CODEC, "snappy");
+        runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03");
+
+        ProcessContext context = runner.getProcessContext();
+        ProducerConfig config = processor.createConfig(context);
+
+        // Check that the codec is snappy
+        CompressionCodec codec = config.compressionCodec();
+        assertTrue(codec instanceof kafka.message.SnappyCompressionCodec$);
+
+        // Check compressed topics
+        Seq<String> compressedTopics = config.compressedTopics();
+        assertEquals(3, compressedTopics.size());
+        assertTrue(compressedTopics.contains("topic01"));
+        assertTrue(compressedTopics.contains("topic02"));
+        assertTrue(compressedTopics.contains("topic03"));
+
+        // Check the producer type
+        String actualProducerType = config.producerType();
+        assertEquals("async", actualProducerType);
+
     }
 
     private static class TestableProcessor extends PutKafka {
@@ -262,6 +344,13 @@ public class TestPutKafka {
         public MockProducer getProducer() {
             return producer;
         }
+        
+        /**
+         * Exposed for test verification
+         */
+        public ProducerConfig createConfig(final ProcessContext context) {
+            return super.createConfig(context);
+        }
     }
 
     private static class MockProducer extends Producer<byte[], byte[]> {

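As a rough guide to the trade-off the Producer Type description warns about,
here is a back-of-envelope sketch of the worst-case unsent data at risk when
running async with the defaults above (the average message size is an
assumption for illustration, not from the commit):

    public class AsyncExposureSketch {
        public static void main(final String[] args) {
            final int queueBufferingMaxMessages = 10_000; // queue.buffering.max.messages default
            final int avgMessageBytes = 1024;             // assumed average message size
            final long worstCaseBytes = (long) queueBufferingMaxMessages * avgMessageBytes;
            // With the defaults, up to ~10 MB of enqueued-but-unsent messages
            // could be lost if the client machine fails before flushing.
            System.out.println("Worst-case unsent data at risk: " + worstCaseBytes + " bytes");
        }
    }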