[NIFI-413] Updating PutKafka properties to follow NiFi standards. Added validation for asynchronous properties.
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8af84f3f Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8af84f3f Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8af84f3f Branch: refs/heads/develop Commit: 8af84f3f73bd7890c44c1f3b7597330023e34d16 Parents: 5b0648c Author: Brian Ghigiarelli <briang...@gmail.com> Authored: Fri Jun 19 18:49:21 2015 -0400 Committer: Brian Ghigiarelli <briang...@gmail.com> Committed: Fri Jun 19 18:49:21 2015 -0400 ---------------------------------------------------------------------- .../apache/nifi/processors/kafka/PutKafka.java | 129 +++++++++++++------ .../nifi/processors/kafka/TestPutKafka.java | 78 ++++++++++- 2 files changed, 167 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8af84f3f/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 5bd0d2b..e572622 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Properties; @@ -39,6 +40,8 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; @@ -74,6 +77,32 @@ public class PutKafka extends AbstractProcessor { + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result" + " in data loss."); + /** + * AllowableValue for a Producer Type that synchronously sends messages to Kafka + */ + public static final AllowableValue PRODUCTER_TYPE_SYNCHRONOUS = new AllowableValue("sync", "Synchronous", "Send FlowFiles to Kafka immediately."); + + /** + * AllowableValue for a Producer Type that asynchronously sends messages to Kafka + */ + public static final AllowableValue PRODUCTER_TYPE_ASYNCHRONOUS = new AllowableValue("async", "Asynchronous", "Batch messages before sending them to Kafka." + + " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data."); + + /** + * AllowableValue for sending messages to Kafka without compression + */ + public static final AllowableValue COMPRESSION_CODEC_NONE = new AllowableValue("none", "None", "Compression will not be used for any topic."); + + /** + * AllowableValue for sending messages to Kafka with GZIP compression + */ + public static final AllowableValue COMPRESSION_CODEC_GZIP = new AllowableValue("gzip", "GZIP", "Compress messages using GZIP"); + + /** + * AllowableValue for sending messages to Kafka with Snappy compression + */ + public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy"); + public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder() .name("Known Brokers") .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") @@ -138,64 +167,66 @@ public class PutKafka extends AbstractProcessor { .build(); public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder() .name("Producer Type") - .description("This parameter specifies whether the messages are sent asynchronously in a background thread." - + " Valid values are (1) async for asynchronous send and (2) sync for synchronous send." - + " By setting the producer to async we allow batching together of requests (which is great for throughput)" - + " but open the possibility of a failure of the client machine dropping unsent data.") + .description("This parameter specifies whether the messages are sent asynchronously in a background thread.") .required(true) - .allowableValues("sync", "async") + .allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, PRODUCTER_TYPE_ASYNCHRONOUS) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(false) - .defaultValue("sync") + .defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue()) .build(); public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() - .name("Async Message Batch Size (batch.num.messages)") - .description("Used only if Producer Type is set to \"async\". The number of messages to send in one batch when using async mode." + .name("Async Batch Size") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." + + " The number of messages to send in one batch when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode." + " The producer will wait until either this number of messages are ready" - + " to send or queue.buffer.max.ms is reached.") + + " to send or \"Queue Buffering Max Time\" is reached.") .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .defaultValue("200").build(); - public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MS = new PropertyDescriptor.Builder() - .name("Queue Buffering Max Time (queue.buffering.max.ms)") - .description("Used only if Producer Type is set to \"async\". Maximum time to buffer data when using async mode. For example a setting of 100" + public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder() + .name("Queue Buffering Max Time") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." + + " Maximum time to buffer data when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 100 ms" + " will try to batch together 100ms of messages to send at once. This will improve" + " throughput but adds message delivery latency due to the buffering.") .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("5000").build(); + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("5 secs").build(); public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder() - .name("Queue Buffer Max Count (queue.buffering.max.messages)") - .description("Used only if Producer Type is set to \"async\". The maximum number of unsent messages that can be queued up the producer when" - + " using async mode before either the producer must be blocked or data must be dropped.") + .name("Queue Buffer Max Count") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." + + " The maximum number of unsent messages that can be queued up in the producer when" + + " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode before either the producer must be blocked or data must be dropped.") .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .defaultValue("10000").build(); - public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT_MS = new PropertyDescriptor.Builder() - .name("Queue Enqueue Timeout (queue.enqueue.timeout.ms)") - .description("Used only if Producer Type is set to \"async\". The amount of time to block before dropping messages when running in async mode" - + " and the buffer has reached queue.buffering.max.messages. If set to 0 events will" + public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new PropertyDescriptor.Builder() + .name("Queue Enqueue Timeout") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." + + " The amount of time to block before dropping messages when running in " + + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode" + + " and the buffer has reached the \"Queue Buffer Max Count\". If set to 0, events will" + " be enqueued immediately or dropped if the queue is full (the producer send call will" - + " never block). If set to -1 the producer will block indefinitely and never willingly" + + " never block). If not set, the producer will block indefinitely and never willingly" + " drop a send.") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("-1").build(); + .required(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() - .name("Compression Codec (compression.codec)") + .name("Compression Codec") .description("This parameter allows you to specify the compression codec for all" - + " data generated by this producer. Valid values are \"none\", \"gzip\" and \"snappy\".") + + " data generated by this producer.") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .allowableValues("none", "gzip", "snappy") - .defaultValue("none").build(); + .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY) + .defaultValue(COMPRESSION_CODEC_NONE.getValue()).build(); public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder() - .name("Compressed Topics (compressed.topics)") + .name("Compressed Topics") .description("This parameter allows you to set whether compression should be turned on" + " for particular topics. If the compression codec is anything other than" - + " NoCompressionCodec, enable compression only for specified topics if any." + + " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable compression only for specified topics if any." + " If the list of compressed topics is empty, then enable the specified" - + " compression codec for all topics. If the compression codec is NoCompressionCodec," + + " compression codec for all topics. If the compression codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + "," + " compression is disabled for all topics") .required(false).build(); @@ -228,8 +259,8 @@ public class PutKafka extends AbstractProcessor { props.add(PRODUCER_TYPE); props.add(BATCH_NUM_MESSAGES); props.add(QUEUE_BUFFERING_MAX_MESSAGES); - props.add(QUEUE_BUFFERING_MAX_MS); - props.add(QUEUE_ENQUEUE_TIMEOUT_MS); + props.add(QUEUE_BUFFERING_MAX); + props.add(QUEUE_ENQUEUE_TIMEOUT); props.add(COMPRESSION_CODEC); props.add(COMPRESSED_TOPICS); props.add(clientName); @@ -237,6 +268,20 @@ public class PutKafka extends AbstractProcessor { } @Override + public Collection<ValidationResult> customValidate(final ValidationContext context) { + final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context)); + + final Integer batchMessages = context.getProperty(BATCH_NUM_MESSAGES).asInteger(); + final Integer bufferMaxMessages = context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).asInteger(); + + if (batchMessages > bufferMaxMessages) { + errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false).explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build()); + } + + return errors; + } + + @Override public Set<Relationship> getRelationships() { final Set<Relationship> relationships = new HashSet<>(1); relationships.add(REL_SUCCESS); @@ -265,10 +310,20 @@ public class PutKafka extends AbstractProcessor { properties.setProperty("message.send.max.retries", "1"); properties.setProperty("producer.type", context.getProperty(PRODUCER_TYPE).getValue()); properties.setProperty("batch.num.messages", context.getProperty(BATCH_NUM_MESSAGES).getValue()); - properties.setProperty("queue.buffering.max.ms", context.getProperty(QUEUE_BUFFERING_MAX_MS).getValue()); + + Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS); + if(queueBufferingMillis != null) { + properties.setProperty("queue.buffering.max.ms", String.valueOf(queueBufferingMillis)); + } properties.setProperty("queue.buffering.max.messages", context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue()); - properties.setProperty("queue.enqueue.timeout.ms", context.getProperty(QUEUE_ENQUEUE_TIMEOUT_MS).getValue()); - properties.setProperty("compression.codec", context.getProperty(COMPRESSION_CODEC).getValue()); + + Long queueEnqueueTimeoutMillis = context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + if(queueEnqueueTimeoutMillis != null) { + properties.setProperty("queue.enqueue.timeout.ms", String.valueOf(queueEnqueueTimeoutMillis)); + } + + String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); + properties.setProperty("compression.codec", compressionCodec); String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue(); if(compressedTopics != null) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8af84f3f/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index ded0afa..5d1eacf 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -205,7 +205,7 @@ public class TestPutKafka { runner.setProperty(PutKafka.TOPIC, "${kafka.topic}"); runner.setProperty(PutKafka.KEY, "${kafka.key}"); runner.setProperty(PutKafka.TIMEOUT, "3 secs"); - runner.setProperty(PutKafka.PRODUCER_TYPE, "async"); + runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue()); keyValuePutExecute(runner); @@ -269,8 +269,8 @@ public class TestPutKafka { runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - runner.setProperty(PutKafka.PRODUCER_TYPE, "async"); - runner.setProperty(PutKafka.COMPRESSION_CODEC, "snappy"); + runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); + runner.setProperty(PutKafka.COMPRESSION_CODEC, PutKafka.COMPRESSION_CODEC_SNAPPY.getValue()); runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03"); ProcessContext context = runner.getProcessContext(); @@ -292,6 +292,78 @@ public class TestPutKafka { assertEquals("async", actualProducerType); } + + @Test + public void testProducerConfigAsyncQueueThresholds() { + + final TestableProcessor processor = new TestableProcessor(); + TestRunner runner = TestRunners.newTestRunner(processor); + + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); + runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX, "7 secs"); + runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "535"); + runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT, "200 ms"); + + ProcessContext context = runner.getProcessContext(); + ProducerConfig config = processor.createConfig(context); + + // Check that the queue thresholds were properly translated + assertEquals(7000, config.queueBufferingMaxMs()); + assertEquals(535, config.queueBufferingMaxMessages()); + assertEquals(200, config.queueEnqueueTimeoutMs()); + + // Check the producer type + String actualProducerType = config.producerType(); + assertEquals("async", actualProducerType); + + } + + @Test + public void testProducerConfigInvalidBatchSize() { + + final TestableProcessor processor = new TestableProcessor(); + TestRunner runner = TestRunners.newTestRunner(processor); + + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); + runner.setProperty(PutKafka.BATCH_NUM_MESSAGES, "200"); + runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "100"); + + runner.assertNotValid(); + + } + + @Test + public void testProducerConfigAsyncDefaultEnqueueTimeout() { + + final TestableProcessor processor = new TestableProcessor(); + TestRunner runner = TestRunners.newTestRunner(processor); + + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); + // Do not set QUEUE_ENQUEUE_TIMEOUT + + ProcessContext context = runner.getProcessContext(); + ProducerConfig config = processor.createConfig(context); + + // Check that the enqueue timeout defaults to -1 + assertEquals(-1, config.queueEnqueueTimeoutMs()); + + // Check the producer type + String actualProducerType = config.producerType(); + assertEquals("async", actualProducerType); + + } private static class TestableProcessor extends PutKafka {