http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
new file mode 100644
index 0000000..e29f2af
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.xml.bind.DatatypeConverter;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "0.10"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 0.10 producer. "
+        + "The messages to send may be individual FlowFiles or may be delimited, using a "
+        + "user-specified delimiter, such as a new-line. "
+        + "Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring"
+        + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can. In the meantime"
+        + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
+        description = "These properties will be added to the Kafka configuration after loading any provided configuration properties."
+        + " In the event a dynamic property represents a property that was already set, its value will be ignored and a WARN message will be logged."
+        + " For the list of available Kafka properties, please refer to: http://kafka.apache.org/documentation.html#configuration.")
+public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id";
+
+    protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx";
+
+    protected static final String FAILED_TOPIC_ATTR = "failed.topic";
+
+    protected static final String FAILED_KEY_ATTR = "failed.key";
+
+    protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter";
+
+    protected static final String MSG_COUNT = "msg.count";
+
+    static final AllowableValue DELIVERY_REPLICATED = new 
AllowableValue("all", "Guarantee Replicated Delivery",
+            "FlowFile will be routed to failure unless the message is 
replicated to the appropriate "
+            + "number of Kafka Nodes according to the Topic configuration");
+    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", 
"Guarantee Single Node Delivery",
+            "FlowFile will be routed to success if the message is received by 
a single Kafka node, "
+            + "whether or not it is replicated. This is faster than <Guarantee 
Replicated Delivery> "
+            + "but can result in data loss if a Kafka node crashes");
+    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", 
"Best Effort",
+            "FlowFile will be routed to success after successfully writing the 
content to a Kafka node, "
+            + "without waiting for a response. This provides the best 
performance but may result in data loss.");
+
+    static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(),
+            Partitioners.RoundRobinPartitioner.class.getSimpleName(),
+            "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
+            + "the next message to Partition 2, and so on, wrapping as necessary.");
+    static final AllowableValue RANDOM_PARTITIONING = new 
AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
+            "DefaultPartitioner", "Messages will be assigned to random 
partitions.");
+
+    static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+            .name("topic")
+            .displayName("Topic Name")
+            .description("The name of the Kafka Topic to publish to.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    static final PropertyDescriptor DELIVERY_GUARANTEE = new 
PropertyDescriptor.Builder()
+            .name(ProducerConfig.ACKS_CONFIG)
+            .displayName("Delivery Guarantee")
+            .description("Specifies the requirement for guaranteeing that a 
message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, 
DELIVERY_REPLICATED)
+            .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+            .build();
+
+    static final PropertyDescriptor META_WAIT_TIME = new PropertyDescriptor.Builder()
+            .name(ProducerConfig.MAX_BLOCK_MS_CONFIG)
+            .displayName("Meta Data Wait Time")
+            .description("The amount of time the Kafka producer will wait to obtain metadata during the 'send' call before failing the "
+                    + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("30 sec")
+            .build();
+
+    static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+            .name("kafka-key")
+            .displayName("Kafka Key")
+            .description("The Key to use for the Message. It will be serialized as UTF-8 bytes. "
+                    + "If not specified then the flow file attribute kafka.key.hex is used if present "
+                    + "and we're not demarcating. In that case the hex string is converted to its byte "
+                    + "form and written as a byte[] key.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    static final PropertyDescriptor MESSAGE_DEMARCATOR = new 
PropertyDescriptor.Builder()
+            .name("message-demarcator")
+            .displayName("Message Demarcator")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .description("Specifies the string (interpreted as UTF-8) to use 
for demarcating multiple messages within "
+                    + "a single FlowFile. If not specified, the entire content 
of the FlowFile will be used as a single message. If specified, the "
+                    + "contents of the FlowFile will be split on this 
delimiter and each section sent as a separate Kafka message. "
+                    + "To enter special character such as 'new line' use 
CTRL+Enter or Shift+Enter depending on your OS.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_CLASS = new 
PropertyDescriptor.Builder()
+            .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
+            .displayName("Partitioner class")
+            .description("Specifies which class to use to compute a partition 
id for a message. Corresponds to Kafka's 'partitioner.class' property.")
+            .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+            .defaultValue(RANDOM_PARTITIONING.getValue())
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor COMPRESSION_CODEC = new 
PropertyDescriptor.Builder()
+            .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
+            .displayName("Compression Type")
+            .description("This parameter allows you to specify the compression 
codec for all data generated by this producer.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues("none", "gzip", "snappy", "lz4")
+            .defaultValue("none")
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles for which all content was sent to Kafka.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Any FlowFile that cannot be sent to Kafka will be 
routed to this Relationship")
+            .build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS;
+
+    static final Set<Relationship> RELATIONSHIPS;
+
+    private volatile String brokers;
+
+    private final AtomicInteger taskCounter = new AtomicInteger();
+
+    private volatile boolean acceptTask = true;
+
+    /*
+     * Will ensure that the list of PropertyDescriptors is built only once, since
+     * all other lifecycle methods are invoked multiple times.
+     */
+    static {
+        final List<PropertyDescriptor> _descriptors = new ArrayList<>();
+        
_descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
+        _descriptors.add(TOPIC);
+        _descriptors.add(DELIVERY_GUARANTEE);
+        _descriptors.add(KEY);
+        _descriptors.add(MESSAGE_DEMARCATOR);
+        _descriptors.add(META_WAIT_TIME);
+        _descriptors.add(PARTITION_CLASS);
+        _descriptors.add(COMPRESSION_CODEC);
+
+        DESCRIPTORS = Collections.unmodifiableList(_descriptors);
+
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(_relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for '" + 
propertyDescriptorName + "' Kafka Configuration.")
+                .name(propertyDescriptorName).addValidator(new 
KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)).dynamic(true)
+                .build();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        return KafkaProcessorUtils.validateCommonProperties(validationContext);
+    }
+
+    volatile KafkaPublisher kafkaPublisher;
+
+    /**
+     * This thread-safe operation will delegate to
+     * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} after first
+     * checking and creating (if necessary) the Kafka resource, which could be
+     * either a {@link KafkaPublisher} or a {@link KafkaConsumer}. It will also
+     * close and destroy the underlying Kafka resource upon catching an
+     * {@link Exception} raised by
+     * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)}.
+     * After the Kafka resource is destroyed it will be re-created upon the next
+     * invocation of this operation, essentially providing a self-healing
+     * mechanism to deal with a potentially corrupted resource.
+     * <p>
+     * Keep in mind that upon catching an exception the state of this processor
+     * will be set to no longer accept any more tasks until the Kafka resource
+     * is reset. This means that in a multi-threaded situation currently
+     * executing tasks will be given a chance to complete while no new tasks
+     * will be accepted.
+     *
+     * @param context context
+     * @param sessionFactory factory
+     */
+    @Override
+    public final void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
+        if (this.acceptTask) { // acts as a circuit breaker to allow existing 
tasks to wind down so 'kafkaPublisher' can be reset before new tasks are 
accepted.
+            this.taskCounter.incrementAndGet();
+            final ProcessSession session = sessionFactory.createSession();
+            try {
+                /*
+                 * We can't use a double-checked null here since, as a pattern,
+                 * it only works for lazy init but not for reset, which is what
+                 * we are doing here. In fact the first null check is dangerous,
+                 * since 'kafkaPublisher' can become null right after its null
+                 * check passes, causing a subsequent NPE.
+                 */
+                synchronized (this) {
+                    if (this.kafkaPublisher == null) {
+                        this.kafkaPublisher = this.buildKafkaResource(context, 
session);
+                    }
+                }
+
+                /*
+                 * The 'processed' boolean flag does not imply any failure or success. It simply states that:
+                 * - ConsumeKafka - some messages were received from Kafka and one or more FlowFiles were generated
+                 * - PublishKafka_0_10 - some messages were sent to Kafka based on the existence of the input FlowFile
+                 */
+                boolean processed = this.rendezvousWithKafka(context, session);
+                session.commit();
+                if (!processed) {
+                    context.yield();
+                }
+            } catch (Throwable e) {
+                this.acceptTask = false;
+                session.rollback(true);
+                this.getLogger().error("{} failed to process due to {}; 
rolling back session", new Object[]{this, e});
+            } finally {
+                synchronized (this) {
+                    if (this.taskCounter.decrementAndGet() == 0 && 
!this.acceptTask) {
+                        this.close();
+                        this.acceptTask = true;
+                    }
+                }
+            }
+        } else {
+            this.logger.debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset.");
+            context.yield();
+        }
+    }
+
+    /**
+     * Will call {@link Closeable#close()} on the target resource after which
+     * the target resource will be set to null. Should only be called when 
there
+     * are no more threads being executed on this processor or when it has been
+     * verified that only a single thread remains.
+     *
+     * @see KafkaPublisher
+     * @see KafkaConsumer
+     */
+    @OnStopped
+    public void close() {
+        try {
+            if (this.kafkaPublisher != null) {
+                try {
+                    this.kafkaPublisher.close();
+                } catch (Exception e) {
+                    this.getLogger().warn("Failed while closing " + 
this.kafkaPublisher, e);
+                }
+            }
+        } finally {
+            this.kafkaPublisher = null;
+        }
+    }
+
+    /**
+     * Will rendezvous with Kafka if the {@link ProcessSession} contains a
+     * {@link FlowFile}, producing a result {@link FlowFile}.
+     * <br>
+     * A result {@link FlowFile} that is successful is transferred to
+     * {@link #REL_SUCCESS}
+     * <br>
+     * A result {@link FlowFile} that has failed is transferred to
+     * {@link #REL_FAILURE}
+     *
+     */
+    protected boolean rendezvousWithKafka(ProcessContext context, 
ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile != null) {
+            long start = System.nanoTime();
+            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
+            Relationship relationship = REL_SUCCESS;
+            if (!this.isFailedFlowFile(flowFile)) {
+                String topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+                long executionDuration = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                String transitUri = 
KafkaProcessorUtils.buildTransitURI(context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(),
 this.brokers, topic);
+                session.getProvenanceReporter().send(flowFile, transitUri, 
"Sent " + flowFile.getAttribute(MSG_COUNT) + " Kafka messages", 
executionDuration);
+                this.getLogger().debug("Successfully sent {} to Kafka as {} 
message(s) in {} millis",
+                        new Object[]{flowFile, 
flowFile.getAttribute(MSG_COUNT), executionDuration});
+            } else {
+                relationship = REL_FAILURE;
+                flowFile = session.penalize(flowFile);
+            }
+            session.transfer(flowFile, relationship);
+        }
+        return flowFile != null;
+    }
+
+    /**
+     * Builds an instance of {@link KafkaPublisher}.
+     */
+    protected KafkaPublisher buildKafkaResource(ProcessContext context, 
ProcessSession session) {
+        final Map<String, String> kafkaProps = new HashMap<>();
+        KafkaProcessorUtils.buildCommonKafkaProperties(context, 
ProducerConfig.class, kafkaProps);
+        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        this.brokers = 
context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+        final Properties props = new Properties();
+        props.putAll(kafkaProps);
+        KafkaPublisher publisher = new KafkaPublisher(props, this.getLogger());
+        return publisher;
+    }
+
+    /**
+     * Will rendezvous with {@link KafkaPublisher} after building a
+     * {@link PublishingContext} and will produce the resulting
+     * {@link FlowFile}. The resulting FlowFile contains all required
+     * information to determine whether the message publishing originating from
+     * the provided FlowFile succeeded fully, partially or failed
+     * completely (see {@link #isFailedFlowFile(FlowFile)}).
+     */
+    private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final 
ProcessContext context, final ProcessSession session) {
+        final AtomicReference<KafkaPublisher.KafkaPublisherResult> 
publishResultRef = new AtomicReference<>();
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(InputStream contentStream) throws IOException {
+                PublishingContext publishingContext = 
PublishKafka_0_10.this.buildPublishingContext(flowFile, context, contentStream);
+                KafkaPublisher.KafkaPublisherResult result = 
PublishKafka_0_10.this.kafkaPublisher.publish(publishingContext);
+                publishResultRef.set(result);
+            }
+        });
+
+        FlowFile resultFile = publishResultRef.get().isAllAcked()
+                ? this.cleanUpFlowFileIfNecessary(flowFile, session)
+                : session.putAllAttributes(flowFile, 
this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(),
 flowFile, context));
+
+        if (!this.isFailedFlowFile(resultFile)) {
+            resultFile = session.putAttribute(resultFile, MSG_COUNT, 
String.valueOf(publishResultRef.get().getMessagesSent()));
+        }
+        return resultFile;
+    }
+
+    /**
+     * Builds a {@link PublishingContext} for message(s) to be sent to Kafka.
+     * {@link PublishingContext} contains all contextual information required by
+     * {@link KafkaPublisher} to publish to Kafka. Such information contains
+     * things like topic name, content stream, delimiter, key and last ACKed
+     * message for cases where the provided FlowFile is being retried (failed in
+     * the past).
+     * <br>
+     * For a clean FlowFile (file that is being sent for the first time), the
+     * PublishingContext will be built from the {@link ProcessContext} associated
+     * with this invocation.
+     * <br>
+     * For a failed FlowFile, the {@link PublishingContext} will be built from
+     * attributes of that FlowFile, which by then will already contain the
+     * required information (e.g., topic, key, delimiter etc.). This is required
+     * to ensure the affinity of the retry in the event where the processor
+     * configuration has changed. However, keep in mind that a FlowFile is only
+     * considered failed if it is being re-processed by the same
+     * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see
+     * {@link #isFailedFlowFile(FlowFile)}). If a failed FlowFile is sent to
+     * another PublishKafka_0_10 processor it is treated as a fresh FlowFile
+     * regardless of whether it has FAILED_* attributes set.
+     */
+    private PublishingContext buildPublishingContext(FlowFile flowFile, 
ProcessContext context, InputStream contentStream) {
+        String topicName;
+        byte[] keyBytes;
+        byte[] delimiterBytes = null;
+        int lastAckedMessageIndex = -1;
+        if (this.isFailedFlowFile(flowFile)) {
+            lastAckedMessageIndex = 
Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
+            topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
+            keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null
+                    ? 
flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null;
+            delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != 
null
+                    ? 
flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : 
null;
+
+        } else {
+            topicName = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+            String _key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+            keyBytes = _key == null ? null : 
_key.getBytes(StandardCharsets.UTF_8);
+            String keyHex = 
flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY_HEX);
+            if (_key == null && keyHex != null && 
KafkaProcessorUtils.HEX_KEY_PATTERN.matcher(keyHex).matches()) {
+                keyBytes = DatatypeConverter.parseHexBinary(keyHex);
+            }
+            delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? 
context.getProperty(MESSAGE_DEMARCATOR)
+                    
.evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8)
 : null;
+        }
+
+        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setKeyBytes(keyBytes);
+        publishingContext.setDelimiterBytes(delimiterBytes);
+        return publishingContext;
+    }
+
+    /**
+     * Will remove FAILED_* attributes if FlowFile is no longer considered a
+     * failed FlowFile
+     *
+     * @see #isFailedFlowFile(FlowFile)
+     */
+    private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, 
ProcessSession session) {
+        if (this.isFailedFlowFile(flowFile)) {
+            Set<String> keysToRemove = new HashSet<>();
+            keysToRemove.add(FAILED_DELIMITER_ATTR);
+            keysToRemove.add(FAILED_KEY_ATTR);
+            keysToRemove.add(FAILED_TOPIC_ATTR);
+            keysToRemove.add(FAILED_PROC_ID_ATTR);
+            keysToRemove.add(FAILED_LAST_ACK_IDX);
+            flowFile = session.removeAllAttributes(flowFile, keysToRemove);
+        }
+        return flowFile;
+    }
+
+    /**
+     * Builds a {@link Map} of FAILED_* attributes
+     *
+     * @see #FAILED_PROC_ID_ATTR
+     * @see #FAILED_LAST_ACK_IDX
+     * @see #FAILED_TOPIC_ATTR
+     * @see #FAILED_KEY_ATTR
+     * @see #FAILED_DELIMITER_ATTR
+     */
+    private Map<String, String> buildFailedFlowFileAttributes(int 
lastAckedMessageIndex, FlowFile sourceFlowFile, ProcessContext context) {
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(FAILED_PROC_ID_ATTR, this.getIdentifier());
+        attributes.put(FAILED_LAST_ACK_IDX, 
String.valueOf(lastAckedMessageIndex));
+        attributes.put(FAILED_TOPIC_ATTR, 
context.getProperty(TOPIC).evaluateAttributeExpressions(sourceFlowFile).getValue());
+        attributes.put(FAILED_KEY_ATTR, 
context.getProperty(KEY).evaluateAttributeExpressions(sourceFlowFile).getValue());
+        attributes.put(FAILED_DELIMITER_ATTR, 
context.getProperty(MESSAGE_DEMARCATOR).isSet()
+                ? 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(sourceFlowFile).getValue()
 : null);
+        return attributes;
+    }
+
+    /**
+     * Returns 'true' if provided FlowFile is a failed FlowFile. A failed
+     * FlowFile contains {@link #FAILED_PROC_ID_ATTR}.
+     */
+    private boolean isFailedFlowFile(FlowFile flowFile) {
+        return 
this.getIdentifier().equals(flowFile.getAttribute(FAILED_PROC_ID_ATTR));
+    }
+}
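For reference, a minimal sketch of how the new processor might be exercised with NiFi's TestRunner, assuming KafkaPublisher and its nested KafkaPublisherResult can be stubbed with Mockito so that no broker is required. The test class, topic name and broker address below are illustrative only and are not part of this patch; the stub relies on the publish(PublishingContext) and isAllAcked() signatures shown in the diff above.

// Illustrative sketch only; not part of this commit.
package org.apache.nifi.processors.kafka.pubsub;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.nio.charset.StandardCharsets;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class PublishKafka_0_10_SketchTest {

    @Test
    public void publishDemarcatedFlowFileWithStubbedPublisher() throws Exception {
        // Stub the publisher so the processor never talks to a real broker.
        final KafkaPublisher.KafkaPublisherResult result = mock(KafkaPublisher.KafkaPublisherResult.class);
        when(result.isAllAcked()).thenReturn(true);
        final KafkaPublisher stubbedPublisher = mock(KafkaPublisher.class);
        when(stubbedPublisher.publish(any(PublishingContext.class))).thenReturn(result);

        // Override the protected factory method so the stub is used instead of a real KafkaPublisher.
        final PublishKafka_0_10 processor = new PublishKafka_0_10() {
            @Override
            protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) {
                return stubbedPublisher;
            }
        };

        final TestRunner runner = TestRunners.newTestRunner(processor);
        runner.setValidateExpressionUsage(false);
        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:9092");
        runner.setProperty(PublishKafka_0_10.TOPIC, "my-topic");
        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");

        // Two demarcated messages in a single FlowFile.
        runner.enqueue("msg-1\nmsg-2".getBytes(StandardCharsets.UTF_8));
        runner.run(1, false);

        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
    }
}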

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
new file mode 100644
index 0000000..bda29e6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Holder of context information used by {@link KafkaPublisher} required to
+ * publish messages to Kafka.
+ */
+class PublishingContext {
+
+    private final InputStream contentStream;
+
+    private final String topic;
+
+    private final int lastAckedMessageIndex;
+
+    /*
+     * We're using the default value from Kafka. We are using it to control the
+     * message size before it goes to Kafka, thus limiting the possibility of
+     * late failures in the Kafka client.
+     */
+    private int maxRequestSize = 1048576; // kafka default
+
+    private boolean maxRequestSizeSet;
+
+    private byte[] keyBytes;
+
+    private byte[] delimiterBytes;
+
+    PublishingContext(InputStream contentStream, String topic) {
+        this(contentStream, topic, -1);
+    }
+
+    PublishingContext(InputStream contentStream, String topic, int 
lastAckedMessageIndex) {
+        this.validateInput(contentStream, topic, lastAckedMessageIndex);
+        this.contentStream = contentStream;
+        this.topic = topic;
+        this.lastAckedMessageIndex = lastAckedMessageIndex;
+    }
+
+    @Override
+    public String toString() {
+        // Guard against a null delimiter, which is a valid state for this context.
+        final String delimiter = this.delimiterBytes == null ? null : new String(this.delimiterBytes, StandardCharsets.UTF_8);
+        return "topic: '" + this.topic + "'; delimiter: '" + delimiter + "'";
+    }
+
+    int getLastAckedMessageIndex() {
+        return this.lastAckedMessageIndex;
+    }
+
+    int getMaxRequestSize() {
+        return this.maxRequestSize;
+    }
+
+    byte[] getKeyBytes() {
+        return this.keyBytes;
+    }
+
+    byte[] getDelimiterBytes() {
+        return this.delimiterBytes;
+    }
+
+    InputStream getContentStream() {
+        return this.contentStream;
+    }
+
+    String getTopic() {
+        return this.topic;
+    }
+
+    void setKeyBytes(byte[] keyBytes) {
+        if (this.keyBytes == null) {
+            if (keyBytes != null) {
+                this.assertBytesValid(keyBytes);
+                this.keyBytes = keyBytes;
+            }
+        } else {
+            throw new IllegalArgumentException("'keyBytes' can only be set 
once per instance");
+        }
+    }
+
+    void setDelimiterBytes(byte[] delimiterBytes) {
+        if (this.delimiterBytes == null) {
+            if (delimiterBytes != null) {
+                this.assertBytesValid(delimiterBytes);
+                this.delimiterBytes = delimiterBytes;
+            }
+        } else {
+            throw new IllegalArgumentException("'delimiterBytes' can only be 
set once per instance");
+        }
+    }
+
+    void setMaxRequestSize(int maxRequestSize) {
+        if (!this.maxRequestSizeSet) {
+            if (maxRequestSize > 0) {
+                this.maxRequestSize = maxRequestSize;
+                this.maxRequestSizeSet = true;
+            } else {
+                throw new IllegalArgumentException("'maxRequestSize' must be > 
0");
+            }
+        } else {
+            throw new IllegalArgumentException("'maxRequestSize' can only be 
set once per instance");
+        }
+    }
+
+    private void assertBytesValid(byte[] bytes) {
+        if (bytes != null) {
+            if (bytes.length == 0) {
+                throw new IllegalArgumentException("'bytes' must not be 
empty");
+            }
+        }
+    }
+
+    private void validateInput(InputStream contentStream, String topic, int 
lastAckedMessageIndex) {
+        if (contentStream == null) {
+            throw new IllegalArgumentException("'contentStream' must not be 
null");
+        } else if (topic == null || topic.trim().length() == 0) {
+            throw new IllegalArgumentException("'topic' must not be null or 
empty");
+        } else if (lastAckedMessageIndex < -1) {
+            throw new IllegalArgumentException("'lastAckedMessageIndex' must 
be >= -1");
+        }
+    }
+}
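The context above enforces eager validation and set-once mutators. A small sketch of that contract (not part of the patch; it would have to live in the same package, since PublishingContext and its constructors are package-private):

// Illustrative sketch only; exercises the validation and set-once rules defined above.
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

class PublishingContextSketch {
    static void demo() {
        final InputStream content = new ByteArrayInputStream("msg-1\nmsg-2".getBytes(StandardCharsets.UTF_8));

        // lastAckedMessageIndex defaults to -1 (nothing ACKed yet).
        final PublishingContext ctx = new PublishingContext(content, "my-topic");
        ctx.setKeyBytes("my-key".getBytes(StandardCharsets.UTF_8));
        ctx.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));

        // A second assignment violates the set-once contract.
        try {
            ctx.setDelimiterBytes("|".getBytes(StandardCharsets.UTF_8));
        } catch (IllegalArgumentException expected) {
            // "'delimiterBytes' can only be set once per instance"
        }

        // Constructor arguments are validated eagerly.
        try {
            new PublishingContext(null, "my-topic");
        } catch (IllegalArgumentException expected) {
            // "'contentStream' must not be null"
        }
    }
}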

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..aa1d4e2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_10
+org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html
new file mode 100644
index 0000000..2ce6b51
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html
@@ -0,0 +1,33 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>ConsumeKafka</title>
+        <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
+    </head>
+
+    <body>
+        <!-- Processor Documentation 
================================================== -->
+        <h2>Description:</h2>
+        <p>
+            This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
+            for data using the KafkaConsumer API available with Kafka 0.10+. When a message is received
+            from Kafka, this Processor emits a FlowFile where the content of the FlowFile is the value
+            of the Kafka message.
+        </p>
+    </body>
+</html>
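The description above boils down to the plain Kafka 0.10 consumer calls the processor builds on. A standalone sketch (not part of the patch; broker address, topic and group id are placeholders):

// Illustrative sketch only; shows the raw KafkaConsumer calls underlying ConsumeKafka_0_10.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

class ConsumeSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "nifi-sketch");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                // In the processor, record.value() becomes the content of an emitted FlowFile.
                final byte[] flowFileContent = record.value();
            }
        }
    }
}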

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html
new file mode 100644
index 0000000..dfd92b3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html
@@ -0,0 +1,47 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>PublishKafka</title>
+        <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
+    </head>
+
+    <body>
+        <!-- Processor Documentation 
================================================== -->
+        <h2>Description:</h2>
+        <p>
+            This Processor puts the contents of a FlowFile to a Topic in
+            <a href="http://kafka.apache.org/">Apache Kafka</a> using the KafkaProducer API available
+            with Kafka 0.10+. The content of a FlowFile becomes the contents of a Kafka message.
+            This message is optionally assigned a key by using the &lt;Kafka Key&gt; Property.
+        </p>
+
+        <p>
+            The Processor allows the user to configure an optional Message Demarcator that
+            can be used to send many messages per FlowFile. For example, a <i>\n</i> could be used
+            to indicate that the contents of the FlowFile should be used to send one message
+            per line of text. It also supports multi-char demarcators (e.g., 'my custom demarcator').
+            If the property is not set, the entire contents of the FlowFile
+            will be sent as a single message. When using the demarcator, if some messages are
+            successfully sent but other messages fail to send, the resulting FlowFile will be
+            considered a failed FlowFile and will have additional attributes to that effect.
+            One such attribute is 'failed.last.idx', which indicates the index of the last message
+            that was successfully ACKed by Kafka (if no demarcator is used, the value of this index will be -1).
+            This allows PublishKafka to re-send only the un-ACKed messages on the next retry.
+        </p>
+    </body>
+</html>
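To make the 'failed.last.idx' behaviour described above concrete, a simplified sketch of the retry semantics (this is not the actual KafkaPublisher implementation; the helper name and demarcator handling are illustrative only):

// Simplified sketch: split content on a demarcator and skip every message
// whose index is <= failed.last.idx from the previous attempt.
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class DemarcatorRetrySketch {
    static List<String> messagesToResend(byte[] flowFileContent, String demarcator, int lastAckedMessageIndex) {
        final String[] messages = new String(flowFileContent, StandardCharsets.UTF_8).split(Pattern.quote(demarcator));
        final List<String> toSend = new ArrayList<>();
        for (int i = 0; i < messages.length; i++) {
            if (i > lastAckedMessageIndex) { // -1 means nothing was ACKed, so everything is sent
                toSend.add(messages[i]);
            }
        }
        return toSend;
    }

    public static void main(String[] args) {
        final byte[] content = "msg-0\nmsg-1\nmsg-2".getBytes(StandardCharsets.UTF_8);
        // The first attempt ACKed index 0 only, so a retry re-sends msg-1 and msg-2.
        System.out.println(messagesToResend(content, "\n", 0)); // [msg-1, msg-2]
    }
}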

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
new file mode 100644
index 0000000..c172b03
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ConsumeKafkaTest {
+
+    static class MockConsumerPool extends ConsumerPool {
+
+        final int actualMaxLeases;
+        final List<String> actualTopics;
+        final Map<String, String> actualKafkaProperties;
+        boolean throwKafkaExceptionOnPoll = false;
+        boolean throwKafkaExceptionOnCommit = false;
+        Queue<ConsumerRecords<byte[], byte[]>> nextPlannedRecordsQueue = new 
ArrayDeque<>();
+        Map<TopicPartition, OffsetAndMetadata> nextExpectedCommitOffsets = 
null;
+        Map<TopicPartition, OffsetAndMetadata> actualCommitOffsets = null;
+        boolean wasConsumerLeasePoisoned = false;
+        boolean wasConsumerLeaseClosed = false;
+        boolean wasPoolClosed = false;
+
+        public MockConsumerPool(int maxLeases, List<String> topics, 
Map<String, String> kafkaProperties, ComponentLog logger) {
+            super(maxLeases, topics, kafkaProperties, null);
+            actualMaxLeases = maxLeases;
+            actualTopics = topics;
+            actualKafkaProperties = kafkaProperties;
+        }
+
+        @Override
+        public ConsumerLease obtainConsumer() {
+            return new ConsumerLease() {
+                @Override
+                public ConsumerRecords<byte[], byte[]> poll() {
+                    if (throwKafkaExceptionOnPoll) {
+                        throw new KafkaException("i planned to fail");
+                    }
+                    final ConsumerRecords<byte[], byte[]> records = 
nextPlannedRecordsQueue.poll();
+                    return (records == null) ? ConsumerRecords.empty() : 
records;
+                }
+
+                @Override
+                public void commitOffsets(Map<TopicPartition, 
OffsetAndMetadata> offsets) {
+                    if (throwKafkaExceptionOnCommit) {
+                        throw new KafkaException("i planned to fail");
+                    }
+                    actualCommitOffsets = offsets;
+                }
+
+                @Override
+                public void poison() {
+                    wasConsumerLeasePoisoned = true;
+                }
+
+                @Override
+                public void close() {
+                    wasConsumerLeaseClosed = true;
+                }
+            };
+        }
+
+        @Override
+        public void close() {
+            wasPoolClosed = true;
+        }
+
+        void resetState() {
+            throwKafkaExceptionOnPoll = false;
+            throwKafkaExceptionOnCommit = false;
+            nextPlannedRecordsQueue = null;
+            nextExpectedCommitOffsets = null;
+            wasConsumerLeasePoisoned = false;
+            wasConsumerLeaseClosed = false;
+            wasPoolClosed = false;
+        }
+
+    }
+
+    @Test
+    public void validateCustomValidatorSettings() throws Exception {
+        ConsumeKafka_0_10 consumeKafka = new ConsumeKafka_0_10();
+        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"okeydokey:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, 
ConsumeKafka_0_10.OFFSET_EARLIEST);
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"Foo");
+        runner.assertNotValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void validatePropertiesValidation() throws Exception {
+        ConsumeKafka_0_10 consumeKafka = new ConsumeKafka_0_10();
+        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"okeydokey:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, 
ConsumeKafka_0_10.OFFSET_EARLIEST);
+
+        runner.removeProperty(ConsumeKafka_0_10.GROUP_ID);
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("invalid because group.id is 
required"));
+        }
+
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, "");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
+        }
+
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, "  ");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
+        }
+    }
+
+    @Test
+    public void validateGetAllMessages() throws Exception {
+        String groupName = "validateGetAllMessages";
+
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
+        };
+        final ConsumerRecords<byte[], byte[]> firstRecs = 
createConsumerRecords("foo", 1, 1L, firstPassValues);
+
+        final byte[][] secondPassValues = new byte[][]{
+            "Hello-4".getBytes(StandardCharsets.UTF_8),
+            "Hello-5".getBytes(StandardCharsets.UTF_8),
+            "Hello-6".getBytes(StandardCharsets.UTF_8)
+        };
+        final ConsumerRecords<byte[], byte[]> secondRecs = 
createConsumerRecords("bar", 1, 1L, secondPassValues);
+
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        expectedTopics.add("bar");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, 
expectedTopics, Collections.EMPTY_MAP, null);
+        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+        mockPool.nextPlannedRecordsQueue.add(secondRecs);
+
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, 
final List<String> topics, final Map<String, String> props, final ComponentLog 
log) {
+                return mockPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, 
ConsumeKafka_0_10.OFFSET_EARLIEST);
+
+        runner.run(1, false);
+
+        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
+
+        assertEquals(expectedTopics, mockPool.actualTopics);
+
+        assertEquals(1, flowFiles.stream().map(ff -> new 
String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new 
String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new 
String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
+
+        if (mockPool.nextPlannedRecordsQueue.isEmpty()) {
+            assertEquals(1, flowFiles.stream().map(ff -> new 
String(ff.toByteArray())).filter(content -> content.equals("Hello-4")).count());
+            assertEquals(1, flowFiles.stream().map(ff -> new 
String(ff.toByteArray())).filter(content -> content.equals("Hello-5")).count());
+            assertEquals(1, flowFiles.stream().map(ff -> new 
String(ff.toByteArray())).filter(content -> content.equals("Hello-6")).count());
+            assertEquals(2, mockPool.actualCommitOffsets.size());
+            assertEquals(4L, mockPool.actualCommitOffsets.get(new 
TopicPartition("foo", 1)).offset());
+            assertEquals(4L, mockPool.actualCommitOffsets.get(new 
TopicPartition("bar", 1)).offset());
+        } else {
+            assertEquals(2, mockPool.actualCommitOffsets.size());
+            assertEquals(4L, mockPool.actualCommitOffsets.get(new 
TopicPartition("foo", 1)).offset());
+        }
+
+        //assert that all consumers were closed as expected
+        //assert that the consumer pool was properly closed
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertFalse(mockPool.wasPoolClosed);
+        runner.run(1, true);
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+
+    }
+
+    @Test
+    public void validateGetLotsOfMessages() throws Exception {
+        String groupName = "validateGetLotsOfMessages";
+
+        final byte[][] firstPassValues = new byte[10010][1];
+        for (final byte[] value : firstPassValues) {
+            value[0] = 0x12;
+        }
+        final ConsumerRecords<byte[], byte[]> firstRecs = 
createConsumerRecords("foo", 1, 1L, firstPassValues);
+
+        final byte[][] secondPassValues = new byte[][]{
+            "Hello-4".getBytes(StandardCharsets.UTF_8),
+            "Hello-5".getBytes(StandardCharsets.UTF_8),
+            "Hello-6".getBytes(StandardCharsets.UTF_8)
+        };
+        final ConsumerRecords<byte[], byte[]> secondRecs = 
createConsumerRecords("bar", 1, 1L, secondPassValues);
+
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        expectedTopics.add("bar");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, 
expectedTopics, Collections.EMPTY_MAP, null);
+        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+        mockPool.nextPlannedRecordsQueue.add(secondRecs);
+
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, 
final List<String> topics, final Map<String, String> props, final ComponentLog 
log) {
+                return mockPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, 
ConsumeKafka_0_10.OFFSET_EARLIEST);
+
+        runner.run(1, false);
+
+        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
+
+        assertEquals(10010, flowFiles.stream().map(ff -> 
ff.toByteArray()).filter(content -> content.length == 1 && content[0] == 
0x12).count());
+        assertEquals(1, mockPool.nextPlannedRecordsQueue.size());
+
+        assertEquals(1, mockPool.actualCommitOffsets.size());
+        assertEquals(10011L, mockPool.actualCommitOffsets.get(new 
TopicPartition("foo", 1)).offset());
+
+        //assert that all consumers were closed as expected
+        //assert that the consumer pool was properly closed
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertFalse(mockPool.wasPoolClosed);
+        runner.run(1, true);
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+
+    }
+
+    private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
+        final TopicPartition tPart = new TopicPartition(topic, partition);
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+        long offset = startingOffset;
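+        // keys are random UUID bytes; the tests only inspect values and offsets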
+        for (final byte[] rawRecord : rawRecords) {
+            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord<>(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
+            records.add(rec);
+        }
+        map.put(tPart, records);
+        return new ConsumerRecords<>(map);
+    }
+
+    private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
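+        // fold each batch's per-partition lists into one map; a topic/partition may appear in only one input batch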
+        for (final ConsumerRecords<byte[], byte[]> rec : records) {
+            rec.partitions().stream().forEach((part) -> {
+                final List<ConsumerRecord<byte[], byte[]>> conRecs = rec.records(part);
+                if (map.get(part) != null) {
+                    throw new IllegalStateException("already have that topic/partition in the record map");
+                }
+                map.put(part, conRecs);
+            });
+        }
+        return new ConsumerRecords<>(map);
+    }
+
+    @Test
+    public void validateGetAllMessagesWithProvidedDemarcator() throws Exception {
+        String groupName = "validateGetAllMessagesWithProvidedDemarcator";
+
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
+        };
+
+        final byte[][] secondPassValues = new byte[][]{
+            "Hello-4".getBytes(StandardCharsets.UTF_8),
+            "Hello-5".getBytes(StandardCharsets.UTF_8),
+            "Hello-6".getBytes(StandardCharsets.UTF_8)
+        };
+        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
+                createConsumerRecords("foo", 1, 1L, firstPassValues),
+                createConsumerRecords("bar", 1, 1L, secondPassValues)
+        );
+
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        expectedTopics.add("bar");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
+        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
+
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
+                return mockPool;
+            }
+        };
+
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
+        runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
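+        // with a demarcator set, each topic/partition's records are joined with "blah" into a single FlowFile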
+
+        runner.run(1, false);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
+
+        assertEquals(2, flowFiles.size());
+
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4blahHello-5blahHello-6")).count());
+
+        //assert that all consumers were closed as expected
+        //assert that the consumer pool was properly closed
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertFalse(mockPool.wasPoolClosed);
+        runner.run(1, true);
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+
+        assertEquals(2, mockPool.actualCommitOffsets.size());
+        assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
+        assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
+    }
+
+    @Test
+    public void validatePollException() throws Exception {
+        String groupName = "validatePollException";
+
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
+        };
+
+        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
+                createConsumerRecords("foo", 1, 1L, firstPassValues)
+        );
+
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
+        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
+        mockPool.throwKafkaExceptionOnPoll = true;
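+        // force the mocked lease to throw a KafkaException on poll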
+
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
+                return mockPool;
+            }
+        };
+
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
+        runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
+
+        runner.run(1, true);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
+
+        assertEquals(0, flowFiles.size());
+        assertNull(mockPool.actualCommitOffsets);
+
+        //assert that all consumers were closed as expected
+        //assert that the consumer pool was properly closed
+        assertTrue(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+    }
+
+    @Test
+    public void validateCommitOffsetException() throws Exception {
+        String groupName = "validateCommitOffsetException";
+
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
+        };
+
+        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
+                createConsumerRecords("foo", 1, 1L, firstPassValues)
+        );
+
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
+        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
+        mockPool.throwKafkaExceptionOnCommit = true;
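+        // force the mocked lease to throw a KafkaException when committing offsets; content still goes to success but no offsets are recorded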
+
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
+                return mockPool;
+            }
+        };
+
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
+        runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
+
+        runner.run(1, true);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
+
+        assertEquals(1, flowFiles.size());
+
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
+
+        //assert that all consumers were closed as expected
+        //assert that the consumer pool was properly closed
+        assertTrue(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+
+        assertNull(mockPool.actualCommitOffsets);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
new file mode 100644
index 0000000..7f88ea2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.Collections;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.KafkaException;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConsumerPoolTest {
+
+    Consumer<byte[], byte[]> consumer = null;
+    ComponentLog logger = null;
+
+    @Before
+    public void setup() {
+        consumer = mock(Consumer.class);
+        logger = mock(ComponentLog.class);
+    }
+
+    @Test
+    public void validatePoolSimpleCreateClose() throws Exception {
+
+        final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+
+        when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
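+        // every poll returns no records, so it is counted as unproductive in the pool statistics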
+
+        try (final ConsumerLease lease = testPool.obtainConsumer()) {
+            lease.poll();
+            lease.commitOffsets(Collections.emptyMap());
+        }
+        testPool.close();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+        assertEquals(1, stats.unproductivePollCount);
+        assertEquals(0, stats.productivePollCount);
+    }
+
+    @Test
+    public void validatePoolSimpleBatchCreateClose() throws Exception {
+
+        final ConsumerPool testPool = new ConsumerPool(5, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+
+        when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
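+        // 100 leases with 100 polls each: the single pooled consumer is reused, so only one consumer is created and closed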
+
+        for (int i = 0; i < 100; i++) {
+            try (final ConsumerLease lease = testPool.obtainConsumer()) {
+                for (int j = 0; j < 100; j++) {
+                    lease.poll();
+                }
+                lease.commitOffsets(Collections.emptyMap());
+            }
+        }
+        testPool.close();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(100, stats.leasesObtainedCount);
+        assertEquals(10000, stats.unproductivePollCount);
+        assertEquals(0, stats.productivePollCount);
+    }
+
+    @Test
+    public void validatePoolConsumerFails() throws Exception {
+
+        final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+
+        when(consumer.poll(anyInt())).thenThrow(new KafkaException());
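+        // poll fails immediately, so neither a productive nor an unproductive poll is recorded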
+
+        try (final ConsumerLease lease = testPool.obtainConsumer()) {
+            lease.poll();
+            fail();
+        } catch (final KafkaException ke) {
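+            // expected: the mocked consumer throws KafkaException on poll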
+
+        }
+        testPool.close();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+        assertEquals(0, stats.unproductivePollCount);
+        assertEquals(0, stats.productivePollCount);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
new file mode 100644
index 0000000..19c64af
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+public class KafkaPublisherTest {
+
+    private static EmbeddedKafka kafkaLocal;
+
+    private static EmbeddedKafkaProducerHelper producerHelper;
+
+    @BeforeClass
+    public static void beforeClass() {
+        kafkaLocal = new EmbeddedKafka();
+        kafkaLocal.start();
+        producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producerHelper.close();
+        kafkaLocal.stop();
+    }
+
+    @Test
+    public void validateSuccessfulSendAsWhole() throws Exception {
+        InputStream contentStream = new ByteArrayInputStream("Hello Kafka".getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulSendAsWhole";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+        KafkaPublisherResult result = publisher.publish(publishingContext);
+
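+        // a single undelimited payload: one message sent, last acked message index is 0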
+        assertEquals(0, result.getLastMessageAcked());
+        assertEquals(1, result.getMessagesSent());
+        contentStream.close();
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        assertNotNull(iter.next());
+        try {
+            iter.next();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+    }
+
+    @Test
+    public void validateSuccessfulSendAsDelimited() throws Exception {
+        InputStream contentStream = new ByteArrayInputStream(
+                "Hello Kafka\nHello Kafka\nHello Kafka\nHello 
Kafka\n".getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulSendAsDelimited";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+        KafkaPublisherResult result = publisher.publish(publishingContext);
+
+        assertEquals(3, result.getLastMessageAcked());
+        assertEquals(4, result.getMessagesSent());
+        contentStream.close();
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+    }
+
+    /*
+     * This test simulates the condition where not all messages were ACKed by
+     * Kafka
+     */
+    @Test
+    public void validateRetries() throws Exception {
+        byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8);
+        InputStream contentStream = new ByteArrayInputStream(testValue);
+        String topicName = "validateSuccessfulReSendOfFailedSegments";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+
+        // simulates the first re-try
+        int lastAckedMessageIndex = 1;
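+        // messages at indices 0 and 1 are treated as already acked, so only "Hello Kafka3" and "Hello Kafka4" are re-sent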
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+
+        publisher.publish(publishingContext);
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        String m1 = new String(iter.next().message());
+        String m2 = new String(iter.next().message());
+        assertEquals("Hello Kafka3", m1);
+        assertEquals("Hello Kafka4", m2);
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+
+        // simulates the second re-try
+        lastAckedMessageIndex = 2;
+        contentStream = new ByteArrayInputStream(testValue);
+        publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+        publisher.publish(publishingContext);
+
+        m1 = new String(iter.next().message());
+        assertEquals("Hello Kafka4", m1);
+
+        publisher.close();
+    }
+
+    /*
+     * Similar to the above test, but it sets the first retry index to the last
+     * possible message index and second index to an out of bound index. The
+     * expectation is that no messages will be sent to Kafka
+     */
+    @Test
+    public void validateRetriesWithWrongIndex() throws Exception {
+        byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8);
+        InputStream contentStream = new ByteArrayInputStream(testValue);
+        String topicName = "validateRetriesWithWrongIndex";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+
+        // simulates the first re-try
+        int lastAckedMessageIndex = 3;
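+        // index 3 is the last message in the stream, so nothing remains to re-send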
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+
+        publisher.publish(publishingContext);
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+
+        // simulates the second re-try
+        lastAckedMessageIndex = 6;
+        contentStream = new ByteArrayInputStream(testValue);
+        publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+        publisher.publish(publishingContext);
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+
+        publisher.close();
+    }
+
+    @Test
+    public void validateWithMultiByteCharactersNoDelimiter() throws Exception {
+        String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE.";
+        InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateWithMultiByteCharacters";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+
+        publisher.publish(publishingContext);
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        String r = new String(iter.next().message(), StandardCharsets.UTF_8);
+        assertEquals(data, r);
+    }
+
+    @Test
+    public void validateWithNonDefaultPartitioner() throws Exception {
+        String data = "fooandbarandbaz";
+        InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateWithNonDefaultPartitioner";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        kafkaProperties.setProperty("partitioner.class", 
TestPartitioner.class.getName());
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+        publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8));
+
+        try {
+            publisher.publish(publishingContext);
+            // partitioner should be invoked 3 times
+            assertTrue(TestPartitioner.counter == 3);
+            publisher.close();
+        } finally {
+            TestPartitioner.counter = 0;
+        }
+    }
+
+    private Properties buildProducerProperties() {
+        Properties kafkaProperties = new Properties();
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaLocal.getKafkaPort());
+        kafkaProperties.put("auto.create.topics.enable", "true");
+        return kafkaProperties;
+    }
+
+    private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
+        Properties props = new Properties();
+        props.put("zookeeper.connect", "localhost:" + 
kafkaLocal.getZookeeperPort());
+        props.put("group.id", "test");
+        props.put("consumer.timeout.ms", "500");
+        props.put("auto.offset.reset", "smallest");
+        ConsumerConfig consumerConfig = new ConsumerConfig(props);
+        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+        Map<String, Integer> topicCountMap = new HashMap<>(1);
+        topicCountMap.put(topic, 1);
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+        ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
+        return iter;
+    }
+
+    public static class TestPartitioner implements Partitioner {
+
+        static int counter;
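+        // counts partition() invocations so tests can verify how many messages were partitioned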
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            // nothing to configure for this test partitioner
+        }
+
+        @Override
+        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
+                Cluster cluster) {
+            counter++;
+            return 0;
+        }
+
+        @Override
+        public void close() {
+            counter = 0;
+        }
+    }
+}
