Repository: nifi
Updated Branches:
  refs/heads/NIFI-655 5ef53b6fe -> 769f19ee8


NIFI-1097: Rewrite PutKafka to use the new producer API


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/22de23ba
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/22de23ba
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/22de23ba

Branch: refs/heads/NIFI-655
Commit: 22de23baa6ae7318574ccb112d03daf8f8dfa10e
Parents: 7a165b6
Author: Mark Payne <marka...@hotmail.com>
Authored: Tue Nov 3 16:17:32 2015 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Fri Nov 13 10:46:46 2015 -0500

----------------------------------------------------------------------
 .../nifi-kafka-processors/pom.xml               |   5 +
 .../apache/nifi/processors/kafka/PutKafka.java  | 712 +++++++++++++------
 .../nifi/processors/kafka/TestPutKafka.java     | 439 ++++++------
 3 files changed, 727 insertions(+), 429 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/22de23ba/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
index c2e86fe..1a8dc9d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
@@ -35,6 +35,11 @@
             <artifactId>nifi-utils</artifactId>
         </dependency>
         <dependency>
+                   <groupId>org.apache.kafka</groupId>
+                   <artifactId>kafka-clients</artifactId>
+                   <version>0.8.2.2</version>
+               </dependency>
+        <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.9.1</artifactId>
             <version>0.8.2.2</version>

http://git-wip-us.apache.org/repos/asf/nifi/blob/22de23ba/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 09025a4..ea7f7bb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -21,30 +21,47 @@ import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
@@ -56,21 +73,20 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
 import org.apache.nifi.util.LongHolder;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import scala.actors.threadpool.Arrays;
 
 @SupportsBatching
 @InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
-@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Kafka")
-public class PutKafka extends AbstractProcessor {
+@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Kafka. The messages to send may be individual FlowFiles or may be 
delimited, using a "
+    + "user-specified delimiter, such as a new-line.")
+@TriggerWhenEmpty // because we have a queue of sessions that are ready to be 
committed
+public class PutKafka extends AbstractSessionFactoryProcessor {
 
     private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
     private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + 
"(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
 
-    public static final AllowableValue DELIVERY_REPLICATED = new 
AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed 
to"
+    public static final AllowableValue DELIVERY_REPLICATED = new 
AllowableValue("all", "Guarantee Replicated Delivery", "FlowFile will be routed 
to"
         + " failure unless the message is replicated to the appropriate number 
of Kafka Nodes according to the Topic configuration");
     public static final AllowableValue DELIVERY_ONE_NODE = new 
AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed"
         + " to success if the message is received by a single Kafka node, 
whether or not it is replicated. This is faster than"
@@ -79,16 +95,6 @@ public class PutKafka extends AbstractProcessor {
         + " successfully writing the content to a Kafka node, without waiting 
for a response. This provides the best performance but may result"
         + " in data loss.");
 
-    /**
-     * AllowableValue for a Producer Type that synchronously sends messages to 
Kafka
-     */
-    public static final AllowableValue PRODUCTER_TYPE_SYNCHRONOUS = new 
AllowableValue("sync", "Synchronous", "Send FlowFiles to Kafka immediately.");
-
-    /**
-     * AllowableValue for a Producer Type that asynchronously sends messages 
to Kafka
-     */
-    public static final AllowableValue PRODUCTER_TYPE_ASYNCHRONOUS = new 
AllowableValue("async", "Asynchronous", "Batch messages before sending them to 
Kafka."
-        + " While this will improve throughput, it opens the possibility that 
a failure on the client machine will drop unsent data.");
 
     /**
      * AllowableValue for sending messages to Kafka without compression
@@ -105,6 +111,13 @@ public class PutKafka extends AbstractProcessor {
      */
     public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new 
AllowableValue("snappy", "Snappy", "Compress messages using Snappy");
 
+    static final AllowableValue ROUND_ROBIN_PARTITIONING = new 
AllowableValue("Round Robin", "Round Robin",
+        "Messages will be assigned partitions in a round-robin fashion, 
sending the first message to Partition 1, the next Partition to Partition 2, 
and so on, wrapping as necessary.");
+    static final AllowableValue RANDOM_PARTITIONING = new 
AllowableValue("Random Robin", "Random",
+        "Messages will be assigned to random partitions.");
+    static final AllowableValue USER_DEFINED_PARTITIONING = new 
AllowableValue("User-Defined", "User-Defined",
+        "The <Partition> property will be used to determine the partition. All 
messages within the same FlowFile will be assigned to the same partition.");
+
 
     public static final PropertyDescriptor SEED_BROKERS = new 
PropertyDescriptor.Builder()
         .name("Known Brokers")
@@ -120,6 +133,21 @@ public class PutKafka extends AbstractProcessor {
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .expressionLanguageSupported(true)
         .build();
+    static final PropertyDescriptor PARTITION_STRATEGY = new 
PropertyDescriptor.Builder()
+        .name("Partition Strategy")
+        .description("Specifies how messages should be partitioned when sent 
to Kafka")
+        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, 
USER_DEFINED_PARTITIONING)
+        .defaultValue(ROUND_ROBIN_PARTITIONING.getValue())
+        .required(true)
+        .build();
+    public static final PropertyDescriptor PARTITION = new 
PropertyDescriptor.Builder()
+        .name("Partition")
+        .description("Specifies which Kafka Partition to add the message to. 
If using a message delimiter, all messages in the same FlowFile will be sent to 
the same partition. "
+            + "If a partition is specified but is not valid, then all messages 
within the same FlowFile will use the same partition but it remains undefined 
which partition is used.")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(false)
+        .build();
     public static final PropertyDescriptor KEY = new 
PropertyDescriptor.Builder()
         .name("Kafka Key")
         .description("The Key to use for the Message")
@@ -140,7 +168,10 @@ public class PutKafka extends AbstractProcessor {
         .description("Specifies the delimiter to use for splitting apart 
multiple messages within a single FlowFile. "
             + "If not specified, the entire content of the FlowFile will be 
used as a single message. "
             + "If specified, the contents of the FlowFile will be split on 
this delimiter and each section "
-            + "sent as a separate Kafka message.")
+            + "sent as a separate Kafka message. Note that if messages are 
delimited and some messages for a given FlowFile "
+            + "are transferred successfully while others are not, the messages 
will be split into individual FlowFiles, such that those "
+            + "messages that were successfully sent are routed to the 
'success' relationship while other messages are sent to the 'failure' "
+            + "relationship.")
         .required(false)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .expressionLanguageSupported(true)
@@ -151,6 +182,13 @@ public class PutKafka extends AbstractProcessor {
         .required(true)
         .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
         .expressionLanguageSupported(false)
+        .defaultValue("5 MB")
+        .build();
+    static final PropertyDescriptor MAX_RECORD_SIZE = new 
PropertyDescriptor.Builder()
+        .name("Max Record Size")
+        .description("The maximum size that any individual record can be.")
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .required(true)
         .defaultValue("1 MB")
         .build();
     public static final PropertyDescriptor TIMEOUT = new 
PropertyDescriptor.Builder()
@@ -168,20 +206,10 @@ public class PutKafka extends AbstractProcessor {
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .expressionLanguageSupported(false)
         .build();
-    public static final PropertyDescriptor PRODUCER_TYPE = new 
PropertyDescriptor.Builder()
-        .name("Producer Type")
-        .description("This parameter specifies whether the messages are sent 
asynchronously in a background thread.")
-        .required(true)
-        .allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, 
PRODUCTER_TYPE_ASYNCHRONOUS)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(false)
-        .defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue())
-        .build();
     public static final PropertyDescriptor BATCH_NUM_MESSAGES = new 
PropertyDescriptor.Builder()
         .name("Async Batch Size")
-        .description("Used only if Producer Type is set to \"" + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
-            + " The number of messages to send in one batch when using " + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode."
-            + " The producer will wait until either this number of messages 
are ready"
+        .displayName("Batch Size")
+        .description("The number of messages to send in one batch. The 
producer will wait until either this number of messages are ready"
             + " to send or \"Queue Buffering Max Time\" is reached.")
         .required(true)
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
@@ -189,35 +217,13 @@ public class PutKafka extends AbstractProcessor {
         .build();
     public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new 
PropertyDescriptor.Builder()
         .name("Queue Buffering Max Time")
-        .description("Used only if Producer Type is set to \"" + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
-            + " Maximum time to buffer data when using " + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 
100 ms"
-            + " will try to batch together 100ms of messages to send at once. 
This will improve"
+        .description("Maximum time to buffer data before sending to Kafka. For 
example a setting of 100 ms"
+            + " will try to batch together 100 milliseconds' worth of messages 
to send at once. This will improve"
             + " throughput but adds message delivery latency due to the 
buffering.")
         .required(true)
         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
         .defaultValue("5 secs")
         .build();
-    public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new 
PropertyDescriptor.Builder()
-        .name("Queue Buffer Max Count")
-        .description("Used only if Producer Type is set to \"" + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
-            + " The maximum number of unsent messages that can be queued up in 
the producer when"
-            + " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " 
mode before either the producer must be blocked or data must be dropped.")
-        .required(true)
-        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-        .defaultValue("10000")
-        .build();
-    public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new 
PropertyDescriptor.Builder()
-        .name("Queue Enqueue Timeout")
-        .description("Used only if Producer Type is set to \"" + 
PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
-            + " The amount of time to block before dropping messages when 
running in "
-            + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode"
-            + " and the buffer has reached the \"Queue Buffer Max Count\". If 
set to 0, events will"
-            + " be enqueued immediately or dropped if the queue is full (the 
producer send call will"
-            + " never block). If not set, the producer will block indefinitely 
and never willingly"
-            + " drop a send.")
-        .required(false)
-        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-        .build();
     public static final PropertyDescriptor COMPRESSION_CODEC = new 
PropertyDescriptor.Builder()
         .name("Compression Codec")
         .description("This parameter allows you to specify the compression 
codec for all"
@@ -227,16 +233,6 @@ public class PutKafka extends AbstractProcessor {
         .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, 
COMPRESSION_CODEC_SNAPPY)
         .defaultValue(COMPRESSION_CODEC_NONE.getValue())
         .build();
-    public static final PropertyDescriptor COMPRESSED_TOPICS = new 
PropertyDescriptor.Builder()
-        .name("Compressed Topics")
-        .description("This parameter allows you to set whether compression 
should be turned on"
-            + " for particular topics. If the compression codec is anything 
other than"
-            + " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable 
compression only for specified topics if any."
-            + " If the list of compressed topics is empty, then enable the 
specified"
-            + " compression codec for all topics. If the compression codec is 
" + COMPRESSION_CODEC_NONE.getDisplayName() + ","
-            + " compression is disabled for all topics")
-        .required(false)
-        .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -247,7 +243,13 @@ public class PutKafka extends AbstractProcessor {
         .description("Any FlowFile that cannot be sent to Kafka will be routed 
to this Relationship")
         .build();
 
-    private final BlockingQueue<Producer<byte[], byte[]>> producers = new 
LinkedBlockingQueue<>();
+    private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
+    private final BlockingQueue<FlowFileMessageBatch> completeBatches = new 
LinkedBlockingQueue<>();
+    private final Set<FlowFileMessageBatch> activeBatches = 
Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>());
+
+    private final ConcurrentMap<String, AtomicLong> partitionIndexMap = new 
ConcurrentHashMap<>();
+
+    private volatile Producer<byte[], byte[]> producer;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -259,36 +261,21 @@ public class PutKafka extends AbstractProcessor {
         final List<PropertyDescriptor> props = new ArrayList<>();
         props.add(SEED_BROKERS);
         props.add(TOPIC);
+        props.add(PARTITION_STRATEGY);
+        props.add(PARTITION);
         props.add(KEY);
         props.add(DELIVERY_GUARANTEE);
         props.add(MESSAGE_DELIMITER);
         props.add(MAX_BUFFER_SIZE);
+        props.add(MAX_RECORD_SIZE);
         props.add(TIMEOUT);
-        props.add(PRODUCER_TYPE);
         props.add(BATCH_NUM_MESSAGES);
-        props.add(QUEUE_BUFFERING_MAX_MESSAGES);
         props.add(QUEUE_BUFFERING_MAX);
-        props.add(QUEUE_ENQUEUE_TIMEOUT);
         props.add(COMPRESSION_CODEC);
-        props.add(COMPRESSED_TOPICS);
         props.add(clientName);
         return props;
     }
 
-    @Override
-    public Collection<ValidationResult> customValidate(final ValidationContext 
context) {
-        final List<ValidationResult> errors = new 
ArrayList<>(super.customValidate(context));
-
-        final Integer batchMessages = 
context.getProperty(BATCH_NUM_MESSAGES).asInteger();
-        final Integer bufferMaxMessages = 
context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).asInteger();
-
-        if (batchMessages > bufferMaxMessages) {
-            errors.add(new ValidationResult.Builder().subject("Batch Size, 
Queue Buffer").valid(false)
-                .explanation("Batch Size (" + batchMessages + ") must be equal 
to or less than the Queue Buffer Max Count (" + bufferMaxMessages + 
")").build());
-        }
-
-        return errors;
-    }
 
     @Override
     public Set<Relationship> getRelationships() {
@@ -298,71 +285,131 @@ public class PutKafka extends AbstractProcessor {
         return relationships;
     }
 
-    @OnStopped
-    public void closeProducers() {
-        Producer<byte[], byte[]> producer;
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        final String partitionStrategy = 
validationContext.getProperty(PARTITION_STRATEGY).getValue();
+        if 
(partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue()) && 
!validationContext.getProperty(PARTITION).isSet()) {
+            results.add(new 
ValidationResult.Builder().subject("Partition").valid(false).explanation(
+                "The <Partition> property must be set when configured to use 
the User-Defined Partitioning Strategy").build());
+        }
+
+        return results;
+    }
+
+    protected Producer<byte[], byte[]> getProducer() {
+        return producer;
+    }
 
-        while ((producer = producers.poll()) != null) {
+    @OnStopped
+    public void cleanup() {
+        final Producer<byte[], byte[]> producer = getProducer();
+        if (producer != null) {
             producer.close();
         }
+
+        for (final FlowFileMessageBatch batch : activeBatches) {
+            batch.cancelOrComplete();
+        }
+    }
+
+    @OnScheduled
+    public void createProducer(final ProcessContext context) {
+        producer = new KafkaProducer<byte[], byte[]>(createConfig(context), 
new ByteArraySerializer(), new ByteArraySerializer());
     }
 
-    protected ProducerConfig createConfig(final ProcessContext context) {
+    protected int getActiveMessageBatchCount() {
+        return activeBatches.size();
+    }
+
+    protected int getCompleteMessageBatchCount() {
+        return completeBatches.size();
+    }
+
+    protected Properties createConfig(final ProcessContext context) {
         final String brokers = context.getProperty(SEED_BROKERS).getValue();
 
         final Properties properties = new Properties();
-        properties.setProperty("metadata.broker.list", brokers);
-        properties.setProperty("request.required.acks", 
context.getProperty(DELIVERY_GUARANTEE).getValue());
+        properties.setProperty("bootstrap.servers", brokers);
+        properties.setProperty("acks", 
context.getProperty(DELIVERY_GUARANTEE).getValue());
         properties.setProperty("client.id", 
context.getProperty(CLIENT_NAME).getValue());
-        properties.setProperty("request.timeout.ms", 
String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()));
 
-        properties.setProperty("message.send.max.retries", "1");
-        properties.setProperty("producer.type", 
context.getProperty(PRODUCER_TYPE).getValue());
-        properties.setProperty("batch.num.messages", 
context.getProperty(BATCH_NUM_MESSAGES).getValue());
+        final String timeout = 
String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
+        properties.setProperty("timeout.ms", timeout);
+        properties.setProperty("metadata.fetch.timeout.ms", timeout);
 
-        final Long queueBufferingMillis = 
context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
-        if (queueBufferingMillis != null) {
-            properties.setProperty("queue.buffering.max.ms", 
String.valueOf(queueBufferingMillis));
-        }
-        properties.setProperty("queue.buffering.max.messages", 
context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue());
+        properties.setProperty("batch.size", 
context.getProperty(BATCH_NUM_MESSAGES).getValue());
+        properties.setProperty("max.request.size", 
String.valueOf(context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).longValue()));
 
-        final Long queueEnqueueTimeoutMillis = 
context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
-        if (queueEnqueueTimeoutMillis != null) {
-            properties.setProperty("queue.enqueue.timeout.ms", 
String.valueOf(queueEnqueueTimeoutMillis));
-        }
+        final long maxBufferSize = 
context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
+        properties.setProperty("buffer.memory", String.valueOf(maxBufferSize));
 
         final String compressionCodec = 
context.getProperty(COMPRESSION_CODEC).getValue();
-        properties.setProperty("compression.codec", compressionCodec);
+        properties.setProperty("compression.type", compressionCodec);
 
-        final String compressedTopics = 
context.getProperty(COMPRESSED_TOPICS).getValue();
-        if (compressedTopics != null) {
-            properties.setProperty("compressed.topics", compressedTopics);
+        final Long queueBufferingMillis = 
context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
+        if (queueBufferingMillis != null) {
+            properties.setProperty("linger.ms", 
String.valueOf(queueBufferingMillis));
         }
 
-        return new ProducerConfig(properties);
-    }
+        properties.setProperty("retries", "0");
+        properties.setProperty("block.on.buffer.full", "false");
 
-    protected Producer<byte[], byte[]> createProducer(final ProcessContext 
context) {
-        return new Producer<>(createConfig(context));
+        return properties;
     }
 
-    private Producer<byte[], byte[]> borrowProducer(final ProcessContext 
context) {
-        final Producer<byte[], byte[]> producer = producers.poll();
-        return producer == null ? createProducer(context) : producer;
-    }
+    private Integer getPartition(final ProcessContext context, final FlowFile 
flowFile, final String topic) {
+        final long unnormalizedIndex;
+
+        final String partitionStrategy = 
context.getProperty(PARTITION_STRATEGY).getValue();
+        if 
(partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) {
+            AtomicLong partitionIndex = partitionIndexMap.get(topic);
+            if (partitionIndex == null) {
+                partitionIndex = new AtomicLong(0L);
+                final AtomicLong existing = 
partitionIndexMap.putIfAbsent(topic, partitionIndex);
+                if (existing != null) {
+                    partitionIndex = existing;
+                }
+            }
+
+            unnormalizedIndex = partitionIndex.getAndIncrement();
+        } else if 
(partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) {
+            return null;
+        } else {
+            if (context.getProperty(PARTITION).isSet()) {
+                final String partitionValue = 
context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+
+                if (NUMBER_PATTERN.matcher(partitionValue).matches()) {
+                    // Subtract 1 because if the partition is "3" then we want 
to get index 2 into the List of partitions.
+                    unnormalizedIndex = Long.parseLong(partitionValue) - 1;
+                } else {
+                    unnormalizedIndex = partitionValue.hashCode();
+                }
+            } else {
+                return null;
+            }
+        }
 
-    private void returnProducer(final Producer<byte[], byte[]> producer) {
-        producers.offer(producer);
+        final Producer<byte[], byte[]> producer = getProducer();
+        final List<PartitionInfo> partitionInfos = 
producer.partitionsFor(topic);
+        final int partitionIdx = (int) (unnormalizedIndex % 
partitionInfos.size());
+        return partitionInfos.get(partitionIdx).partition();
     }
 
     @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+    public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
+        FlowFileMessageBatch batch;
+        while ((batch = completeBatches.poll()) != null) {
+            batch.completeSession();
+        }
+
+        final ProcessSession session = sessionFactory.createSession();
         final FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
         }
 
-        final long start = System.nanoTime();
         final String topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
         final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
         final byte[] keyBytes = key == null ? null : 
key.getBytes(StandardCharsets.UTF_8);
@@ -371,8 +418,7 @@ public class PutKafka extends AbstractProcessor {
             delimiter = delimiter.replace("\\n", "\n").replace("\\r", 
"\r").replace("\\t", "\t");
         }
 
-        final long maxBufferSize = 
context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
-        final Producer<byte[], byte[]> producer = borrowProducer(context);
+        final Producer<byte[], byte[]> producer = getProducer();
 
         if (delimiter == null) {
             // Send the entire FlowFile as a single message.
@@ -384,31 +430,38 @@ public class PutKafka extends AbstractProcessor {
                 }
             });
 
-            boolean error = false;
+            final Integer partition;
             try {
-                final KeyedMessage<byte[], byte[]> message;
-                if (key == null) {
-                    message = new KeyedMessage<>(topic, value);
-                } else {
-                    message = new KeyedMessage<>(topic, keyBytes, value);
-                }
-
-                producer.send(message);
-                final long nanos = System.nanoTime() - start;
-
-                session.getProvenanceReporter().send(flowFile, "kafka://" + 
topic);
-                session.transfer(flowFile, REL_SUCCESS);
-                getLogger().info("Successfully sent {} to Kafka in {} millis", 
new Object[] { flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) });
+                partition = getPartition(context, flowFile, topic);
             } catch (final Exception e) {
-                getLogger().error("Failed to send {} to Kafka due to {}; 
routing to failure", new Object[] { flowFile, e });
+                getLogger().error("Failed to obtain a partition for {} due to 
{}", new Object[] {flowFile, e});
                 session.transfer(session.penalize(flowFile), REL_FAILURE);
-                error = true;
-            } finally {
-                if (error) {
-                    producer.close();
-                } else {
-                    returnProducer(producer);
-                }
+                session.commit();
+                return;
+            }
+
+            final ProducerRecord<byte[], byte[]> producerRecord = new 
ProducerRecord<>(topic, partition, keyBytes, value);
+
+            final FlowFileMessageBatch messageBatch = new 
FlowFileMessageBatch(session, flowFile, topic);
+            messageBatch.setNumMessages(1);
+            activeBatches.add(messageBatch);
+
+            try {
+                producer.send(producerRecord, new Callback() {
+                    @Override
+                    public void onCompletion(final RecordMetadata metadata, 
final Exception exception) {
+                        if (exception == null) {
+                            // record was successfully sent.
+                            messageBatch.addSuccessfulRange(0L, 
flowFile.getSize(), metadata.offset());
+                        } else {
+                            messageBatch.addFailedRange(0L, 
flowFile.getSize(), exception);
+                        }
+                    }
+                });
+            } catch (final BufferExhaustedException bee) {
+                messageBatch.addFailedRange(0L, flowFile.getSize(), bee);
+                context.yield();
+                return;
             }
         } else {
             final byte[] delimiterBytes = 
delimiter.getBytes(StandardCharsets.UTF_8);
@@ -418,9 +471,9 @@ public class PutKafka extends AbstractProcessor {
             // the stream of bytes in the FlowFile
             final NonThreadSafeCircularBuffer buffer = new 
NonThreadSafeCircularBuffer(delimiterBytes);
 
-            boolean error = false;
-            final LongHolder lastMessageOffset = new LongHolder(0L);
             final LongHolder messagesSent = new LongHolder(0L);
+            final FlowFileMessageBatch messageBatch = new 
FlowFileMessageBatch(session, flowFile, topic);
+            activeBatches.add(messageBatch);
 
             try (final ByteArrayOutputStream baos = new 
ByteArrayOutputStream()) {
                 session.read(flowFile, new InputStreamCallback() {
@@ -430,13 +483,12 @@ public class PutKafka extends AbstractProcessor {
 
                         boolean streamFinished = false;
 
-                        final List<KeyedMessage<byte[], byte[]>> messages = 
new ArrayList<>(); // batch to send
-                        long messageBytes = 0L; // size of messages in the 
'messages' list
-
                         int nextByte;
                         try (final InputStream bufferedIn = new 
BufferedInputStream(rawIn);
                             final ByteCountingInputStream in = new 
ByteCountingInputStream(bufferedIn)) {
 
+                            long messageStartOffset = in.getBytesConsumed();
+
                             // read until we're out of data.
                             while (!streamFinished) {
                                 nextByte = in.read();
@@ -457,107 +509,309 @@ public class PutKafka extends AbstractProcessor {
                                 }
 
                                 if (data != null) {
+                                    final long messageEndOffset = 
in.getBytesConsumed();
+
                                     // If the message has no data, ignore it.
                                     if (data.length != 0) {
-                                        // either we ran out of data or we 
reached the end of the message.
-                                        // Either way, create the message 
because it's ready to send.
-                                        final KeyedMessage<byte[], byte[]> 
message;
-                                        if (key == null) {
-                                            message = new 
KeyedMessage<>(topic, data);
-                                        } else {
-                                            message = new 
KeyedMessage<>(topic, keyBytes, data);
+                                        final Integer partition;
+                                        try {
+                                            partition = getPartition(context, 
flowFile, topic);
+                                        } catch (final Exception e) {
+                                            
messageBatch.addFailedRange(messageStartOffset, messageEndOffset, e);
+                                            getLogger().error("Failed to 
obtain a partition for {} due to {}", new Object[] {flowFile, e});
+                                            continue;
                                         }
 
-                                        // Add the message to the list of 
messages ready to send. If we've reached our
-                                        // threshold of how many we're willing 
to send (or if we're out of data), go ahead
-                                        // and send the whole List.
-                                        messages.add(message);
-                                        messageBytes += data.length;
-                                        if (messageBytes >= maxBufferSize || 
streamFinished) {
-                                            // send the messages, then reset 
our state.
-                                            try {
-                                                producer.send(messages);
-                                            } catch (final Exception e) {
-                                                // we wrap the general 
exception in ProcessException because we want to separate
-                                                // failures in sending 
messages from general Exceptions that would indicate bugs
-                                                // in the Processor. Failure 
to send a message should be handled appropriately, but
-                                                // we don't want to catch the 
general Exception or RuntimeException in order to catch
-                                                // failures from Kafka's 
Producer.
-                                                throw new 
ProcessException("Failed to send messages to Kafka", e);
-                                            }
-
-                                            
messagesSent.addAndGet(messages.size());    // count number of messages sent
-
-                                            // reset state
-                                            messages.clear();
-                                            messageBytes = 0;
-
-                                            // We've successfully sent a batch 
of messages. Keep track of the byte offset in the
-                                            // FlowFile of the last 
successfully sent message. This way, if the messages cannot
-                                            // all be successfully sent, we 
know where to split off the data. This allows us to then
-                                            // split off the first X number of 
bytes and send to 'success' and then split off the rest
-                                            // and send them to 'failure'.
-                                            
lastMessageOffset.set(in.getBytesConsumed());
+
+                                        final ProducerRecord<byte[], byte[]> 
producerRecord = new ProducerRecord<>(topic, partition, keyBytes, data);
+                                        final long rangeStart = 
messageStartOffset;
+
+                                        try {
+                                            producer.send(producerRecord, new 
Callback() {
+                                                @Override
+                                                public void onCompletion(final 
RecordMetadata metadata, final Exception exception) {
+                                                    if (exception == null) {
+                                                        // record was 
successfully sent.
+                                                        
messageBatch.addSuccessfulRange(rangeStart, messageEndOffset, 
metadata.offset());
+                                                    } else {
+                                                        
messageBatch.addFailedRange(rangeStart, messageEndOffset, exception);
+                                                    }
+                                                }
+                                            });
+
+                                            messagesSent.incrementAndGet();
+                                        } catch (final 
BufferExhaustedException bee) {
+                                            // Not enough room in the buffer. 
Add from the beginning of this message to end of FlowFile as a failed range
+                                            
messageBatch.addFailedRange(messageStartOffset, flowFile.getSize(), bee);
+                                            context.yield();
+                                            return;
                                         }
+
                                     }
+
                                     // reset BAOS so that we can start a new 
message.
                                     baos.reset();
                                     data = null;
-
-                                }
-                            }
-
-                            // If there are messages left, send them
-                            if (!messages.isEmpty()) {
-                                try {
-                                    messagesSent.addAndGet(messages.size());   
 // add count of messages
-                                    producer.send(messages);
-                                } catch (final Exception e) {
-                                    throw new ProcessException("Failed to send 
messages to Kafka", e);
+                                    messageStartOffset = in.getBytesConsumed();
                                 }
                             }
                         }
                     }
                 });
 
-                final long nanos = System.nanoTime() - start;
-                session.getProvenanceReporter().send(flowFile, "kafka://" + 
topic, "Sent " + messagesSent.get() + " messages");
-                session.transfer(flowFile, REL_SUCCESS);
-                getLogger().info("Successfully sent {} messages to Kafka for 
{} in {} millis", new Object[] { messagesSent.get(), flowFile, 
TimeUnit.NANOSECONDS.toMillis(nanos) });
-            } catch (final ProcessException pe) {
-                error = true;
-
-                // There was a failure sending messages to Kafka. Iff the 
lastMessageOffset is 0, then all of them failed and we can
-                // just route the FlowFile to failure. Otherwise, some 
messages were successful, so split them off and send them to
-                // 'success' while we send the others to 'failure'.
-                final long offset = lastMessageOffset.get();
-                if (offset == 0L) {
-                    // all of the messages failed to send. Route FlowFile to 
failure
-                    getLogger().error("Failed to send {} to Kafka due to {}; 
routing to fialure", new Object[] { flowFile, pe.getCause() });
-                    session.transfer(session.penalize(flowFile), REL_FAILURE);
+                messageBatch.setNumMessages(messagesSent.get());
+            }
+        }
+    }
+
+
+    /**
+     * A half-open byte span [start, end) of a FlowFile's content. For a message that reached Kafka,
+     * kafkaOffset holds the offset Kafka reported for it; it is null for failed spans and for spans
+     * created by merging contiguous ranges together.
+     */
+    private static class Range {
+        private final long start;
+        private final long end;
+        private final Long kafkaOffset;
+
+        public Range(final long rangeStart, final long rangeEnd, final Long offsetInKafka) {
+            start = rangeStart;
+            end = rangeEnd;
+            kafkaOffset = offsetInKafka;
+        }
+
+        /** @return first byte of the span (inclusive) */
+        public long getStart() {
+            return start;
+        }
+
+        /** @return byte position where the span ends (exclusive) */
+        public long getEnd() {
+            return end;
+        }
+
+        /** @return the Kafka offset of the message, or null when none is known */
+        public Long getKafkaOffset() {
+            return kafkaOffset;
+        }
+
+        @Override
+        public String toString() {
+            return new StringBuilder("Range[").append(start).append('-').append(end).append(']').toString();
+        }
+    }
+
+    /**
+     * Tracks the asynchronous outcome of every Kafka message produced from a single FlowFile.
+     * Each message occupies a byte range of the FlowFile; producer callbacks record that range as
+     * successful (with its Kafka offset) or failed. Once at least numMessages outcomes have been
+     * recorded, the batch moves itself from activeBatches to completeBatches exactly once.
+     * completeSession() routes the FlowFile (or the pieces of it) and commits the session.
+     */
+    private class FlowFileMessageBatch {
+        private final ProcessSession session;
+        private final FlowFile flowFile;
+        private final String topic;
+        private final long startTime = System.nanoTime();
+
+        private final List<Range> successfulRanges = new ArrayList<>();
+        private final List<Range> failedRanges = new ArrayList<>();
+
+        private Exception lastFailureReason;
+        // -1 until setNumMessages() is called; the batch cannot complete before then
+        private long numMessages = -1L;
+        private long completeTime = 0L;
+        private boolean canceled = false;
+        // true once this batch has been handed over to completeBatches; prevents doing so twice
+        private boolean completed = false;
+
+        public FlowFileMessageBatch(final ProcessSession session, final FlowFile flowFile, final String topic) {
+            this.session = session;
+            this.flowFile = flowFile;
+            this.topic = topic;
+        }
+
+        /**
+         * Completes the session if every expected message has been accounted for; otherwise marks
+         * the batch canceled, rolls back the session and discards the ranges recorded so far.
+         */
+        public synchronized void cancelOrComplete() {
+            if (isComplete()) {
+                completeSession();
+                return;
+            }
+
+            this.canceled = true;
+
+            session.rollback();
+            successfulRanges.clear();
+            failedRanges.clear();
+        }
+
+        /** Records that bytes [start, end) of the FlowFile were delivered to Kafka at kafkaOffset. Ignored once canceled. */
+        public synchronized void addSuccessfulRange(final long start, final long end, final long kafkaOffset) {
+            if (canceled) {
+                return;
+            }
+
+            successfulRanges.add(new Range(start, end, kafkaOffset));
+            markCompleteIfDone();
+        }
+
+        /** Records that bytes [start, end) of the FlowFile were not delivered, remembering e as the latest cause. Ignored once canceled. */
+        public synchronized void addFailedRange(final long start, final long end, final Exception e) {
+            if (canceled) {
+                return;
+            }
+
+            failedRanges.add(new Range(start, end, null));
+            lastFailureReason = e;
+            markCompleteIfDone();
+        }
+
+        private boolean isComplete() {
+            return !canceled && (numMessages > -1) && (successfulRanges.size() + failedRanges.size() >= numMessages);
+        }
+
+        /** Sets how many message outcomes must be recorded before the batch counts as complete. */
+        public synchronized void setNumMessages(final long msgCount) {
+            this.numMessages = msgCount;
+            markCompleteIfDone();
+        }
+
+        /**
+         * Moves this batch from activeBatches to completeBatches the first time it becomes complete.
+         * Callers may record more ranges than numMessages (a failed range can be added for a message
+         * that was never counted), so isComplete() can stay true across several calls; without the
+         * 'completed' guard the batch would be queued twice and its session completed twice.
+         * Must be called while holding this object's monitor (all callers are synchronized).
+         */
+        private void markCompleteIfDone() {
+            if (completed || !isComplete()) {
+                return;
+            }
+
+            completed = true;
+            activeBatches.remove(this);
+            completeBatches.add(this);
+            completeTime = System.nanoTime();
+        }
+
+        /** Returns the smaller non-null value; null only when both arguments are null. */
+        private Long getMin(final Long a, final Long b) {
+            if (a == null) {
+                return b;
+            }
+            if (b == null) {
+                return a;
+            }
+            return Math.min(a, b);
+        }
+
+        /** Returns the larger non-null value; null only when both arguments are null. */
+        private Long getMax(final Long a, final Long b) {
+            if (a == null) {
+                return b;
+            }
+            if (b == null) {
+                return a;
+            }
+            return Math.max(a, b);
+        }
+
+        /**
+         * Sorts the ranges by start offset (in place), merges runs of contiguous ranges and transfers
+         * one clone of the FlowFile per merged run to the given relationship. Runs sent to
+         * REL_SUCCESS get a SEND provenance event; all other runs are penalized.
+         */
+        private void transferRanges(final List<Range> ranges, final Relationship relationship) {
+            Collections.sort(ranges, new Comparator<Range>() {
+                @Override
+                public int compare(final Range o1, final Range o2) {
+                    return Long.compare(o1.getStart(), o2.getStart());
+                }
+            });
+
+            // getTransitUri() asks the producer for partition metadata, so resolve it once, not per child.
+            final String transitUri = (relationship == REL_SUCCESS && !ranges.isEmpty()) ? getTransitUri() : null;
+
+            for (int i = 0; i < ranges.size(); i++) {
+                Range range = ranges.get(i);
+                int count = 1;
+                Long smallestKafkaOffset = range.getKafkaOffset();
+                Long largestKafkaOffset = range.getKafkaOffset();
+
+                while (i + 1 < ranges.size()) {
+                    // Check if the next range in the List continues where this one left off.
+                    final Range nextRange = ranges.get(i + 1);
+
+                    if (nextRange.getStart() == range.getEnd()) {
+                        // We have two ranges in a row that are contiguous; combine them into a single Range.
+                        range = new Range(range.getStart(), nextRange.getEnd(), null);
+
+                        smallestKafkaOffset = getMin(smallestKafkaOffset, nextRange.getKafkaOffset());
+                        largestKafkaOffset = getMax(largestKafkaOffset, nextRange.getKafkaOffset());
+                        count++;
+                        i++;
+                    } else {
+                        break;
+                    }
+                }
+
+                // Create a FlowFile for this range; Range.end is exclusive, so the length is end - start.
+                FlowFile child = session.clone(flowFile, range.getStart(), range.getEnd() - range.getStart());
+                if (relationship == REL_SUCCESS) {
+                    session.getProvenanceReporter().send(child, transitUri, "Sent " + count + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset);
+                    session.transfer(child, relationship);
                  } else {
-                    // Some of the messages were sent successfully. We want to 
split off the successful messages from the failed messages.
-                    final FlowFile successfulMessages = 
session.clone(flowFile, 0L, offset);
-                    final FlowFile failedMessages = session.clone(flowFile, 
offset, flowFile.getSize() - offset);
-
-                    getLogger().error("Successfully sent {} of the messages 
from {} but then failed to send the rest. Original FlowFile split into"
-                        + " two: {} routed to 'success', {} routed to 
'failure'. Failure was due to {}", new Object[] {
-                        messagesSent.get(), flowFile, successfulMessages, 
failedMessages, pe.getCause() });
-
-                    session.transfer(successfulMessages, REL_SUCCESS);
-                    session.transfer(session.penalize(failedMessages), 
REL_FAILURE);
-                    session.remove(flowFile);
-                    session.getProvenanceReporter().send(successfulMessages, 
"kafka://" + topic);
+                    session.transfer(session.penalize(child), relationship);
                  }
-            } finally {
-                if (error) {
-                    producer.close();
+            }
+        }
+
+        /**
+         * Builds the provenance transit URI from the leader of the topic's first partition. Falls back
+         * to an "unknown-host" URI when no partition metadata or no partition leader is available.
+         */
+        private String getTransitUri() {
+            final List<PartitionInfo> partitions = getProducer().partitionsFor(topic);
+            if (partitions == null || partitions.isEmpty()) {
+                return "kafka://unknown-host" + "/topics/" + topic;
+            }
+
+            final PartitionInfo info = partitions.get(0);
+            final Node leader = info.leader();
+            if (leader == null) {
+                // PartitionInfo.leader() is null while the partition has no leader; don't NPE while reporting provenance.
+                return "kafka://unknown-host" + "/topics/" + topic;
+            }
+
+            final String host = leader.host();
+            final int port = leader.port();
+
+            return "kafka://" + host + ":" + port + "/topics/" + topic;
+        }
+
+        /**
+         * Routes the FlowFile according to the recorded ranges and commits the session: all-success
+         * goes to REL_SUCCESS with a SEND event, all-failure is penalized to REL_FAILURE, and a mix
+         * is split into per-range children. Does nothing if the batch was canceled.
+         */
+        public synchronized void completeSession() {
+            if (canceled) {
+                return;
+            }
+
+            if (successfulRanges.isEmpty() && failedRanges.isEmpty()) {
+                getLogger().info("Completed processing {} but sent 0 FlowFiles to Kafka", new Object[] {flowFile});
+                session.transfer(flowFile, REL_SUCCESS);
+                session.commit();
+                return;
+            }
+
+            if (successfulRanges.isEmpty()) {
+                getLogger().error("Failed to send {} to Kafka; routing to 'failure'; last failure reason reported was {};", new Object[] {flowFile, lastFailureReason});
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
+                session.commit();
+                return;
+            }
+
+            if (failedRanges.isEmpty()) {
+                final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime);
+
+                if (successfulRanges.size() == 1) {
+                    final Long kafkaOffset = successfulRanges.get(0).getKafkaOffset();
+                    final String msg = "Sent 1 message" + ((kafkaOffset == null) ? "" : ("; Kafka offset = " + kafkaOffset));
+                    session.getProvenanceReporter().send(flowFile, getTransitUri(), msg);
                  } else {
-                    returnProducer(producer);
+                    // Successful ranges always carry a non-null Kafka offset (see addSuccessfulRange).
+                    long smallestKafkaOffset = successfulRanges.get(0).getKafkaOffset();
+                    long largestKafkaOffset = successfulRanges.get(0).getKafkaOffset();
+
+                    for (final Range range : successfulRanges) {
+                        smallestKafkaOffset = Math.min(smallestKafkaOffset, range.getKafkaOffset());
+                        largestKafkaOffset = Math.max(largestKafkaOffset, range.getKafkaOffset());
+                    }
+
+                    session.getProvenanceReporter().send(flowFile, getTransitUri(),
+                        "Sent " + successfulRanges.size() + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset);
                  }
+
+                session.transfer(flowFile, REL_SUCCESS);
+                getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {successfulRanges.size(), flowFile, transferMillis});
+                session.commit();
+                return;
              }
 
+            // At this point both the successful and the failed ranges are non-empty: some messages made it to
+            // Kafka and some did not. Split the source FlowFile into children, sending the successful messages
+            // to 'success' and the failed messages to 'failure', and drop the original.
+            transferRanges(successfulRanges, REL_SUCCESS);
+            transferRanges(failedRanges, REL_FAILURE);
+            session.remove(flowFile);
+            getLogger().error("Successfully sent {} messages to Kafka but failed to send {} messages; the last error received was {}",
+                new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason});
+            session.commit();
          }
      }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/22de23ba/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 750d406..17d1cc8 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -22,27 +22,33 @@ import static org.junit.Assert.assertTrue;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import kafka.common.FailedToSendMessageException;
-import kafka.javaapi.producer.Producer;
-import kafka.message.CompressionCodec;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.processor.ProcessContext;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.Ignore;
+import org.junit.Assert;
 import org.junit.Test;
 
-import scala.collection.Seq;
+import kafka.common.FailedToSendMessageException;
+
 
 public class TestPutKafka {
 
@@ -56,24 +62,19 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
 
         runner.enqueue("Hello 
World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes());
-        runner.run();
+        runner.run(2); // we have to run twice because the first iteration 
will result in data being added to a queue in the processor; the second 
onTrigger call will transfer FlowFiles.
 
         runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
 
-        final List<byte[]> messages = proc.getProducer().getMessages();
+        final List<ProducerRecord<byte[], byte[]>> messages = ((MockProducer) 
proc.getProducer()).getMessages();
         assertEquals(11, messages.size());
 
-        assertTrue(Arrays.equals("Hello 
World".getBytes(StandardCharsets.UTF_8), messages.get(0)));
-        assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), 
messages.get(1)));
-        assertTrue(Arrays.equals("1".getBytes(StandardCharsets.UTF_8), 
messages.get(2)));
-        assertTrue(Arrays.equals("2".getBytes(StandardCharsets.UTF_8), 
messages.get(3)));
-        assertTrue(Arrays.equals("3".getBytes(StandardCharsets.UTF_8), 
messages.get(4)));
-        assertTrue(Arrays.equals("4".getBytes(StandardCharsets.UTF_8), 
messages.get(5)));
-        assertTrue(Arrays.equals("5".getBytes(StandardCharsets.UTF_8), 
messages.get(6)));
-        assertTrue(Arrays.equals("6".getBytes(StandardCharsets.UTF_8), 
messages.get(7)));
-        assertTrue(Arrays.equals("7".getBytes(StandardCharsets.UTF_8), 
messages.get(8)));
-        assertTrue(Arrays.equals("8".getBytes(StandardCharsets.UTF_8), 
messages.get(9)));
-        assertTrue(Arrays.equals("9".getBytes(StandardCharsets.UTF_8), 
messages.get(10)));
+        assertTrue(Arrays.equals("Hello 
World".getBytes(StandardCharsets.UTF_8), messages.get(0).value()));
+        assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), 
messages.get(1).value()));
+
+        for (int i = 1; i <= 9; i++) {
+            
assertTrue(Arrays.equals(String.valueOf(i).getBytes(StandardCharsets.UTF_8), 
messages.get(i + 1).value()));
+        }
     }
 
     @Test
@@ -87,7 +88,7 @@ public class TestPutKafka {
 
         final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9";
         runner.enqueue(text.getBytes());
-        runner.run();
+        runner.run(2);
 
         runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
         final MockFlowFile mff = 
runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
@@ -96,7 +97,7 @@ public class TestPutKafka {
 
     @Test
     public void testPartialFailure() {
-        final TestableProcessor proc = new TestableProcessor(2);
+        final TestableProcessor proc = new TestableProcessor(2); // fail after 
sending 2 messages.
         final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(PutKafka.TOPIC, "topic1");
         runner.setProperty(PutKafka.KEY, "key1");
@@ -106,7 +107,7 @@ public class TestPutKafka {
 
         final byte[] bytes = "1\n2\n3\n4".getBytes();
         runner.enqueue(bytes);
-        runner.run();
+        runner.run(2);
 
         runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
         runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
@@ -119,6 +120,39 @@ public class TestPutKafka {
     }
 
     @Test
+    public void testPartialFailureWithSuccessBeforeAndAfter() {
+        final TestableProcessor proc = new TestableProcessor(2, 4); // fail 
after sending 2 messages, then stop failing after 4
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");
+
+        final byte[] bytes = "1\n2\n3\n4\n5\n6".getBytes();
+        runner.enqueue(bytes);
+        runner.run(2);
+
+        runner.assertTransferCount(PutKafka.REL_SUCCESS, 2);
+        runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
+
+        final List<MockFlowFile> success = 
runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
+        for (final MockFlowFile successFF : success) {
+            if ('1' == successFF.toByteArray()[0]) {
+                successFF.assertContentEquals("1\n2\n");
+            } else if ('5' == successFF.toByteArray()[0]) {
+                successFF.assertContentEquals("5\n6");
+            } else {
+                Assert.fail("Wrong content for FlowFile; contained " + new 
String(successFF.toByteArray()));
+            }
+        }
+
+        final MockFlowFile failureFF = 
runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
+        failureFF.assertContentEquals("3\n4\n");
+    }
+
+
+    @Test
     public void testWithEmptyMessages() {
         final TestableProcessor proc = new TestableProcessor();
         final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -129,16 +163,16 @@ public class TestPutKafka {
 
         final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
         runner.enqueue(bytes);
-        runner.run();
+        runner.run(2);
 
         runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
 
-        final List<byte[]> msgs = proc.getProducer().getMessages();
+        final List<ProducerRecord<byte[], byte[]>> msgs = ((MockProducer) 
proc.getProducer()).getMessages();
         assertEquals(4, msgs.size());
-        assertTrue(Arrays.equals("1".getBytes(), msgs.get(0)));
-        assertTrue(Arrays.equals("2".getBytes(), msgs.get(1)));
-        assertTrue(Arrays.equals("3".getBytes(), msgs.get(2)));
-        assertTrue(Arrays.equals("4".getBytes(), msgs.get(3)));
+
+        for (int i = 1; i <= 4; i++) {
+            assertTrue(Arrays.equals(String.valueOf(i).getBytes(), msgs.get(i 
- 1).value()));
+        }
     }
 
     @Test
@@ -154,14 +188,14 @@ public class TestPutKafka {
 
         final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
         runner.enqueue(bytes);
-        runner.run();
+        runner.run(2);
 
         final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
         assertEquals(1, events.size());
         final ProvenanceEventRecord event = events.get(0);
         assertEquals(ProvenanceEventType.SEND, event.getEventType());
-        assertEquals("kafka://topic1", event.getTransitUri());
-        assertEquals("Sent 4 messages", event.getDetails());
+        assertEquals("kafka://localhost:1111/topics/topic1", 
event.getTransitUri());
+        assertTrue(event.getDetails().startsWith("Sent 4 messages"));
     }
 
     @Test
@@ -175,265 +209,270 @@ public class TestPutKafka {
 
         final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
         runner.enqueue(bytes);
-        runner.run();
+        runner.run(2);
 
         final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
         assertEquals(1, events.size());
         final ProvenanceEventRecord event = events.get(0);
         assertEquals(ProvenanceEventType.SEND, event.getEventType());
-        assertEquals("kafka://topic1", event.getTransitUri());
-    }
-
-    @Test
-    @Ignore("Intended only for local testing; requires an actual running 
instance of Kafka & ZooKeeper...")
-    public void testKeyValuePut() {
-        final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
-        runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
-        runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
-        runner.setProperty(PutKafka.KEY, "${kafka.key}");
-        runner.setProperty(PutKafka.TIMEOUT, "3 secs");
-        runner.setProperty(PutKafka.DELIVERY_GUARANTEE, 
PutKafka.DELIVERY_REPLICATED.getValue());
-
-        keyValuePutExecute(runner);
+        assertEquals("kafka://localhost:1111/topics/topic1", 
event.getTransitUri());
     }
 
     @Test
-    @Ignore("Intended only for local testing; requires an actual running 
instance of Kafka & ZooKeeper...")
-    public void testKeyValuePutAsync() {
-        final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
-        runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
-        runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
-        runner.setProperty(PutKafka.KEY, "${kafka.key}");
-        runner.setProperty(PutKafka.TIMEOUT, "3 secs");
-        runner.setProperty(PutKafka.PRODUCER_TYPE, 
PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
-        runner.setProperty(PutKafka.DELIVERY_GUARANTEE, 
PutKafka.DELIVERY_REPLICATED.getValue());
-
-        keyValuePutExecute(runner);
-    }
+    public void testRoundRobinAcrossMultipleMessages() {
+        final TestableProcessor proc = new TestableProcessor();
 
-    private void keyValuePutExecute(final TestRunner runner) {
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put("kafka.topic", "test");
-        attributes.put("kafka.key", "key3");
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.PARTITION_STRATEGY, 
PutKafka.ROUND_ROBIN_PARTITIONING);
 
-        final byte[] data = "Hello, World, Again! ;)".getBytes();
-        runner.enqueue(data, attributes);
-        runner.enqueue(data, attributes);
-        runner.enqueue(data, attributes);
-        runner.enqueue(data, attributes);
+        runner.enqueue("hello".getBytes());
+        runner.enqueue("there".getBytes());
+        runner.enqueue("how are you".getBytes());
+        runner.enqueue("today".getBytes());
 
         runner.run(5);
 
         runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4);
-        final List<MockFlowFile> mffs = 
runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
-        final MockFlowFile mff = mffs.get(0);
 
-        assertTrue(Arrays.equals(data, mff.toByteArray()));
+        final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) 
proc.getProducer()).getMessages();
+        for (int i = 0; i < 3; i++) {
+            assertEquals(i + 1, records.get(i).partition().intValue());
+        }
+
+        assertEquals(1, records.get(3).partition().intValue());
     }
 
     @Test
-    public void testProducerConfigDefault() {
-
-        final TestableProcessor processor = new TestableProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(processor);
+    public void testRoundRobinAcrossMultipleMessagesInSameFlowFile() {
+        final TestableProcessor proc = new TestableProcessor();
 
+        final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(PutKafka.TOPIC, "topic1");
         runner.setProperty(PutKafka.KEY, "key1");
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        runner.setProperty(PutKafka.PARTITION_STRATEGY, 
PutKafka.ROUND_ROBIN_PARTITIONING);
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
 
-        final ProcessContext context = runner.getProcessContext();
-        final ProducerConfig config = processor.createConfig(context);
+        runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes());
 
-        // Check the codec
-        final CompressionCodec codec = config.compressionCodec();
-        assertTrue(codec instanceof kafka.message.NoCompressionCodec$);
+        runner.run(2);
 
-        // Check compressed topics
-        final Seq<String> compressedTopics = config.compressedTopics();
-        assertEquals(0, compressedTopics.size());
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
 
-        // Check the producer type
-        final String actualProducerType = config.producerType();
-        assertEquals(PutKafka.PRODUCER_TYPE.getDefaultValue(), 
actualProducerType);
+        final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) 
proc.getProducer()).getMessages();
+        for (int i = 0; i < 3; i++) {
+            assertEquals(i + 1, records.get(i).partition().intValue());
+        }
 
+        assertEquals(1, records.get(3).partition().intValue());
     }
 
-    @Test
-    public void testProducerConfigAsyncWithCompression() {
 
-        final TestableProcessor processor = new TestableProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(processor);
+    @Test
+    public void testUserDefinedPartition() {
+        final TestableProcessor proc = new TestableProcessor();
 
+        final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(PutKafka.TOPIC, "topic1");
         runner.setProperty(PutKafka.KEY, "key1");
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-        runner.setProperty(PutKafka.PRODUCER_TYPE, 
PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
-        runner.setProperty(PutKafka.COMPRESSION_CODEC, 
PutKafka.COMPRESSION_CODEC_SNAPPY.getValue());
-        runner.setProperty(PutKafka.COMPRESSED_TOPICS, 
"topic01,topic02,topic03");
+        runner.setProperty(PutKafka.PARTITION_STRATEGY, 
PutKafka.USER_DEFINED_PARTITIONING);
+        runner.setProperty(PutKafka.PARTITION, "${part}");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
 
-        final ProcessContext context = runner.getProcessContext();
-        final ProducerConfig config = processor.createConfig(context);
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("part", "3");
+        runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs);
 
-        // Check that the codec is snappy
-        final CompressionCodec codec = config.compressionCodec();
-        assertTrue(codec instanceof kafka.message.SnappyCompressionCodec$);
+        runner.run(2);
 
-        // Check compressed topics
-        final Seq<String> compressedTopics = config.compressedTopics();
-        assertEquals(3, compressedTopics.size());
-        assertTrue(compressedTopics.contains("topic01"));
-        assertTrue(compressedTopics.contains("topic02"));
-        assertTrue(compressedTopics.contains("topic03"));
-
-        // Check the producer type
-        final String actualProducerType = config.producerType();
-        assertEquals("async", actualProducerType);
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
 
+        final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) 
proc.getProducer()).getMessages();
+        for (int i = 0; i < 4; i++) {
+            assertEquals(3, records.get(i).partition().intValue());
+        }
     }
 
-    @Test
-    public void testProducerConfigAsyncQueueThresholds() {
 
-        final TestableProcessor processor = new TestableProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(processor);
 
+    @Test
+    public void testUserDefinedPartitionWithInvalidValue() {
+        final TestableProcessor proc = new TestableProcessor();
+
+        final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(PutKafka.TOPIC, "topic1");
         runner.setProperty(PutKafka.KEY, "key1");
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-        runner.setProperty(PutKafka.PRODUCER_TYPE, 
PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
-        runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX, "7 secs");
-        runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "535");
-        runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT, "200 ms");
+        runner.setProperty(PutKafka.PARTITION_STRATEGY, 
PutKafka.USER_DEFINED_PARTITIONING);
+        runner.setProperty(PutKafka.PARTITION, "${part}");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
 
-        final ProcessContext context = runner.getProcessContext();
-        final ProducerConfig config = processor.createConfig(context);
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("part", "bogus");
+        runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs);
 
-        // Check that the queue thresholds were properly translated
-        assertEquals(7000, config.queueBufferingMaxMs());
-        assertEquals(535, config.queueBufferingMaxMessages());
-        assertEquals(200, config.queueEnqueueTimeoutMs());
+        runner.run(2);
 
-        // Check the producer type
-        final String actualProducerType = config.producerType();
-        assertEquals("async", actualProducerType);
-
-    }
-
-    @Test
-    public void testProducerConfigInvalidBatchSize() {
-
-        final TestableProcessor processor = new TestableProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(processor);
-
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-        runner.setProperty(PutKafka.PRODUCER_TYPE, 
PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
-        runner.setProperty(PutKafka.BATCH_NUM_MESSAGES, "200");
-        runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "100");
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
 
-        runner.assertNotValid();
+        final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) 
proc.getProducer()).getMessages();
+        // should all be the same partition, regardless of what partition it 
is.
+        final int partition = records.get(0).partition().intValue();
 
+        for (int i = 0; i < 4; i++) {
+            assertEquals(partition, records.get(i).partition().intValue());
+        }
     }
 
-    @Test
-    public void testProducerConfigAsyncDefaultEnqueueTimeout() {
 
-        final TestableProcessor processor = new TestableProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(processor);
+    @Test
+    public void testFullBuffer() {
+        final TestableProcessor proc = new TestableProcessor();
 
+        final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(PutKafka.TOPIC, "topic1");
         runner.setProperty(PutKafka.KEY, "key1");
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-        runner.setProperty(PutKafka.PRODUCER_TYPE, 
PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
-        // Do not set QUEUE_ENQUEUE_TIMEOUT
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
+        runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "5 B");
+        proc.setMaxQueueSize(10L); // will take 4 bytes for key and 1 byte for 
value.
 
-        final ProcessContext context = runner.getProcessContext();
-        final ProducerConfig config = processor.createConfig(context);
+        runner.enqueue("1\n2\n3\n4\n".getBytes());
+        runner.run(2);
 
-        // Check that the enqueue timeout defaults to -1
-        assertEquals(-1, config.queueEnqueueTimeoutMs());
-
-        // Check the producer type
-        final String actualProducerType = config.producerType();
-        assertEquals("async", actualProducerType);
+        runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
 
+        
runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0).assertContentEquals("1\n2\n");
+        
runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0).assertContentEquals("3\n4\n");
     }
 
-    private static class TestableProcessor extends PutKafka {
 
-        private MockProducer producer;
-        private int failAfter = Integer.MAX_VALUE;
+    /**
+     * Used to override the {@link #getProducer()} method so that we can 
enforce that our MockProducer is used
+     */
+    private static class TestableProcessor extends PutKafka {
+        private final MockProducer producer;
 
         public TestableProcessor() {
+            this(null);
         }
 
-        public TestableProcessor(final int failAfter) {
-            this.failAfter = failAfter;
+        public TestableProcessor(final Integer failAfter) {
+            this(failAfter, null);
         }
 
-        @OnScheduled
-        public void instantiateProducer(final ProcessContext context) {
-            producer = new MockProducer(createConfig(context));
+        public TestableProcessor(final Integer failAfter, final Integer 
stopFailingAfter) {
+            producer = new MockProducer();
             producer.setFailAfter(failAfter);
+            producer.setStopFailingAfter(stopFailingAfter);
         }
 
         @Override
-        protected Producer<byte[], byte[]> createProducer(final ProcessContext 
context) {
-            return producer;
-        }
-
-        public MockProducer getProducer() {
+        protected Producer<byte[], byte[]> getProducer() {
             return producer;
         }
 
-        /**
-         * Exposed for test verification
-         */
-        @Override
-        public ProducerConfig createConfig(final ProcessContext context) {
-            return super.createConfig(context);
+        public void setMaxQueueSize(final long bytes) {
+            producer.setMaxQueueSize(bytes);
         }
     }
 
-    private static class MockProducer extends Producer<byte[], byte[]> {
+
+    /**
+     * We have our own Mock Producer, which is very similar to the 
Kafka-supplied one. However, with the Kafka-supplied
+     * Producer, we don't have the ability to tell it to fail after X number 
of messages; rather, we can only tell it
+     * to fail on the next message. Since we are sending multiple messages in 
a single onTrigger call for the Processor,
+     * this doesn't allow us to test failure conditions adequately.
+     */
+    private static class MockProducer implements Producer<byte[], byte[]> {
 
         private int sendCount = 0;
-        private int failAfter = Integer.MAX_VALUE;
+        private Integer failAfter;
+        private Integer stopFailingAfter;
+        private long queueSize = 0L;
+        private long maxQueueSize = Long.MAX_VALUE;
 
-        private final List<byte[]> messages = new ArrayList<>();
+        private final List<ProducerRecord<byte[], byte[]>> messages = new 
ArrayList<>();
 
-        public MockProducer(final ProducerConfig config) {
-            super(config);
+        public MockProducer() {
+        }
+
+        public void setMaxQueueSize(final long bytes) {
+            this.maxQueueSize = bytes;
+        }
+
+        public List<ProducerRecord<byte[], byte[]>> getMessages() {
+            return messages;
+        }
+
+        public void setFailAfter(final Integer successCount) {
+            failAfter = successCount;
+        }
+
+        public void setStopFailingAfter(final Integer stopFailingAfter) {
+            this.stopFailingAfter = stopFailingAfter;
         }
 
         @Override
-        public void send(final KeyedMessage<byte[], byte[]> message) {
-            if (++sendCount > failAfter) {
-                throw new FailedToSendMessageException("Failed to send 
message", new RuntimeException("Unit test told to fail after " + failAfter + " 
successful messages"));
+        public Future<RecordMetadata> send(final ProducerRecord<byte[], 
byte[]> record) {
+            return send(record, null);
+        }
+
+        @Override
+        public Future<RecordMetadata> send(final ProducerRecord<byte[], 
byte[]> record, final Callback callback) {
+            sendCount++;
+
+            final ByteArraySerializer serializer = new ByteArraySerializer();
+            final int keyBytes = serializer.serialize(record.topic(), 
record.key()).length;
+            final int valueBytes = serializer.serialize(record.topic(), 
record.value()).length;
+            if (maxQueueSize - queueSize < keyBytes + valueBytes) {
+                throw new BufferExhaustedException("Queue size is " + 
queueSize + " but serialized message is " + (keyBytes + valueBytes));
+            }
+
+            queueSize += keyBytes + valueBytes;
+
+            if (failAfter != null && sendCount > failAfter && 
((stopFailingAfter == null) || (sendCount < stopFailingAfter + 1))) {
+                final Exception e = new FailedToSendMessageException("Failed 
to send message", new RuntimeException("Unit test told to fail after " + 
failAfter + " successful messages"));
+                callback.onCompletion(null, e);
             } else {
-                messages.add(message.message());
+                messages.add(record);
+                final RecordMetadata meta = new RecordMetadata(new 
TopicPartition(record.topic(), record.partition() == null ? 1 : 
record.partition()), 0L, 0L);
+                callback.onCompletion(meta, null);
             }
+
+            // we don't actually look at the Future in the processor, so we 
can just return null
+            return null;
         }
 
-        public List<byte[]> getMessages() {
-            return messages;
+        @Override
+        public List<PartitionInfo> partitionsFor(String topic) {
+            final Node leader = new Node(1, "localhost", 1111);
+            final Node node2 = new Node(2, "localhost-2", 2222);
+            final Node node3 = new Node(3, "localhost-3", 3333);
+
+            final PartitionInfo partInfo1 = new PartitionInfo(topic, 1, 
leader, new Node[] {node2, node3}, new Node[0]);
+            final PartitionInfo partInfo2 = new PartitionInfo(topic, 2, 
leader, new Node[] {node2, node3}, new Node[0]);
+            final PartitionInfo partInfo3 = new PartitionInfo(topic, 3, 
leader, new Node[] {node2, node3}, new Node[0]);
+
+            final List<PartitionInfo> infos = new ArrayList<>(3);
+            infos.add(partInfo1);
+            infos.add(partInfo2);
+            infos.add(partInfo3);
+            return infos;
         }
 
         @Override
-        public void send(final List<KeyedMessage<byte[], byte[]>> messages) {
-            for (final KeyedMessage<byte[], byte[]> msg : messages) {
-                send(msg);
-            }
+        public Map<MetricName, ? extends Metric> metrics() {
+            return Collections.emptyMap();
         }
 
-        public void setFailAfter(final int successCount) {
-            failAfter = successCount;
+        @Override
+        public void close() {
         }
     }
 

Reply via email to