NIFI-3363: PutKafka NPE with User-Defined partition - Marked the PutKafka Partition Strategy property as deprecated: the Kafka 0.8 client doesn't use 'partitioner.class' as a producer property, so we don't have to specify it. - Changed the Partition Strategy property from a required property to a dynamic property, so that existing processor configurations can stay in a valid state. - Fixed the Partition property so that the configured partition is actually used. - Route a flow file to the failure relationship if it fails to be published due to an invalid partition.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/008bffd9 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/008bffd9 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/008bffd9 Branch: refs/heads/support/nifi-0.7.x Commit: 008bffd9cd1787295840b411f1498439265bc8c5 Parents: 3a09481 Author: Koji Kawamura <ijokaruma...@apache.org> Authored: Wed Jan 18 17:44:40 2017 +0900 Committer: Oleg Zhurakousky <o...@suitcase.io> Committed: Fri Jan 27 12:43:24 2017 -0500 ---------------------------------------------------------------------- .../apache/nifi/processors/kafka/PutKafka.java | 77 ++++++++++---------- .../nifi/processors/kafka/PutKafkaTest.java | 59 ++++++++++++++- 2 files changed, 95 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/008bffd9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index abdf73d..38ec20c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -54,8 +54,7 @@ import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult; @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub", "0.8.x"}) -@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka, , specifically for 0.8.x versions. 
" + - "The messages to send may be individual FlowFiles or may be delimited, using a " +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka, specifically for 0.8.x versions. The messages to send may be individual FlowFiles or may be delimited, using a " + "user-specified delimiter, such as a new-line. The complementary NiFi processor for fetching messages is GetKafka.") @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." @@ -98,11 +97,20 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy"); + /** + * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property. + */ static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("Round Robin", "Round Robin", "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, " + "the next Partition to Partition 2, and so on, wrapping as necessary."); + /** + * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property. + */ static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("Random Robin", "Random", "Messages will be assigned to random partitions."); + /** + * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property. To specify partition, simply configure the 'partition' property. + */ static final AllowableValue USER_DEFINED_PARTITIONING = new AllowableValue("User-Defined", "User-Defined", "The <Partition> property will be used to determine the partition. 
All messages within the same FlowFile will be " + "assigned to the same partition."); @@ -121,19 +129,22 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) .build(); + /** + * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property. + * This property is still valid as a dynamic property, so that existing processor configuration can stay valid. + */ static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder() .name("Partition Strategy") - .description("Specifies how messages should be partitioned when sent to Kafka") + .description("Deprecated. Used to specify how messages should be partitioned when sent to Kafka, but it's no longer used.") .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING) - .defaultValue(ROUND_ROBIN_PARTITIONING.getValue()) - .required(true) + .dynamic(true) .build(); public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder() .name("Partition") .description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages " + "in the same FlowFile will be sent to the same partition. 
If a partition is specified but is not valid, " - + "then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.") - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + + "then the FlowFile will be routed to failure relationship.") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .expressionLanguageSupported(true) .required(false) .build(); @@ -250,7 +261,6 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.add(SEED_BROKERS); _propertyDescriptors.add(TOPIC); - _propertyDescriptors.add(PARTITION_STRATEGY); _propertyDescriptors.add(PARTITION); _propertyDescriptors.add(KEY); _propertyDescriptors.add(DELIVERY_GUARANTEE); @@ -313,7 +323,14 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { @Override public void process(InputStream contentStream) throws IOException { PublishingContext publishingContext = PutKafka.this.buildPublishingContext(flowFile, context, contentStream); - KafkaPublisherResult result = PutKafka.this.kafkaResource.publish(publishingContext); + KafkaPublisherResult result = null; + try { + result = PutKafka.this.kafkaResource.publish(publishingContext); + } catch (final IllegalArgumentException e) { + getLogger().error("Failed to publish {}, due to {}", new Object[]{flowFile, e}, e); + result = new KafkaPublisherResult(0, -1); + + } publishResultRef.set(result); } }); @@ -402,26 +419,16 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + if (PARTITION_STRATEGY.getName().equals(propertyDescriptorName)) { + return PARTITION_STRATEGY; + } + return new PropertyDescriptor.Builder() .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") 
.name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true) .build(); } - @Override - protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { - final List<ValidationResult> results = new ArrayList<>(); - - final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue(); - if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue()) - && !validationContext.getProperty(PARTITION).isSet()) { - results.add(new ValidationResult.Builder().subject("Partition").valid(false) - .explanation("The <Partition> property must be set when configured to use the User-Defined Partitioning Strategy") - .build()); - } - return results; - } - /** * */ @@ -442,15 +449,11 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { * */ private Integer determinePartition(ProcessContext context, FlowFile flowFile) { - String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue(); - Integer partitionValue = null; - if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) { - String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue(); - if (pv != null){ - partitionValue = Integer.parseInt(context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue()); - } + String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue(); + if (pv != null){ + return Integer.parseInt(context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue()); } - return partitionValue; + return null; } /** @@ -496,19 +499,13 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { properties.setProperty("timeout.ms", timeout); properties.setProperty("metadata.fetch.timeout.ms", timeout); - String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue(); - String partitionerClass = null; - if 
(partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) { - partitionerClass = Partitioners.RoundRobinPartitioner.class.getName(); - } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) { - partitionerClass = Partitioners.RandomPartitioner.class.getName(); - } - properties.setProperty("partitioner.class", partitionerClass); - // Set Dynamic Properties for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { PropertyDescriptor descriptor = entry.getKey(); if (descriptor.isDynamic()) { + if (PARTITION_STRATEGY.equals(descriptor)) { + continue; + } if (properties.containsKey(descriptor.getName())) { this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '" + properties.getProperty(descriptor.getName()) + "' with dynamically set value '" http://git-wip-us.apache.org/repos/asf/nifi/blob/008bffd9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java index fbd2963..77b2bb9 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.kafka; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.nio.charset.StandardCharsets; @@ -89,7 +90,7 @@ public class PutKafkaTest { } @Test - 
public void validateMultiCharacterDelimiyedMessages() { + public void validateMultiCharacterDelimitedMessages() { String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner"; PutKafka putKafka = new PutKafka(); TestRunner runner = TestRunners.newTestRunner(putKafka); @@ -210,6 +211,62 @@ public class PutKafkaTest { runner.shutdown(); } + @Test + public void validateDeprecatedPartitionStrategy() { + String topicName = "validateDeprecatedPartitionStrategy"; + PutKafka putKafka = new PutKafka(); + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); + + // Old configuration using deprecated property still work. + runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING); + runner.setProperty(PutKafka.PARTITION, "${partition}"); + + runner.assertValid(); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("partition", "0"); + runner.enqueue("Hello World\nGoodbye".getBytes(StandardCharsets.UTF_8), attributes); + runner.run(1, false); + + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); + assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8)); + + runner.shutdown(); + } + + @Test + public void validatePartitionOutOfBounds() { + String topicName = "validatePartitionOutOfBounds"; + PutKafka putKafka = new PutKafka(); + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + 
runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); + runner.setProperty(PutKafka.PARTITION, "${partition}"); + + runner.assertValid(); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("partition", "123"); + runner.enqueue("Hello World\nGoodbye".getBytes(StandardCharsets.UTF_8), attributes); + runner.run(1, false); + + assertTrue("Error message should be logged", runner.getLogger().getErrorMessages().size() > 0); + runner.assertTransferCount(PutKafka.REL_SUCCESS, 0); + runner.assertTransferCount(PutKafka.REL_FAILURE, 1); + + runner.shutdown(); + } + private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) { Properties props = new Properties(); props.put("zookeeper.connect", "0.0.0.0:" + kafkaLocal.getZookeeperPort());