[NIFI-413] Adding properties to PutKafka to support asynchronous production with configurable batching. Also added user-defined control over compression codec and compressed topics. Producer type remains synchronous by default.
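As a quick illustration, the new properties can be driven through the NiFi test harness in the same way as the tests in this commit. The method name, broker address, topic, and values below are examples only, not the shipped defaults (the processor still defaults to synchronous sends):

    @Test
    public void exerciseAsyncBatchingProperties() {
        // Sketch only: configure PutKafka for asynchronous, batched, compressed sends
        // using the properties added in this commit, then verify the configuration is valid.
        final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:9092");      // illustrative broker
        runner.setProperty(PutKafka.TOPIC, "events");
        runner.setProperty(PutKafka.TIMEOUT, "3 secs");
        runner.setProperty(PutKafka.PRODUCER_TYPE, "async");              // default remains "sync"
        runner.setProperty(PutKafka.BATCH_NUM_MESSAGES, "500");           // batch.num.messages
        runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MS, "1000");      // queue.buffering.max.ms
        runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT_MS, "-1");      // block rather than drop
        runner.setProperty(PutKafka.COMPRESSION_CODEC, "snappy");         // none, gzip, or snappy
        runner.setProperty(PutKafka.COMPRESSED_TOPICS, "events");         // compress only these topics
        runner.assertValid();
    }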
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/9653770a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/9653770a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/9653770a

Branch: refs/heads/develop
Commit: 9653770ac4a050f4439c07f7ae6e34658f08e5a0
Parents: 421ad8f
Author: Brian Ghigiarelli <briang...@gmail.com>
Authored: Thu May 14 08:55:04 2015 -0400
Committer: Brian Ghigiarelli <briang...@gmail.com>
Committed: Thu May 14 08:55:04 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/PutKafka.java | 81 ++++++++++++++++-
 .../nifi/processors/kafka/TestPutKafka.java    | 93 +++++++++++++++++++-
 2 files changed, 171 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9653770a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 44b6584..5bd0d2b 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -136,6 +136,68 @@ public class PutKafka extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(false)
             .build();
+    public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder()
+            .name("Producer Type")
+            .description("This parameter specifies whether the messages are sent asynchronously in a background thread."
+                    + " Valid values are (1) async for asynchronous send and (2) sync for synchronous send."
+                    + " By setting the producer to async we allow batching together of requests (which is great for throughput)"
+                    + " but open the possibility of a failure of the client machine dropping unsent data.")
+            .required(true)
+            .allowableValues("sync", "async")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("sync")
+            .build();
+    public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
+            .name("Async Message Batch Size (batch.num.messages)")
+            .description("Used only if Producer Type is set to \"async\". The number of messages to send in one batch when using async mode."
+                    + " The producer will wait until either this number of messages are ready"
+                    + " to send or queue.buffer.max.ms is reached.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("200").build();
+    public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MS = new PropertyDescriptor.Builder()
+            .name("Queue Buffering Max Time (queue.buffering.max.ms)")
+            .description("Used only if Producer Type is set to \"async\". Maximum time to buffer data when using async mode. For example a setting of 100"
+                    + " will try to batch together 100ms of messages to send at once. This will improve"
+                    + " throughput but adds message delivery latency due to the buffering.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("5000").build();
+    public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder()
+            .name("Queue Buffer Max Count (queue.buffering.max.messages)")
+            .description("Used only if Producer Type is set to \"async\". The maximum number of unsent messages that can be queued up the producer when"
+                    + " using async mode before either the producer must be blocked or data must be dropped.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10000").build();
+    public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT_MS = new PropertyDescriptor.Builder()
+            .name("Queue Enqueue Timeout (queue.enqueue.timeout.ms)")
+            .description("Used only if Producer Type is set to \"async\". The amount of time to block before dropping messages when running in async mode"
+                    + " and the buffer has reached queue.buffering.max.messages. If set to 0 events will"
+                    + " be enqueued immediately or dropped if the queue is full (the producer send call will"
+                    + " never block). If set to -1 the producer will block indefinitely and never willingly"
+                    + " drop a send.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("-1").build();
+    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
+            .name("Compression Codec (compression.codec)")
+            .description("This parameter allows you to specify the compression codec for all"
+                    + " data generated by this producer. Valid values are \"none\", \"gzip\" and \"snappy\".")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues("none", "gzip", "snappy")
+            .defaultValue("none").build();
+    public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder()
+            .name("Compressed Topics (compressed.topics)")
+            .description("This parameter allows you to set whether compression should be turned on"
+                    + " for particular topics. If the compression codec is anything other than"
+                    + " NoCompressionCodec, enable compression only for specified topics if any."
+                    + " If the list of compressed topics is empty, then enable the specified"
+                    + " compression codec for all topics. If the compression codec is NoCompressionCodec,"
+                    + " compression is disabled for all topics")
+            .required(false).build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -163,6 +225,13 @@ public class PutKafka extends AbstractProcessor {
         props.add(MESSAGE_DELIMITER);
         props.add(MAX_BUFFER_SIZE);
         props.add(TIMEOUT);
+        props.add(PRODUCER_TYPE);
+        props.add(BATCH_NUM_MESSAGES);
+        props.add(QUEUE_BUFFERING_MAX_MESSAGES);
+        props.add(QUEUE_BUFFERING_MAX_MS);
+        props.add(QUEUE_ENQUEUE_TIMEOUT_MS);
+        props.add(COMPRESSION_CODEC);
+        props.add(COMPRESSED_TOPICS);
         props.add(clientName);
         return props;
     }
@@ -194,7 +263,17 @@ public class PutKafka extends AbstractProcessor {
         properties.setProperty("request.timeout.ms", String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()));
 
         properties.setProperty("message.send.max.retries", "1");
-        properties.setProperty("producer.type", "sync");
+        properties.setProperty("producer.type", context.getProperty(PRODUCER_TYPE).getValue());
+        properties.setProperty("batch.num.messages", context.getProperty(BATCH_NUM_MESSAGES).getValue());
+        properties.setProperty("queue.buffering.max.ms", context.getProperty(QUEUE_BUFFERING_MAX_MS).getValue());
+        properties.setProperty("queue.buffering.max.messages", context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue());
+        properties.setProperty("queue.enqueue.timeout.ms", context.getProperty(QUEUE_ENQUEUE_TIMEOUT_MS).getValue());
+        properties.setProperty("compression.codec", context.getProperty(COMPRESSION_CODEC).getValue());
+
+        String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue();
+        if(compressedTopics != null) {
+            properties.setProperty("compressed.topics", compressedTopics);
+        }
 
         return new ProducerConfig(properties);
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9653770a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 9500e29..dd6b309 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -29,11 +29,11 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import kafka.common.FailedToSendMessageException;
 import kafka.javaapi.producer.Producer;
+import kafka.message.CompressionCodec;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.util.MockFlowFile;
@@ -49,6 +49,8 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 
+import scala.collection.Seq;
+
 public class TestPutKafka {
 
     @Test
@@ -217,7 +219,25 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.TIMEOUT, "3 secs");
         runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
 
-        final Map<String, String> attributes = new HashMap<>();
+        keyValuePutExecute(runner);
+    }
+
+    @Test
+    @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
+    public void testKeyValuePutAsync() {
+        final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
+        runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
+        runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
+        runner.setProperty(PutKafka.KEY, "${kafka.key}");
+        runner.setProperty(PutKafka.TIMEOUT, "3 secs");
+        runner.setProperty(PutKafka.PRODUCER_TYPE, "async");
+        runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
+
+        keyValuePutExecute(runner);
+    }
+
+    private void keyValuePutExecute(final TestRunner runner) {
+        final Map<String, String> attributes = new HashMap<>();
         attributes.put("kafka.topic", "test");
         attributes.put("kafka.key", "key3");
 
@@ -234,6 +254,68 @@ public class TestPutKafka {
 
         final MockFlowFile mff = mffs.get(0);
         assertTrue(Arrays.equals(data, mff.toByteArray()));
+    }
+
+    @Test
+    public void testProducerConfigDefault() {
+
+        final TestableProcessor processor = new TestableProcessor();
+        TestRunner runner = TestRunners.newTestRunner(processor);
+
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+
+        ProcessContext context = runner.getProcessContext();
+        ProducerConfig config = processor.createConfig(context);
+
+        // Check the codec
+        CompressionCodec codec = config.compressionCodec();
+        assertTrue(codec instanceof kafka.message.NoCompressionCodec$);
+
+        // Check compressed topics
+        Seq<String> compressedTopics = config.compressedTopics();
+        assertEquals(0, compressedTopics.size());
+
+        // Check the producer type
+        String actualProducerType = config.producerType();
+        assertEquals(PutKafka.PRODUCER_TYPE.getDefaultValue(), actualProducerType);
+
+    }
+
+    @Test
+    public void testProducerConfigAsyncWithCompression() {
+
+        final TestableProcessor processor = new TestableProcessor();
+        TestRunner runner = TestRunners.newTestRunner(processor);
+
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        runner.setProperty(PutKafka.PRODUCER_TYPE, "async");
+        runner.setProperty(PutKafka.COMPRESSION_CODEC, "snappy");
+        runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03");
+
+        ProcessContext context = runner.getProcessContext();
+        ProducerConfig config = processor.createConfig(context);
+
+        // Check that the codec is snappy
+        CompressionCodec codec = config.compressionCodec();
+        assertTrue(codec instanceof kafka.message.SnappyCompressionCodec$);
+
+        // Check compressed topics
+        Seq<String> compressedTopics = config.compressedTopics();
+        assertEquals(3, compressedTopics.size());
+        assertTrue(compressedTopics.contains("topic01"));
+        assertTrue(compressedTopics.contains("topic02"));
+        assertTrue(compressedTopics.contains("topic03"));
+
+        // Check the producer type
+        String actualProducerType = config.producerType();
+        assertEquals("async", actualProducerType);
+
     }
 
     private static class TestableProcessor extends PutKafka {
@@ -262,6 +344,13 @@ public class TestPutKafka {
         public MockProducer getProducer() {
             return producer;
         }
+
+        /**
+         * Exposed for test verification
+         */
+        public ProducerConfig createConfig(final ProcessContext context) {
+            return super.createConfig(context);
+        }
     }
 
     private static class MockProducer extends Producer<byte[], byte[]> {