http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
new file mode 100644
index 0000000..7660305
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Fetches messages from Apache Kafka")
+@Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
+@WritesAttributes({
+        @WritesAttribute(attribute = "kafka.topic", description = "The name of 
the Kafka Topic from which the message was received"),
+        @WritesAttribute(attribute = "kafka.key", description = "The key of 
the Kafka message, if it exists and batch size is 1. If"
+                + " the message does not have a key, or if the batch size is 
greater than 1, this attribute will not be added"),
+        @WritesAttribute(attribute = "kafka.partition", description = "The 
partition of the Kafka Topic from which the message was received. This 
attribute is added only if the batch size is 1"),
+        @WritesAttribute(attribute = "kafka.offset", description = "The offset 
of the message within the Kafka partition. This attribute is added only if the 
batch size is 1")})
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = 
"The value of a given Kafka configuration property.",
+            description = "These properties will be added on the Kafka 
configuration after loading any provided configuration properties."
+        + " In the event a dynamic property represents a property that was 
already set as part of the static properties, its value wil be"
+        + " overriden with warning message describing the override."
+        + " For the list of available Kafka properties please refer to: 
http://kafka.apache.org/documentation.html#configuration.")
+public class GetKafka extends AbstractProcessor {
+
+    public static final String SMALLEST = "smallest";
+    public static final String LARGEST = "largest";
+
+    public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new 
PropertyDescriptor.Builder()
+            .name("ZooKeeper Connection String")
+            .description("The Connection String to use in order to connect to 
ZooKeeper. This is often a comma-separated list of <host>:<port>"
+                    + " combinations. For example, 
host1:2181,host2:2181,host3:2188")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+    public static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+            .name("Topic Name")
+            .description("The Kafka Topic to pull messages from")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+    public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new 
PropertyDescriptor.Builder()
+            .name("Zookeeper Commit Frequency")
+            .description("Specifies how often to communicate with ZooKeeper to 
indicate which messages have been pulled. A longer time period will"
+                    + " result in better overall performance but can result in 
more data duplication if a NiFi node is lost")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("60 secs")
+            .build();
+    public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("ZooKeeper Communications Timeout")
+            .description("The amount of time to wait for a response from 
ZooKeeper before determining that there is a communications error")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("30 secs")
+            .build();
+    public static final PropertyDescriptor KAFKA_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("Kafka Communications Timeout")
+            .description("The amount of time to wait for a response from Kafka 
before determining that there is a communications error")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("30 secs")
+            .build();
+    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("Specifies the maximum number of messages to combine 
into a single FlowFile. These messages will be "
+                    + "concatenated together with the <Message Demarcator> 
string placed between the content of each message. "
+                    + "If the messages from Kafka should not be concatenated 
together, leave this value at 1.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("1")
+            .build();
+    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new 
PropertyDescriptor.Builder()
+            .name("Message Demarcator")
+            .description("Specifies the characters to use in order to 
demarcate multiple messages from Kafka. If the <Batch Size> "
+                    + "property is set to 1, this value is ignored. Otherwise, 
for each two subsequent messages in the batch, "
+                    + "this value will be placed in between them.")
+            .required(true)
+            .addValidator(Validator.VALID) // accept anything as a demarcator, 
including empty string
+            .expressionLanguageSupported(false)
+            .defaultValue("\\n")
+            .build();
+
+    public static final PropertyDescriptor CLIENT_NAME = new 
PropertyDescriptor.Builder()
+            .name("Client Name")
+            .description("Client Name to use when communicating with Kafka")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+    public static final PropertyDescriptor GROUP_ID = new 
PropertyDescriptor.Builder()
+            .name("Group ID")
+            .description("A Group ID is used to identify consumers that are 
within the same consumer group")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+
+    public static final PropertyDescriptor AUTO_OFFSET_RESET = new 
PropertyDescriptor.Builder()
+            .name("Auto Offset Reset")
+            .description("Automatically reset the offset to the smallest or 
largest offset available on the broker")
+            .required(true)
+            .allowableValues(SMALLEST, LARGEST)
+            .defaultValue(LARGEST)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles that are created are routed to this 
relationship")
+            .build();
+
+    // Iterators over the Kafka streams opened by createConsumers(). A task polls one via
+    // getStreamIterator() and consumeFromKafka() offers it back once the task is done.
+    private final BlockingQueue<ConsumerIterator<byte[], byte[]>> 
streamIterators = new LinkedBlockingQueue<>();
+    // Connector created in createConsumers() and shut down in shutdownConsumer().
+    private volatile ConsumerConnector consumer;
+
+    // Set to true at the end of createConsumers() and back to false by shutdownConsumer().
+    // onTrigger() also synchronizes on this object while it (re)creates the consumers.
+    private final AtomicBoolean consumerStreamsReady = new AtomicBoolean();
+
+    // Twice the 'Kafka Communications Timeout' in milliseconds (assigned in schedule()).
+    // onTrigger() uses it as the maximum wait for each task it submits to the executor.
+    private volatile long deadlockTimeout;
+
+    // Thread pool created lazily in onTrigger() and shut down in shutdownConsumer().
+    private volatile ExecutorService executor;
+
+    /**
+     * Lists the static properties of this processor.
+     *
+     * 'Client Name' and 'Group ID' are re-built from their constants here so that
+     * their default values can be derived from this processor's identifier.
+     *
+     * @return a new list holding the supported property descriptors
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final String processorId = getIdentifier();
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ZOOKEEPER_CONNECTION_STRING);
+        descriptors.add(TOPIC);
+        descriptors.add(ZOOKEEPER_COMMIT_DELAY);
+        descriptors.add(BATCH_SIZE);
+        descriptors.add(MESSAGE_DEMARCATOR);
+        descriptors.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(CLIENT_NAME)
+                .defaultValue("NiFi-" + processorId)
+                .build());
+        descriptors.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(GROUP_ID)
+                .defaultValue(processorId)
+                .build());
+        descriptors.add(KAFKA_TIMEOUT);
+        descriptors.add(ZOOKEEPER_TIMEOUT);
+        descriptors.add(AUTO_OFFSET_RESET);
+        return descriptors;
+    }
+
+    /**
+     * @return a new set containing the single 'success' relationship of this processor
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> supported = new HashSet<>(1);
+        supported.add(REL_SUCCESS);
+        return supported;
+    }
+
+    /**
+     * Builds the Kafka consumer configuration from the processor properties,
+     * creates the consumer connector and opens the message streams for the
+     * configured topic. The iterators of those streams replace whatever is in
+     * the iterator queue, after which the streams are flagged as ready.
+     *
+     * @param context the process context supplying the property values and the
+     *            number of concurrent tasks
+     */
+    public void createConsumers(final ProcessContext context) {
+        final String topicName = context.getProperty(TOPIC).getValue();
+        final String zookeeperConnect = context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue();
+
+        // Static properties first; dynamic properties may override them below.
+        final Properties consumerProps = new Properties();
+        consumerProps.setProperty("zookeeper.connect", zookeeperConnect);
+        consumerProps.setProperty("group.id", context.getProperty(GROUP_ID).getValue());
+        consumerProps.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
+        consumerProps.setProperty("auto.commit.interval.ms",
+                String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
+        consumerProps.setProperty("auto.offset.reset", context.getProperty(AUTO_OFFSET_RESET).getValue());
+        consumerProps.setProperty("zookeeper.connection.timeout.ms",
+                context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
+        consumerProps.setProperty("socket.timeout.ms",
+                context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
+
+        for (final Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
+            final PropertyDescriptor descriptor = property.getKey();
+            if (!descriptor.isDynamic()) {
+                continue;
+            }
+            final String name = descriptor.getName();
+            final String value = property.getValue();
+            if (consumerProps.containsKey(name)) {
+                this.getLogger().warn("Overriding existing property '" + name + "' which had value of '"
+                        + consumerProps.getProperty(name) + "' with dynamically set value '" + value + "'.");
+            }
+            consumerProps.setProperty(name, value);
+        }
+
+        /*
+         * When the user has not chosen a value, 'consumer.timeout.ms' is set to
+         * 1 millisecond so that hasNext() on the stream iterator does not block
+         * while the topic is empty. The resulting ConsumerTimeoutException is
+         * handled in consumeFromKafka().
+         */
+        if (!consumerProps.containsKey("consumer.timeout.ms")) {
+            this.getLogger().info("Setting 'consumer.timeout.ms' to 1 milliseconds to avoid consumer"
+                    + " block in the event when no events are present in Kafka topic. If you wish to change this value "
+                    + " set it as dynamic property. If you wish to explicitly enable consumer block (at your own risk)"
+                    + " set its value to -1.");
+            consumerProps.setProperty("consumer.timeout.ms", "1");
+        }
+
+        final int partitionCount = KafkaUtils.retrievePartitionCountForTopic(zookeeperConnect, topicName);
+
+        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
+
+        // Never open more streams than there are partitions.
+        final int configuredTasks = context.getMaxConcurrentTasks();
+        final int streamCount;
+        if (configuredTasks > partitionCount) {
+            streamCount = partitionCount;
+            this.getLogger().warn("The amount of concurrent tasks '" + configuredTasks + "' configured for "
+                    + "this processor is greater than the amount of partitions '" + partitionCount + "' for topic '" + topicName + "'. "
+                    + "Therefore those tasks would never see a message. To avoid that the '" + partitionCount
+                    + "'(partition count) will be used to consume events");
+        } else {
+            streamCount = configuredTasks;
+            if (configuredTasks < partitionCount) {
+                this.getLogger().warn("The amount of concurrent tasks '" + configuredTasks + "' configured for "
+                        + "this processor is less than the amount of partitions '" + partitionCount + "' for topic '" + topicName + "'. "
+                        + "Consider making it equal to the amount of partition count for most efficient event consumption.");
+            }
+        }
+
+        final Map<String, Integer> streamsPerTopic = new HashMap<>(1);
+        streamsPerTopic.put(topicName, streamCount);
+
+        final List<KafkaStream<byte[], byte[]>> topicStreams = consumer.createMessageStreams(streamsPerTopic).get(topicName);
+
+        this.streamIterators.clear();
+        for (final KafkaStream<byte[], byte[]> topicStream : topicStreams) {
+            streamIterators.add(topicStream.iterator());
+        }
+        this.consumerStreamsReady.set(true);
+    }
+
+    /**
+     * Marks the consumer streams as not ready, commits offsets and shuts the
+     * consumer connector down, then stops the executor.
+     *
+     * The executor is given 30 seconds to finish its tasks before it is
+     * forcibly terminated. The executor field is cleared only when that wait
+     * was not interrupted.
+     */
+    @OnStopped
+    public void shutdownConsumer() {
+        this.consumerStreamsReady.set(false);
+
+        if (consumer != null) {
+            try {
+                consumer.commitOffsets();
+            } finally {
+                // shut down even when committing the offsets failed
+                consumer.shutdown();
+            }
+        }
+
+        if (this.executor == null) {
+            return;
+        }
+
+        this.executor.shutdown();
+        try {
+            final boolean terminated = this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS);
+            if (!terminated) {
+                this.executor.shutdownNow();
+                getLogger().warn("Executor did not stop in 30 sec. Terminated.");
+            }
+            this.executor = null;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for '" + 
propertyDescriptorName + "' Kafka Configuration.")
+                
.name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
+                .build();
+    }
+
+    /**
+     * Derives the deadlock timeout, which is twice the configured
+     * 'Kafka Communications Timeout', expressed in milliseconds.
+     *
+     * @param context the process context supplying the timeout property
+     */
+    @OnScheduled
+    public void schedule(ProcessContext context) {
+        final long kafkaTimeoutMillis = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+        this.deadlockTimeout = 2 * kafkaTimeoutMillis;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        /*
+         * Will ensure that consumer streams are ready upon the first 
invocation
+         * of onTrigger. Will be reset to 'false' in the event of exception
+         */
+        synchronized (this.consumerStreamsReady) {
+            if (this.executor == null || this.executor.isShutdown()) {
+                this.executor = Executors.newCachedThreadPool();
+            }
+            if (!this.consumerStreamsReady.get()) {
+                Future<Void> f = this.executor.submit(new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        createConsumers(context);
+                        return null;
+                    }
+                });
+                try {
+                    f.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    shutdownConsumer();
+                    f.cancel(true);
+                    Thread.currentThread().interrupt();
+                    getLogger().warn("Interrupted while waiting to get 
connection", e);
+                } catch (ExecutionException e) {
+                    throw new IllegalStateException(e);
+                } catch (TimeoutException e) {
+                    shutdownConsumer();
+                    f.cancel(true);
+                    getLogger().warn("Timed out after " + this.deadlockTimeout 
+ " milliseconds while waiting to get connection", e);
+                }
+            }
+        }
+        //===
+        if (this.consumerStreamsReady.get()) {
+            Future<Void> consumptionFuture = this.executor.submit(new 
Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    ConsumerIterator<byte[], byte[]> iterator = 
getStreamIterator();
+                    if (iterator != null) {
+                        consumeFromKafka(context, session, iterator);
+                    }
+                    return null;
+                }
+            });
+            try {
+                consumptionFuture.get(this.deadlockTimeout, 
TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                shutdownConsumer();
+                consumptionFuture.cancel(true);
+                Thread.currentThread().interrupt();
+                getLogger().warn("Interrupted while consuming messages", e);
+            } catch (ExecutionException e) {
+                throw new IllegalStateException(e);
+            } catch (TimeoutException e) {
+                shutdownConsumer();
+                consumptionFuture.cancel(true);
+                getLogger().warn("Timed out after " + this.deadlockTimeout + " 
milliseconds while consuming messages", e);
+            }
+        }
+    }
+
+    protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
+        return this.streamIterators.poll();
+    }
+
+
+    private void consumeFromKafka(final ProcessContext context, final 
ProcessSession session,
+            ConsumerIterator<byte[], byte[]> iterator) throws ProcessException 
{
+
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final String demarcator = 
context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", 
"\n").replace("\\r", "\r").replace("\\t", "\t");
+        final byte[] demarcatorBytes = 
demarcator.getBytes(StandardCharsets.UTF_8);
+        final String topic = context.getProperty(TOPIC).getValue();
+
+        FlowFile flowFile = session.create();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("kafka.topic", topic);
+        final long start = System.nanoTime();
+        int msgCount = 0;
+
+        try {
+            for (; msgCount < batchSize && iterator.hasNext(); msgCount++) {
+                final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
+
+                if (batchSize == 1) {
+                    final byte[] key = mam.key();
+                    // the kafka.key, kafka.offset, and kafka.partition 
attributes are added only
+                    // for a batch size of 1.
+                    if (key != null) {
+                        attributes.put("kafka.key", new String(key, 
StandardCharsets.UTF_8));
+                    }
+
+                    attributes.put("kafka.offset", 
String.valueOf(mam.offset()));
+                    attributes.put("kafka.partition", 
String.valueOf(mam.partition()));
+                }
+
+                // add the message to the FlowFile's contents
+                final boolean firstMessage = msgCount == 0;
+                flowFile = session.append(flowFile, new OutputStreamCallback() 
{
+                    @Override
+                    public void process(final OutputStream out) throws 
IOException {
+                        if (!firstMessage) {
+                            out.write(demarcatorBytes);
+                        }
+                        out.write(mam.message());
+                    }
+                });
+            }
+            this.releaseFlowFile(flowFile, session, attributes, start, topic, 
msgCount);
+        } catch (ConsumerTimeoutException e) {
+            /*
+             * By default Kafka blocks indefinitely if topic is empty via
+             * stream.hasNext(). If 'consumer.timeout.ms' property is set (see
+             * http://kafka.apache.org/documentation.html#configuration) the
+             * hasNext() will fail with this exception. To this processor it
+             * simply means there are no messages and current task should exit
+             * in non-failure releasing the flow file if it was able to
+             * accumulate any events.
+             */
+            this.releaseFlowFile(flowFile, session, attributes, start, topic, 
msgCount);
+        } catch (final Exception e) {
+            this.shutdownConsumer();
+            getLogger().error("Failed to receive FlowFile from Kafka due to 
{}", new Object[]{e});
+            if (flowFile != null) {
+                session.remove(flowFile);
+            }
+        } finally {
+            // Add the iterator back to the queue
+            if (iterator != null) {
+                streamIterators.offer(iterator);
+            }
+        }
+    }
+
+    /**
+     * Finishes the given FlowFile once consumption has stopped.
+     *
+     * An empty FlowFile is removed from the session. Otherwise the attributes
+     * are applied, a provenance RECEIVE event is reported and the FlowFile is
+     * transferred to REL_SUCCESS.
+     */
+    private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<String, String> attributes, long start, String topic, int msgCount){
+        final boolean empty = flowFile.getSize() == 0L;
+        if (empty) {
+            session.remove(flowFile);
+            return;
+        }
+
+        final FlowFile received = session.putAllAttributes(flowFile, attributes);
+        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        final String transitUri = "kafka://" + topic;
+        final String details = "Received " + msgCount + " Kafka messages";
+        session.getProvenanceReporter().receive(received, transitUri, details, elapsedMillis);
+        getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[]{received, msgCount, elapsedMillis});
+        session.transfer(received, REL_SUCCESS);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
new file mode 100644
index 0000000..5bc0e0e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.Closeable;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.stream.io.util.StreamDemarcator;
+
+import kafka.producer.Partitioner;
+
+/**
+ * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor
+ * with sending contents of the {@link FlowFile}s to Kafka.
+ */
+class KafkaPublisher implements Closeable {
+
+    private final Producer<byte[], byte[]> kafkaProducer;
+
+    // Maximum time (ms) to wait on each send Future; see setAckWaitTime(long)
+    private long ackWaitTime = 30000;
+
+    private final ComponentLog componentLog;
+
+    // Optional partitioner instantiated from the 'partitioner.class' property; null when not configured
+    private final Partitioner partitioner;
+
+    // ACKs are checked every time the message index is a multiple of this value
+    private final int ackCheckSize;
+
+    /**
+     * Creates an instance of this class using an ACK check size of 100.
+     *
+     * @param kafkaProperties
+     *            instance of {@link Properties} used to bootstrap
+     *            {@link KafkaProducer}
+     * @param componentLog
+     *            logger used to report problems while waiting for ACKs
+     */
+    KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
+        this(kafkaProperties, 100, componentLog);
+    }
+
+    /**
+     * Creates an instance of this class as well as the instance of the
+     * corresponding Kafka {@link KafkaProducer} using provided Kafka
+     * configuration properties. The key and value serializers in the provided
+     * properties are overwritten with {@link ByteArraySerializer}.
+     *
+     * @param kafkaProperties
+     *            instance of {@link Properties} used to bootstrap
+     *            {@link KafkaProducer}
+     * @param ackCheckSize
+     *            ACKs are verified whenever the index of the message just sent
+     *            is a multiple of this value
+     * @param componentLog
+     *            logger used to report problems while waiting for ACKs
+     * @throws IllegalStateException
+     *             if the class named by the 'partitioner.class' property
+     *             cannot be instantiated
+     */
+    KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) {
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
+        this.ackCheckSize = ackCheckSize;
+        try {
+            if (kafkaProperties.containsKey("partitioner.class")) {
+                this.partitioner = (Partitioner) Class.forName(kafkaProperties.getProperty("partitioner.class")).newInstance();
+            } else {
+                this.partitioner = null;
+            }
+        } catch (Exception e) {
+            // The producer was already created above; close it so it is not
+            // left open when construction of this publisher fails.
+            this.kafkaProducer.close();
+            throw new IllegalStateException("Failed to create partitioner", e);
+        }
+        this.componentLog = componentLog;
+    }
+
+    /**
+     * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to
+     * determine how many messages to Kafka will be sent from a provided
+     * {@link InputStream} (see {@link PublishingContext#getContentStream()}).
+     * It supports two publishing modes:
+     * <ul>
+     * <li>Sending all messages constructed from
+     * {@link StreamDemarcator#nextToken()} operation.</li>
+     * <li>Sending only unacknowledged messages constructed from
+     * {@link StreamDemarcator#nextToken()} operation.</li>
+     * </ul>
+     * The unacknowledged messages are determined from the value of
+     * {@link PublishingContext#getLastAckedMessageIndex()}.
+     * <br>
+     * This method assumes content stream affinity where it is expected that the
+     * content stream that represents the same Kafka message(s) will remain the
+     * same across possible retries. This is required specifically for cases
+     * where delimiter is used and a single content stream may represent
+     * multiple Kafka messages. The
+     * {@link PublishingContext#getLastAckedMessageIndex()} will provide the
+     * index of the last ACKed message, so upon retry only messages with the
+     * higher index are sent.
+     * <br>
+     * The partition of each message is the one set on the
+     * {@link PublishingContext}; if none is set, a partitioner is configured
+     * and the context carries a key, the partition is obtained from the
+     * partitioner. Otherwise no partition is specified on the record.
+     *
+     * @param publishingContext
+     *            instance of {@link PublishingContext} which hold context
+     *            information about the message(s) to be sent.
+     * @return {@link KafkaPublisherResult} holding the number of messages
+     *         processed from the stream and the index of the last ACKed
+     *         message.
+     */
+    KafkaPublisherResult publish(PublishingContext publishingContext) {
+        StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
+            publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
+
+        int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex();
+        List<Future<RecordMetadata>> resultFutures = new ArrayList<>();
+
+        byte[] messageBytes;
+        int tokenCounter = 0;
+        boolean continueSending = true;
+        KafkaPublisherResult result = null;
+        for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
+            // Skip messages that were already ACKed by a previous attempt
+            if (prevLastAckedMessageIndex < tokenCounter) {
+                Integer partitionId = publishingContext.getPartitionId();
+                // Only consult the partitioner when one is configured; otherwise the
+                // partition stays unspecified instead of being forced to 0.
+                if (partitionId == null && this.partitioner != null && publishingContext.getKeyBytes() != null) {
+                    partitionId = this.getPartition(publishingContext.getKeyBytes(), publishingContext.getTopic());
+                }
+                // Use the resolved partitionId (previously the computed value was discarded)
+                ProducerRecord<byte[], byte[]> message =
+                    new ProducerRecord<>(publishingContext.getTopic(), partitionId, publishingContext.getKeyBytes(), messageBytes);
+                resultFutures.add(this.kafkaProducer.send(message));
+
+                if (tokenCounter % this.ackCheckSize == 0) {
+                    int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+                    resultFutures.clear();
+                    // When every pending message was ACKed the last ACKed index equals
+                    // the index of the message just sent. Anything lower means a failure,
+                    // including a failure of the very first message of the batch.
+                    if (lastAckedMessageIndex < tokenCounter) {
+                        continueSending = false;
+                        // tokenCounter is the zero-based index of the last message sent,
+                        // so the number of messages processed is tokenCounter + 1
+                        result = new KafkaPublisherResult(tokenCounter + 1, lastAckedMessageIndex);
+                    }
+                    prevLastAckedMessageIndex = lastAckedMessageIndex;
+                }
+            }
+        }
+
+        if (result == null) {
+            // Stream exhausted without a detected failure: process the remaining ACKs
+            int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+            resultFutures.clear();
+            result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+        }
+        return result;
+    }
+
+    /**
+     * Sets the time (in milliseconds) this publisher will wait for the
+     * {@link Future#get()} operation (the Future returned by
+     * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing
+     * out.
+     *
+     * Note that {@link #close()} closes the underlying producer without
+     * applying this timeout.
+     */
+    void setAckWaitTime(long ackWaitTime) {
+        this.ackWaitTime = ackWaitTime;
+    }
+
+    /**
+     * This operation will process ACKs from Kafka in the order in which
+     * {@link KafkaProducer#send(ProducerRecord)} invocation were made returning
+     * the index of the last ACKed message. Within this operation processing ACK
+     * simply means successful invocation of 'get()' operation on the
+     * {@link Future} returned by {@link KafkaProducer#send(ProducerRecord)}
+     * operation. Upon encountering any type of error while interrogating such
+     * {@link Future} the ACK loop will end. Messages that were not ACKed would
+     * be considered non-delivered and therefore could be resent at the later
+     * time.
+     *
+     * @param sendFutures
+     *            list of {@link Future}s representing results of publishing to
+     *            Kafka
+     *
+     * @param lastAckMessageIndex
+     *            the index of the last ACKed message. It is important to
+     *            provide the last ACKed message especially while re-trying so
+     *            the proper index is maintained.
+     * @return the index of the last message that was ACKed, i.e. the provided
+     *         index advanced by the number of consecutive successful ACKs
+     */
+    private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) {
+        boolean exceptionThrown = false;
+        for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) {
+            Future<RecordMetadata> future = sendFutures.get(segmentCounter);
+            try {
+                future.get(this.ackWaitTime, TimeUnit.MILLISECONDS);
+                lastAckMessageIndex++;
+            } catch (InterruptedException e) {
+                exceptionThrown = true;
+                // restore the interrupt flag for the caller
+                Thread.currentThread().interrupt();
+                this.warnOrError("Interrupted while waiting for acks from Kafka", null);
+            } catch (ExecutionException e) {
+                exceptionThrown = true;
+                this.warnOrError("Failed while waiting for acks from Kafka", e);
+            } catch (TimeoutException e) {
+                exceptionThrown = true;
+                this.warnOrError("Timed out while waiting for acks from Kafka", null);
+            }
+        }
+
+        return lastAckMessageIndex;
+    }
+
+    /**
+     * Will close the underlying {@link KafkaProducer}
+     */
+    @Override
+    public void close() {
+        this.kafkaProducer.close();
+    }
+
+    /**
+     * Logs the message as a warning when no exception is provided, otherwise
+     * logs it as an error together with the exception so the cause is not
+     * lost.
+     */
+    private void warnOrError(String message, Exception e) {
+        if (e == null) {
+            this.componentLog.warn(message);
+        } else {
+            this.componentLog.error(message, e);
+        }
+    }
+
+    /**
+     * Outcome of a single {@link KafkaPublisher#publish(PublishingContext)}
+     * invocation.
+     */
+    static class KafkaPublisherResult {
+        private final int messagesSent;
+        private final int lastMessageAcked;
+        KafkaPublisherResult(int messagesSent, int lastMessageAcked) {
+            this.messagesSent = messagesSent;
+            this.lastMessageAcked = lastMessageAcked;
+        }
+
+        public int getMessagesSent() {
+            return this.messagesSent;
+        }
+
+        public int getLastMessageAcked() {
+            return this.lastMessageAcked;
+        }
+
+        /**
+         * @return true if at least one message was ACKed and the (zero-based)
+         *         index of the last ACKed message corresponds to the last
+         *         message sent
+         */
+        public boolean isAllAcked() {
+            return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked;
+        }
+
+        @Override
+        public String toString() {
+            return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked;
+        }
+    }
+
+    /**
+     * Asks the configured partitioner for the partition of the given key,
+     * based on the number of partitions the producer reports for the topic.
+     * Returns 0 when no partitioner is configured.
+     */
+    private int getPartition(Object key, String topicName) {
+        if (this.partitioner != null) {
+            int partSize = this.kafkaProducer.partitionsFor(topicName).size();
+            return this.partitioner.partition(key, partSize);
+        }
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
new file mode 100644
index 0000000..8ddea61
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
@@ -0,0 +1,74 @@
+package org.apache.nifi.processors.kafka;
+
+import java.util.Collections;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import kafka.admin.AdminUtils;
+import kafka.api.TopicMetadata;
+import kafka.utils.ZKStringSerializer;
+import scala.collection.JavaConversions;
+
+/**
+ * Utility class to support interaction with Kafka internals.
+ *
+ */
+class KafkaUtils {
+
+
+    /**
+     * Will retrieve the amount of partitions for a given Kafka topic by
+     * fetching the topic metadata from ZooKeeper.
+     *
+     * @param zookeeperConnectionString
+     *            connection string used to create the {@link ZkClient}
+     * @param topicName
+     *            name of the topic to look up
+     * @return number of partitions reported in the topic metadata
+     * @throws IllegalStateException
+     *             if the metadata cannot be fetched or is empty
+     */
+    static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) {
+        ZkClient zkClient = null;
+
+        try {
+            zkClient = new ZkClient(zookeeperConnectionString);
+            // Delegate (de)serialization to Kafka's ZKStringSerializer
+            zkClient.setZkSerializer(new ZkSerializer() {
+                @Override
+                public byte[] serialize(Object o) throws ZkMarshallingError {
+                    return ZKStringSerializer.serialize(o);
+                }
+
+                @Override
+                public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+                    return ZKStringSerializer.deserialize(bytes);
+                }
+            });
+            scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
+                    .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
+            if (topicMetadatas != null && topicMetadatas.size() > 0) {
+                return JavaConversions.asJavaSet(topicMetadatas).iterator().next().partitionsMetadata().size();
+            } else {
+                throw new IllegalStateException("Failed to get metadata for topic " + topicName);
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to retrieve partitions for topic " + topicName, e);
+        } finally {
+            // zkClient is still null when the ZkClient constructor itself failed;
+            // check explicitly instead of relying on a swallowed NullPointerException
+            if (zkClient != null) {
+                try {
+                    zkClient.close();
+                } catch (Exception ignored) {
+                    // best-effort cleanup: a failure to close must not mask the
+                    // result or the exception raised above
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java
new file mode 100644
index 0000000..32d3606
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.util.Random;
+
+import kafka.producer.Partitioner;
+
+/**
+ * Collection of implementations of common Kafka {@link Partitioner}s.
+ */
+final public class Partitioners {
+
+    // Holder of nested partitioner implementations; not meant to be instantiated
+    private Partitioners() {
+    }
+    /**
+     * {@link Partitioner} that implements 'round-robin' mechanism which evenly
+     * distributes load between all available partitions. The key is ignored.
+     */
+    public static class RoundRobinPartitioner implements Partitioner {
+        private volatile int index;
+
+        @Override
+        public int partition(Object key, int numberOfPartitions) {
+            int partitionIndex = this.next(numberOfPartitions);
+            return partitionIndex;
+        }
+
+        /**
+         * Returns the current index and advances it, wrapping back to 0 once
+         * the index reaches the number of partitions.
+         */
+        private synchronized int next(int numberOfPartitions) {
+            if (this.index >= numberOfPartitions) {
+                this.index = 0;
+            }
+            return index++;
+        }
+    }
+
+    /**
+     * {@link Partitioner} that implements 'random' mechanism which randomly
+     * distributes the load between all available partitions. The key is
+     * ignored.
+     */
+    public static class RandomPartitioner implements Partitioner {
+        private final Random random;
+
+        public RandomPartitioner() {
+            this.random = new Random();
+        }
+
+        @Override
+        public int partition(Object key, int numberOfPartitions) {
+            return this.random.nextInt(numberOfPartitions);
+        }
+    }
+
+    /**
+     * {@link Partitioner} that implements 'key hash' mechanism which
+     * distributes the load between all available partitions based on hashing
+     * the value of the key. A null key is always assigned to partition 0.
+     */
+    public static class HashPartitioner implements Partitioner {
+
+        @Override
+        public int partition(Object key, int numberOfPartitions) {
+            if (key != null) {
+                // Arrays inherit the identity-based Object.hashCode(), so byte[]
+                // keys must be hashed by content for equal keys to map to the
+                // same partition.
+                int hash = (key instanceof byte[]) ? java.util.Arrays.hashCode((byte[]) key) : key.hashCode();
+                // mask the sign bit so the modulo result is never negative
+                return (hash & Integer.MAX_VALUE) % numberOfPartitions;
+            }
+            return 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java
new file mode 100644
index 0000000..914ac1a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Holder of context information used by {@link KafkaPublisher} required to
+ * publish messages to Kafka.
+ */
+class PublishingContext {
+
+    private final InputStream contentStream;
+
+    private final String topic;
+
+    private final int lastAckedMessageIndex;
+
+    private volatile Integer partitionId;
+
+    /*
+     * We're using the default value from Kafka. We are using it to control the
+     * message size before it goes to Kafka, thus limiting the possibility of
+     * late failures in the Kafka client.
+     */
+    private volatile int maxRequestSize = 1048576; // kafka default
+
+    // Tracks whether setMaxRequestSize(int) was already called successfully
+    private volatile boolean maxRequestSizeSet;
+
+    private volatile byte[] keyBytes;
+
+    private volatile byte[] delimiterBytes;
+
+
+
+    /**
+     * Creates a context with 'lastAckedMessageIndex' of -1.
+     */
+    PublishingContext(InputStream contentStream, String topic) {
+        this(contentStream, topic, -1);
+    }
+
+    /**
+     * @param contentStream
+     *            stream with the content to publish; must not be null
+     * @param topic
+     *            name of the Kafka topic; must not be null or empty
+     * @param lastAckedMessageIndex
+     *            index of the last ACKed message; must be >= -1
+     * @throws IllegalArgumentException
+     *             if any of the arguments is invalid
+     */
+    PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex) {
+        this.validateInput(contentStream, topic, lastAckedMessageIndex);
+        this.contentStream = contentStream;
+        this.topic = topic;
+        this.lastAckedMessageIndex = lastAckedMessageIndex;
+    }
+
+    @Override
+    public String toString() {
+        // The delimiter is optional and null until setDelimiterBytes(..) is
+        // called; read the field once and guard against null to avoid a
+        // NullPointerException.
+        final byte[] delimiter = this.delimiterBytes;
+        final String delimiterText = delimiter == null ? null : new String(delimiter, StandardCharsets.UTF_8);
+        return "topic: '" + this.topic + "'; delimiter: '" + delimiterText + "'";
+    }
+
+    int getLastAckedMessageIndex() {
+        return this.lastAckedMessageIndex;
+    }
+
+    int getMaxRequestSize() {
+        return this.maxRequestSize;
+    }
+
+    byte[] getKeyBytes() {
+        return this.keyBytes;
+    }
+
+    Integer getPartitionId() {
+        return partitionId;
+    }
+
+    public void setPartitionId(Integer partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    byte[] getDelimiterBytes() {
+        return this.delimiterBytes;
+    }
+
+    InputStream getContentStream() {
+        return this.contentStream;
+    }
+
+    String getTopic() {
+        return this.topic;
+    }
+
+    /**
+     * Sets the message key. A null argument is ignored.
+     *
+     * @throws IllegalArgumentException
+     *             if the key is empty or a key was already set
+     */
+    void setKeyBytes(byte[] keyBytes) {
+        if (this.keyBytes == null) {
+            if (keyBytes != null) {
+                this.assertBytesValid(keyBytes);
+                this.keyBytes = keyBytes;
+            }
+        } else {
+            throw new IllegalArgumentException("'keyBytes' can only be set once per instance");
+        }
+    }
+
+    /**
+     * Sets the message delimiter. A null argument is ignored.
+     *
+     * @throws IllegalArgumentException
+     *             if the delimiter is empty or a delimiter was already set
+     */
+    void setDelimiterBytes(byte[] delimiterBytes) {
+        if (this.delimiterBytes == null) {
+            if (delimiterBytes != null) {
+                this.assertBytesValid(delimiterBytes);
+                this.delimiterBytes = delimiterBytes;
+            }
+        } else {
+            throw new IllegalArgumentException("'delimiterBytes' can only be set once per instance");
+        }
+    }
+
+    /**
+     * Overrides the default maximum request size.
+     *
+     * @throws IllegalArgumentException
+     *             if the value is not greater than 0 or the size was already
+     *             set
+     */
+    void setMaxRequestSize(int maxRequestSize) {
+        if (!this.maxRequestSizeSet) {
+            if (maxRequestSize > 0) {
+                this.maxRequestSize = maxRequestSize;
+                this.maxRequestSizeSet = true;
+            } else {
+                throw new IllegalArgumentException("'maxRequestSize' must be > 0");
+            }
+        } else {
+            throw new IllegalArgumentException("'maxRequestSize' can only be set once per instance");
+        }
+    }
+
+    // Rejects empty (zero-length) arrays; null is tolerated
+    private void assertBytesValid(byte[] bytes) {
+        if (bytes != null) {
+            if (bytes.length == 0) {
+                throw new IllegalArgumentException("'bytes' must not be empty");
+            }
+        }
+    }
+
+    // Validates constructor arguments, failing with IllegalArgumentException on the first violation
+    private void validateInput(InputStream contentStream, String topic, int lastAckedMessageIndex) {
+        if (contentStream == null) {
+            throw new IllegalArgumentException("'contentStream' must not be null");
+        } else if (topic == null || topic.trim().length() == 0) {
+            throw new IllegalArgumentException("'topic' must not be null or empty");
+        } else if (lastAckedMessageIndex < -1) {
+            throw new IllegalArgumentException("'lastAckedMessageIndex' must be >= -1");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
new file mode 100644
index 0000000..4dc8d18
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
+@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Kafka. The messages to send may be individual FlowFiles or may be 
delimited, using a "
+        + "user-specified delimiter, such as a new-line.")
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = 
"The value of a given Kafka configuration property.",
+                 description = "These properties will be added on the Kafka 
configuration after loading any provided configuration properties."
+        + " In the event a dynamic property represents a property that was 
already set as part of the static properties, its value wil be"
+        + " overriden with warning message describing the override."
+        + " For the list of available Kafka properties please refer to: 
http://kafka.apache.org/documentation.html#configuration.";)
+public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
+
+    private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
+
+    private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + 
"(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
+
+    public static final AllowableValue DELIVERY_REPLICATED = new 
AllowableValue("all", "Guarantee Replicated Delivery",
+            "FlowFile will be routed to"
+                    + " failure unless the message is replicated to the 
appropriate number of Kafka Nodes according to the Topic configuration");
+    public static final AllowableValue DELIVERY_ONE_NODE = new 
AllowableValue("1", "Guarantee Single Node Delivery",
+            "FlowFile will be routed"
+                    + " to success if the message is received by a single 
Kafka node, whether or not it is replicated. This is faster than"
+                    + " <Guarantee Replicated Delivery> but can result in data 
loss if a Kafka node crashes");
+    public static final AllowableValue DELIVERY_BEST_EFFORT = new 
AllowableValue("0", "Best Effort",
+            "FlowFile will be routed to success after"
+                    + " successfully writing the content to a Kafka node, 
without waiting for a response. This provides the best performance but may 
result"
+                    + " in data loss.");
+
+    /**
+     * AllowableValue for sending messages to Kafka without compression
+     */
+    public static final AllowableValue COMPRESSION_CODEC_NONE = new 
AllowableValue("none", "None",
+            "Compression will not be used for any topic.");
+
+    /**
+     * AllowableValue for sending messages to Kafka with GZIP compression
+     */
+    public static final AllowableValue COMPRESSION_CODEC_GZIP = new 
AllowableValue("gzip", "GZIP",
+            "Compress messages using GZIP");
+
+    /**
+     * AllowableValue for sending messages to Kafka with Snappy compression
+     */
+    public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new 
AllowableValue("snappy", "Snappy",
+            "Compress messages using Snappy");
+
+    static final AllowableValue ROUND_ROBIN_PARTITIONING = new 
AllowableValue("Round Robin", "Round Robin",
+            "Messages will be assigned partitions in a round-robin fashion, 
sending the first message to Partition 1, "
+                    + "the next Partition to Partition 2, and so on, wrapping 
as necessary.");
+    static final AllowableValue RANDOM_PARTITIONING = new 
AllowableValue("Random Robin", "Random",
+            "Messages will be assigned to random partitions.");
+    static final AllowableValue USER_DEFINED_PARTITIONING = new 
AllowableValue("User-Defined", "User-Defined",
+            "The <Partition> property will be used to determine the partition. 
All messages within the same FlowFile will be "
+                    + "assigned to the same partition.");
+
+    /** Required list of Kafka brokers; the value must match {@code BROKER_REGEX}. Used as "bootstrap.servers". */
+    public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder()
+            .name("Known Brokers")
+            .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
+            .expressionLanguageSupported(false)
+            .required(true)
+            .build();
+
+    /** Required name of the Kafka topic to publish to; supports Expression Language. */
+    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+            .name("Topic Name")
+            .description("The Kafka Topic of interest")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+
+    /** Required choice between round-robin (the default), random and user-defined partitioning. */
+    static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Partition Strategy")
+            .description("Specifies how messages should be partitioned when sent to Kafka")
+            .required(true)
+            .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING)
+            .defaultValue(ROUND_ROBIN_PARTITIONING.getValue())
+            .build();
+
+    /**
+     * Optional partition number; only consulted when the User-Defined partition strategy is selected.
+     * NOTE(review): POSITIVE_INTEGER_VALIDATOR presumably rejects 0, which would make partition 0
+     * unselectable as a literal value - confirm against StandardValidators.
+     */
+    public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
+            .name("Partition")
+            .description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages "
+                            + "in the same FlowFile will be sent to the same partition. If a partition is specified but is not valid, "
+                            + "then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    /** Optional message key; supports Expression Language and is sent as UTF-8 bytes. */
+    public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+            .name("Kafka Key")
+            .description("The Key to use for the Message")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(false)
+            .build();
+    /** Required acknowledgement level; its value is passed to Kafka as "acks". Defaults to best effort. */
+    public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
+            .name("Delivery Guarantee")
+            .description("Specifies the requirement for guaranteeing that a message is sent to Kafka")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
+            .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+            .build();
+
+    /** Optional delimiter used to split a FlowFile's content into several messages; supports Expression Language. */
+    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
+            .name("Message Delimiter")
+            .description("Specifies the delimiter (interpreted in its UTF-8 byte representation) to use for splitting apart multiple messages within a single FlowFile. "
+                            + "If not specified, the entire content of the FlowFile will be used as a single message. If specified, "
+                            + "the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka "
+                            + "message. Note that if messages are delimited and some messages for a given FlowFile are transferred "
+                            + "successfully while others are not, the messages will be split into individual FlowFiles, such that those "
+                            + "messages that were successfully sent are routed to the 'success' relationship while other messages are "
+                            + "sent to the 'failure' relationship.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(false)
+            .build();
+
+    /** Required data size passed to Kafka (in bytes) as "buffer.memory"; defaults to 5 MB. */
+    public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("Max Buffer Size")
+            .description("The maximum amount of data to buffer in memory before sending to Kafka")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .required(true)
+            .defaultValue("5 MB")
+            .build();
+
+    /** Required data size passed to Kafka (in bytes) as "max.request.size"; defaults to 1 MB. */
+    static final PropertyDescriptor MAX_RECORD_SIZE = new PropertyDescriptor.Builder()
+            .name("Max Record Size")
+            .description("The maximum size that any individual record can be.")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .build();
+
+    /** Required time period passed to Kafka (in milliseconds) as "timeout.ms" and "metadata.fetch.timeout.ms". */
+    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Communications Timeout")
+            .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .required(true)
+            .defaultValue("30 secs")
+            .build();
+
+    /** Required client name passed to Kafka as "client.id". */
+    public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
+            .name("Client Name")
+            .description("Client Name to use when communicating with Kafka")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .required(true)
+            .build();
+    /**
+     * Required batch size passed to Kafka as "batch.size".
+     * Despite the constant name and the legacy property name ("Async Batch Size"), the
+     * description states the value is a size in bytes, not a number of messages.
+     * The legacy name is kept so that existing flow configurations continue to resolve;
+     * only the display name shown to the user is "Batch Size".
+     */
+    public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
+            .name("Async Batch Size")
+            .displayName("Batch Size")
+            .description("This configuration controls the default batch size in bytes. The producer will attempt to batch records together into "
+                    + "fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client "
+                    + "and the server.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("16384") // Kafka default
+            .build();
+    /** Optional time period; when set it is passed to Kafka (in milliseconds) as "linger.ms". */
+    public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder()
+            .name("Queue Buffering Max Time")
+            .description("Maximum time to buffer data before sending to Kafka. For example a setting of 100 ms"
+                    + " will try to batch together 100 milliseconds' worth of messages to send at once. This will improve"
+                    + " throughput but adds message delivery latency due to the buffering.")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    /** Required compression codec passed to Kafka as "compression.type"; defaults to no compression. */
+    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
+            .name("Compression Codec")
+            .description("This parameter allows you to specify the compression codec for all"
+                    + " data generated by this producer.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY)
+            .defaultValue(COMPRESSION_CODEC_NONE.getValue())
+            .build();
+
+    // Relationships
+
+    /** Destination for FlowFiles that are not marked as failed after publishing. */
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship")
+            .build();
+
+    /** Destination for (penalized) FlowFiles that are marked as failed after publishing. */
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
+            .build();
+
+    /** FlowFile attribute holding the identifier of the processor that recorded the failure. */
+    protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id";
+
+    /** FlowFile attribute holding the index of the last message acknowledged before the failure. */
+    protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx";
+
+    /** FlowFile attribute holding the topic that was in effect when the failure was recorded. */
+    protected static final String FAILED_TOPIC_ATTR = "failed.topic";
+
+    /** FlowFile attribute holding the message key that was in effect when the failure was recorded. */
+    protected static final String FAILED_KEY_ATTR = "failed.key";
+
+    /** FlowFile attribute holding the message delimiter that was in effect when the failure was recorded. */
+    protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter";
+
+    /** Unmodifiable list of supported properties; assigned once in the static initializer. */
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    /** Unmodifiable set of relationships; assigned once in the static initializer. */
+    private static final Set<Relationship> relationships;
+
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.add(SEED_BROKERS);
+        _propertyDescriptors.add(TOPIC);
+        _propertyDescriptors.add(PARTITION_STRATEGY);
+        _propertyDescriptors.add(PARTITION);
+        _propertyDescriptors.add(KEY);
+        _propertyDescriptors.add(DELIVERY_GUARANTEE);
+        _propertyDescriptors.add(MESSAGE_DELIMITER);
+        _propertyDescriptors.add(MAX_BUFFER_SIZE);
+        _propertyDescriptors.add(MAX_RECORD_SIZE);
+        _propertyDescriptors.add(TIMEOUT);
+        _propertyDescriptors.add(BATCH_NUM_MESSAGES);
+        _propertyDescriptors.add(QUEUE_BUFFERING_MAX);
+        _propertyDescriptors.add(COMPRESSION_CODEC);
+        _propertyDescriptors.add(CLIENT_NAME);
+        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
+
+        Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+
+    /**
+     * Will rendezvous with Kafka if {@link ProcessSession} contains {@link 
FlowFile}
+     * producing a result {@link FlowFile}.
+     * <br>
+     * The result {@link FlowFile} that is successful is then transfered to 
{@link #REL_SUCCESS}
+     * <br>
+     * The result {@link FlowFile} that is failed is then transfered to {@link 
#REL_FAILURE}
+     *
+     */
+    @Override
+    protected boolean rendezvousWithKafka(ProcessContext context, 
ProcessSession session) throws ProcessException {
+        boolean processed = false;
+        FlowFile flowFile = session.get();
+        if (flowFile != null) {
+            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
+            if (!this.isFailedFlowFile(flowFile)) {
+                session.getProvenanceReporter().send(flowFile,
+                        context.getProperty(SEED_BROKERS).getValue() + "/"
+                        + 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue());
+                session.transfer(flowFile, REL_SUCCESS);
+            } else {
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
+            }
+            processed = true;
+        }
+        return processed;
+    }
+
+    /**
+     * Reads the content of the given FlowFile, wraps it in a
+     * {@link PublishingContext} and hands it to {@link KafkaPublisher}.
+     * <br>
+     * If every message was acknowledged, the FlowFile is returned with any
+     * FAILED_* attributes of a previous attempt removed. Otherwise the
+     * FlowFile is returned with FAILED_* attributes describing how far
+     * publishing got, so that {@link #isFailedFlowFile(FlowFile)} reports it
+     * as failed.
+     */
+    private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) {
+        final AtomicReference<KafkaPublisherResult> resultHolder = new AtomicReference<>();
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(InputStream contentStream) throws IOException {
+                final PublishingContext ctx = PutKafka.this.buildPublishingContext(flowFile, context, contentStream);
+                resultHolder.set(PutKafka.this.kafkaResource.publish(ctx));
+            }
+        });
+
+        final KafkaPublisherResult publishResult = resultHolder.get();
+        if (publishResult.isAllAcked()) {
+            return this.cleanUpFlowFileIfNecessary(flowFile, session);
+        }
+
+        final Map<String, String> failedAttributes =
+                this.buildFailedFlowFileAttributes(publishResult.getLastMessageAcked(), flowFile, context);
+        return session.putAllAttributes(flowFile, failedAttributes);
+    }
+
+    /**
+     * Builds {@link PublishingContext} for message(s) to be sent to Kafka.
+     * {@link PublishingContext} contains all contextual information required 
by
+     * {@link KafkaPublisher} to publish to Kafka. Such information contains
+     * things like topic name, content stream, delimiter, key and last ACKed
+     * message for cases where provided FlowFile is being retried (failed in 
the
+     * past). <br>
+     * For the clean FlowFile (file that has been sent for the first time),
+     * PublishingContext will be built form {@link ProcessContext} associated
+     * with this invocation. <br>
+     * For the failed FlowFile, {@link PublishingContext} will be built from
+     * attributes of that FlowFile which by then will already contain required
+     * information (e.g., topic, key, delimiter etc.). This is required to
+     * ensure the affinity of the retry in the even where processor
+     * configuration has changed. However keep in mind that failed FlowFile is
+     * only considered a failed FlowFile if it is being re-processed by the 
same
+     * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see
+     * {@link #isFailedFlowFile(FlowFile)}). If failed FlowFile is being sent 
to
+     * another PublishKafka processor it is treated as a fresh FlowFile
+     * regardless if it has #FAILED* attributes set.
+     */
+    private PublishingContext buildPublishingContext(FlowFile flowFile, 
ProcessContext context,
+            InputStream contentStream) {
+        String topicName;
+        byte[] keyBytes;
+        byte[] delimiterBytes = null;
+        int lastAckedMessageIndex = -1;
+        if (this.isFailedFlowFile(flowFile)) {
+            lastAckedMessageIndex = 
Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
+            topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
+            keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null
+                    ? 
flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null;
+            delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != 
null
+                    ? 
flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : 
null;
+
+        } else {
+            topicName = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+            String _key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+            keyBytes = _key == null ? null : 
_key.getBytes(StandardCharsets.UTF_8);
+            delimiterBytes = context.getProperty(MESSAGE_DELIMITER).isSet() ? 
context.getProperty(MESSAGE_DELIMITER)
+                    
.evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8)
 : null;
+        }
+
+        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setKeyBytes(keyBytes);
+        publishingContext.setDelimiterBytes(delimiterBytes);
+        publishingContext.setPartitionId(this.determinePartition(context, 
flowFile));
+        return publishingContext;
+    }
+
+    /**
+     * Returns 'true' if provided FlowFile is a failed FlowFile. A failed
+     * FlowFile contains {@link #FAILED_PROC_ID_ATTR}.
+     */
+    private boolean isFailedFlowFile(FlowFile flowFile) {
+        return 
this.getIdentifier().equals(flowFile.getAttribute(FAILED_PROC_ID_ATTR));
+    }
+
+    /**
+     * Returns the unmodifiable set of relationships built in the static
+     * initializer ({@link #REL_SUCCESS} and {@link #REL_FAILURE}).
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected KafkaPublisher buildKafkaResource(ProcessContext context, 
ProcessSession session)
+            throws ProcessException {
+        KafkaPublisher kafkaPublisher = new 
KafkaPublisher(this.buildKafkaConfigProperties(context), this.getLogger());
+        return kafkaPublisher;
+    }
+
+    /**
+     * Returns the unmodifiable list of property descriptors built in the
+     * static initializer.
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for '" + 
propertyDescriptorName + "' Kafka Configuration.")
+                
.name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
+                .build();
+    }
+
+    /**
+     * Cross-property validation: the Partition property must be set whenever
+     * the User-Defined partition strategy is selected.
+     *
+     * @return the validation failures found; empty when the configuration is consistent
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> problems = new ArrayList<>();
+
+        final String strategy = validationContext.getProperty(PARTITION_STRATEGY).getValue();
+        final boolean userDefined = strategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue());
+        if (userDefined && !validationContext.getProperty(PARTITION).isSet()) {
+            problems.add(new ValidationResult.Builder()
+                    .subject("Partition")
+                    .valid(false)
+                    .explanation("The <Partition> property must be set when configured to use the User-Defined Partitioning Strategy")
+                    .build());
+        }
+        return problems;
+    }
+
+    /**
+     *
+     */
+    private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, 
ProcessSession session) {
+        if (this.isFailedFlowFile(flowFile)) {
+            Set<String> keysToRemove = new HashSet<>();
+            keysToRemove.add(FAILED_DELIMITER_ATTR);
+            keysToRemove.add(FAILED_KEY_ATTR);
+            keysToRemove.add(FAILED_TOPIC_ATTR);
+            keysToRemove.add(FAILED_PROC_ID_ATTR);
+            keysToRemove.add(FAILED_LAST_ACK_IDX);
+            flowFile = session.removeAllAttributes(flowFile, keysToRemove);
+        }
+        return flowFile;
+    }
+
+    /**
+     * Determines the explicit partition for the messages of the given FlowFile.
+     *
+     * @return the partition parsed from the evaluated Partition property when the
+     *         User-Defined partition strategy is selected and the property
+     *         evaluates to a non-null value; {@code null} otherwise
+     * @throws NumberFormatException if the evaluated value is not a parsable integer
+     */
+    private Integer determinePartition(ProcessContext context, FlowFile flowFile) {
+        String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
+        Integer partitionValue = null;
+        if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) {
+            // Evaluate the expression once and parse that result, rather than
+            // evaluating the property a second time just to parse it.
+            String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+            if (pv != null) {
+                partitionValue = Integer.parseInt(pv);
+            }
+        }
+        return partitionValue;
+    }
+
+    /**
+     * Builds a {@link Map} of FAILED_* attributes
+     *
+     * @see #FAILED_PROC_ID_ATTR
+     * @see #FAILED_LAST_ACK_IDX
+     * @see #FAILED_TOPIC_ATTR
+     * @see #FAILED_KEY_ATTR
+     * @see #FAILED_DELIMITER_ATTR
+     */
+    private Map<String, String> buildFailedFlowFileAttributes(int 
lastAckedMessageIndex, FlowFile sourceFlowFile,
+            ProcessContext context) {
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(FAILED_PROC_ID_ATTR, this.getIdentifier());
+        attributes.put(FAILED_LAST_ACK_IDX, 
String.valueOf(lastAckedMessageIndex));
+        attributes.put(FAILED_TOPIC_ATTR, 
context.getProperty(TOPIC).evaluateAttributeExpressions(sourceFlowFile).getValue());
+        attributes.put(FAILED_KEY_ATTR, 
context.getProperty(KEY).evaluateAttributeExpressions(sourceFlowFile).getValue());
+        attributes.put(FAILED_DELIMITER_ATTR, 
context.getProperty(MESSAGE_DELIMITER).isSet()
+                ? 
context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(sourceFlowFile).getValue()
+                : null);
+        return attributes;
+    }
+
+    /**
+     * Translates this processor's configuration into the {@link Properties}
+     * handed to {@link KafkaPublisher}.
+     * <br>
+     * The statically declared properties are mapped to their Kafka producer
+     * keys first. "linger.ms" is only set when Queue Buffering Max Time is
+     * configured, and "partitioner.class" is only set for the round-robin and
+     * random strategies (the User-Defined strategy has no partitioner class).
+     * Finally every dynamic property is copied over verbatim; a dynamic
+     * property overrides an already mapped key and a warning is logged.
+     */
+    private Properties buildKafkaConfigProperties(final ProcessContext context) {
+        Properties properties = new Properties();
+        String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
+        properties.setProperty("bootstrap.servers", context.getProperty(SEED_BROKERS).getValue());
+        properties.setProperty("acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
+        properties.setProperty("buffer.memory", String.valueOf(context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue()));
+        properties.setProperty("compression.type", context.getProperty(COMPRESSION_CODEC).getValue());
+        properties.setProperty("batch.size", context.getProperty(BATCH_NUM_MESSAGES).getValue());
+
+        properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
+        Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
+        if (queueBufferingMillis != null) {
+            properties.setProperty("linger.ms", String.valueOf(queueBufferingMillis));
+        }
+        properties.setProperty("max.request.size", String.valueOf(context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).longValue()));
+        properties.setProperty("timeout.ms", timeout);
+        properties.setProperty("metadata.fetch.timeout.ms", timeout);
+
+        String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
+        String partitionerClass = null;
+        if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) {
+            partitionerClass = Partitioners.RoundRobinPartitioner.class.getName();
+        } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) {
+            partitionerClass = Partitioners.RandomPartitioner.class.getName();
+        }
+        // Properties (a Hashtable) rejects null values with a NullPointerException,
+        // so leave the key unset when no partitioner class applies.
+        if (partitionerClass != null) {
+            properties.setProperty("partitioner.class", partitionerClass);
+        }
+
+        // Set Dynamic Properties
+        for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.isDynamic()) {
+                if (properties.containsKey(descriptor.getName())) {
+                    this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '"
+                                    + properties.getProperty(descriptor.getName()) + "' with dynamically set value '"
+                                    + entry.getValue() + "'.");
+                }
+                properties.setProperty(descriptor.getName(), entry.getValue());
+            }
+        }
+        return properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..b478d9f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.kafka.GetKafka
+org.apache.nifi.processors.kafka.PutKafka
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html
new file mode 100644
index 0000000..10c7082
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html
@@ -0,0 +1,45 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>GetKafka</title>
+        <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
+    </head>
+
+    <body>
+        <!-- Processor Documentation 
================================================== -->
+        <h2>Description:</h2>
+        <p>
+            This Processor polls <a href="http://kafka.apache.org/">Apache 
Kafka</a>
+            for data. When a message is received from Kafka, this Processor 
emits a FlowFile
+            where the content of the FlowFile is the value of the Kafka 
message. If the
+            message has a key associated with it, an attribute named 
<code>kafka.key</code>
+            will be added to the FlowFile, with the value being the UTF-8 
Encoded value
+            of the Message's Key.
+        </p>
+        <p>
+            Kafka supports the notion of a Consumer Group when pulling 
messages in order to
+            provide scalability while still offering a publish-subscribe 
interface. Each
+            Consumer Group must have a unique identifier. The Consumer Group 
identifier that
+            is used by NiFi is the UUID of the Processor. This means that all 
of the nodes
+            within a cluster will use the same Consumer Group Identifier so 
that they do
+            not receive duplicate data but multiple GetKafka Processors can be 
used to pull
+            from multiple Topics, as each Processor will receive a different 
Processor UUID 
+            and therefore a different Consumer Group Identifier.
+        </p>
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
new file mode 100644
index 0000000..d51ce95
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
@@ -0,0 +1,45 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>PutKafka</title>
+        <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
+    </head>
+
+    <body>
+        <!-- Processor Documentation 
================================================== -->
+        <h2>Description:</h2>
+        <p>
+            This Processor puts the contents of a FlowFile to a Topic in 
+            <a href="http://kafka.apache.org/">Apache Kafka</a>. The full 
contents of
+            a FlowFile becomes the contents of a single message in Kafka.
+            This message is optionally assigned a key by using the
+            &lt;Kafka Key&gt; Property.
+        </p>
+
+        <p>
+            The Processor allows the user to configure an optional Message 
Delimiter that
+            can be used to send many messages per FlowFile. For example, a \n 
could be used
+            to indicate that the contents of the FlowFile should be used to 
send one message
+            per line of text. If the property is not set, the entire contents 
of the FlowFile
+            will be sent as a single message. When using the delimiter, if 
some messages are
+            successfully sent but other messages fail to send, the FlowFile 
will be FORKed into
+            two child FlowFiles, with the successfully sent messages being 
routed to 'success'
+            and the messages that could not be sent going to 'failure'.
+        </p>
+    </body>
+</html>

Reply via email to