Repository: nifi
Updated Branches:
  refs/heads/master f8f66fa22 -> 63c763885


NIFI-3363: PutKafka NPE with User-Defined partition

- Marked the PutKafka Partition Strategy property as deprecated: the Kafka 0.8 
client doesn't use 'partitioner.class' as a producer property, so we don't 
have to specify it.
- Changed the Partition Strategy property from a required property to a dynamic 
property, so that existing processor configurations stay valid.
- Fixed the Partition property so that it works: partition 0 is now accepted, 
and the property is honored whenever it is set.
- Route a flow file to the failure relationship if it fails to be published 
due to an invalid partition.

This closes #1425


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/63c76388
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/63c76388
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/63c76388

Branch: refs/heads/master
Commit: 63c763885c36ab06111edf2c9d7743563ea57fcb
Parents: f8f66fa
Author: Koji Kawamura <ijokaruma...@apache.org>
Authored: Wed Jan 18 16:48:09 2017 +0900
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Fri Jan 27 12:48:23 2017 -0500

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/PutKafka.java  | 77 +++++++++-----------
 .../nifi/processors/kafka/PutKafkaTest.java     | 57 +++++++++++++++
 2 files changed, 93 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/63c76388/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index ab0618b..616c6f3 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,8 +39,6 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
@@ -97,11 +94,20 @@ public class PutKafka extends 
AbstractKafkaProcessor<KafkaPublisher> {
     public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new 
AllowableValue("snappy", "Snappy",
             "Compress messages using Snappy");
 
+    /**
+     * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' 
property.
+     */
     static final AllowableValue ROUND_ROBIN_PARTITIONING = new 
AllowableValue("Round Robin", "Round Robin",
             "Messages will be assigned partitions in a round-robin fashion, 
sending the first message to Partition 1, "
                     + "the next Partition to Partition 2, and so on, wrapping 
as necessary.");
+    /**
+     * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' 
property.
+     */
     static final AllowableValue RANDOM_PARTITIONING = new 
AllowableValue("Random Robin", "Random",
             "Messages will be assigned to random partitions.");
+    /**
+     * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' 
property. To specify partition, simply configure the 'partition' property.
+     */
     static final AllowableValue USER_DEFINED_PARTITIONING = new 
AllowableValue("User-Defined", "User-Defined",
             "The <Partition> property will be used to determine the partition. 
All messages within the same FlowFile will be "
                     + "assigned to the same partition.");
@@ -120,19 +126,22 @@ public class PutKafka extends 
AbstractKafkaProcessor<KafkaPublisher> {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(true)
             .build();
+    /**
+     * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' 
property.
+     * This property is still valid as a dynamic property, so that existing 
processor configuration can stay valid.
+     */
     static final PropertyDescriptor PARTITION_STRATEGY = new 
PropertyDescriptor.Builder()
             .name("Partition Strategy")
-            .description("Specifies how messages should be partitioned when 
sent to Kafka")
+            .description("Deprecated. Used to specify how messages should be 
partitioned when sent to Kafka, but it's no longer used.")
             .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, 
USER_DEFINED_PARTITIONING)
-            .defaultValue(ROUND_ROBIN_PARTITIONING.getValue())
-            .required(true)
+            .dynamic(true)
             .build();
     public static final PropertyDescriptor PARTITION = new 
PropertyDescriptor.Builder()
             .name("Partition")
             .description("Specifies which Kafka Partition to add the message 
to. If using a message delimiter, all messages "
                             + "in the same FlowFile will be sent to the same 
partition. If a partition is specified but is not valid, "
-                            + "then all messages within the same FlowFile will 
use the same partition but it remains undefined which partition is used.")
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                            + "then the FlowFile will be routed to failure 
relationship.")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .expressionLanguageSupported(true)
             .required(false)
             .build();
@@ -247,7 +256,6 @@ public class PutKafka extends 
AbstractKafkaProcessor<KafkaPublisher> {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.add(SEED_BROKERS);
         _propertyDescriptors.add(TOPIC);
-        _propertyDescriptors.add(PARTITION_STRATEGY);
         _propertyDescriptors.add(PARTITION);
         _propertyDescriptors.add(KEY);
         _propertyDescriptors.add(DELIVERY_GUARANTEE);
@@ -310,7 +318,14 @@ public class PutKafka extends 
AbstractKafkaProcessor<KafkaPublisher> {
             @Override
             public void process(InputStream contentStream) throws IOException {
                 PublishingContext publishingContext = 
PutKafka.this.buildPublishingContext(flowFile, context, contentStream);
-                KafkaPublisherResult result = 
PutKafka.this.kafkaResource.publish(publishingContext);
+                KafkaPublisherResult result = null;
+                try {
+                    result = 
PutKafka.this.kafkaResource.publish(publishingContext);
+                } catch (final IllegalArgumentException e) {
+                    getLogger().error("Failed to publish {}, due to {}", new 
Object[]{flowFile, e}, e);
+                    result = new KafkaPublisherResult(0, -1);
+
+                }
                 publishResultRef.set(result);
             }
         });
@@ -399,26 +414,16 @@ public class PutKafka extends 
AbstractKafkaProcessor<KafkaPublisher> {
 
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        if (PARTITION_STRATEGY.getName().equals(propertyDescriptorName)) {
+            return PARTITION_STRATEGY;
+        }
+
         return new PropertyDescriptor.Builder()
                 .description("Specifies the value for '" + 
propertyDescriptorName + "' Kafka Configuration.")
                 
.name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
                 .build();
     }
 
-    @Override
-    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
-        final List<ValidationResult> results = new ArrayList<>();
-
-        final String partitionStrategy = 
validationContext.getProperty(PARTITION_STRATEGY).getValue();
-        if 
(partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())
-                && !validationContext.getProperty(PARTITION).isSet()) {
-            results.add(new 
ValidationResult.Builder().subject("Partition").valid(false)
-                    .explanation("The <Partition> property must be set when 
configured to use the User-Defined Partitioning Strategy")
-                    .build());
-        }
-        return results;
-    }
-
     /**
      *
      */
@@ -439,15 +444,11 @@ public class PutKafka extends 
AbstractKafkaProcessor<KafkaPublisher> {
      *
      */
     private Integer determinePartition(ProcessContext context, FlowFile 
flowFile) {
-        String partitionStrategy = 
context.getProperty(PARTITION_STRATEGY).getValue();
-        Integer partitionValue = null;
-        if 
(partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) {
-            String pv = 
context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
-            if (pv != null){
-                partitionValue = 
Integer.parseInt(context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue());
-            }
+        String pv = 
context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+        if (pv != null){
+            return 
Integer.parseInt(context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue());
         }
-        return partitionValue;
+        return null;
     }
 
     /**
@@ -493,19 +494,13 @@ public class PutKafka extends 
AbstractKafkaProcessor<KafkaPublisher> {
         properties.setProperty("timeout.ms", timeout);
         properties.setProperty("metadata.fetch.timeout.ms", timeout);
 
-        String partitionStrategy = 
context.getProperty(PARTITION_STRATEGY).getValue();
-        String partitionerClass = null;
-        if 
(partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) {
-            partitionerClass = 
Partitioners.RoundRobinPartitioner.class.getName();
-        } else if 
(partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) {
-            partitionerClass = Partitioners.RandomPartitioner.class.getName();
-        }
-        properties.setProperty("partitioner.class", partitionerClass);
-
         // Set Dynamic Properties
         for (final Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
             PropertyDescriptor descriptor = entry.getKey();
             if (descriptor.isDynamic()) {
+                if (PARTITION_STRATEGY.equals(descriptor)) {
+                    continue;
+                }
                 if (properties.containsKey(descriptor.getName())) {
                     this.getLogger().warn("Overriding existing property '" + 
descriptor.getName() + "' which had value of '"
                                     + 
properties.getProperty(descriptor.getName()) + "' with dynamically set value '"

http://git-wip-us.apache.org/repos/asf/nifi/blob/63c76388/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java
index 8437b00..77b2bb9 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.kafka;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.nio.charset.StandardCharsets;
@@ -210,6 +211,62 @@ public class PutKafkaTest {
         runner.shutdown();
     }
 
+    @Test
+    public void validateDeprecatedPartitionStrategy() {
+        String topicName = "validateDeprecatedPartitionStrategy";
+        PutKafka putKafka = new PutKafka();
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PutKafka.TOPIC, topicName);
+        runner.setProperty(PutKafka.CLIENT_NAME, "foo");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + 
kafkaLocal.getKafkaPort());
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
+
+        // Old configuration using deprecated property still work.
+        runner.setProperty(PutKafka.PARTITION_STRATEGY, 
PutKafka.USER_DEFINED_PARTITIONING);
+        runner.setProperty(PutKafka.PARTITION, "${partition}");
+
+        runner.assertValid();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("partition", "0");
+        runner.enqueue("Hello 
World\nGoodbye".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1, false);
+
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
+        ConsumerIterator<byte[], byte[]> consumer = 
this.buildConsumer(topicName);
+        assertEquals("Hello World", new String(consumer.next().message(), 
StandardCharsets.UTF_8));
+        assertEquals("Goodbye", new String(consumer.next().message(), 
StandardCharsets.UTF_8));
+
+        runner.shutdown();
+    }
+
+    @Test
+    public void validatePartitionOutOfBounds() {
+        String topicName = "validatePartitionOutOfBounds";
+        PutKafka putKafka = new PutKafka();
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PutKafka.TOPIC, topicName);
+        runner.setProperty(PutKafka.CLIENT_NAME, "foo");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + 
kafkaLocal.getKafkaPort());
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
+        runner.setProperty(PutKafka.PARTITION, "${partition}");
+
+        runner.assertValid();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("partition", "123");
+        runner.enqueue("Hello 
World\nGoodbye".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1, false);
+
+        assertTrue("Error message should be logged", 
runner.getLogger().getErrorMessages().size() > 0);
+        runner.assertTransferCount(PutKafka.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
+
+        runner.shutdown();
+    }
+
     private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
         Properties props = new Properties();
         props.put("zookeeper.connect", "0.0.0.0:" + 
kafkaLocal.getZookeeperPort());

Reply via email to