http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
new file mode 100644
index 0000000..e5255f5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.xml.bind.DatatypeConverter;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import static 
org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURITY_PROTOCOL;
+
+@CapabilityDescription("Consumes messages from Apache Kafka specifically built 
against the Kafka 0.9 Consumer API. "
+        + " Please note there are cases where the publisher can get into an 
indefinite stuck state.  We are closely monitoring"
+        + " how this evolves in the Kafka community and will take advantage of 
those fixes as soon as we can.  In the mean time"
+        + " it is possible to enter states where the only resolution will be 
to restart the JVM NiFi runs on.")
+@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", 
"0.9.x"})
+@WritesAttributes({
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description 
= "The number of messages written to the FlowFile, if more than one"),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY_HEX, 
description = "The hex encoded key of the message, if present and if the 
FlowFile contains a single message"),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description 
= "The offset of the message in the partition of the topic."),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, 
description = "The partition of the topic the message or message bundle is 
from"),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description 
= "The topic the message or message bundle is from")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = 
"The value of a given Kafka configuration property.",
+        description = "These properties will be added on the Kafka 
configuration after loading any provided configuration properties."
+        + " In the event a dynamic property represents a property that was 
already set, its value will be ignored and WARN message logged."
+        + " For the list of available Kafka properties please refer to: 
http://kafka.apache.org/documentation.html#configuration. ")
+public class ConsumeKafka extends AbstractProcessor {
+
+    private static final long TWO_MB = 2L * 1024L * 1024L;
+
+    static final AllowableValue OFFSET_EARLIEST = new 
AllowableValue("earliest", "earliest", "Automatically reset the offset to the 
earliest offset");
+
+    static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", 
"latest", "Automatically reset the offset to the latest offset");
+
+    static final AllowableValue OFFSET_NONE = new AllowableValue("none", 
"none", "Throw exception to the consumer if no previous offset is found for the 
consumer's group");
+
+    static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
+            .name("topic")
+            .displayName("Topic Name(s)")
+            .description("The name of the Kafka Topic(s) to pull from. More 
than one can be supplied if comma seperated.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
+            .name(ConsumerConfig.GROUP_ID_CONFIG)
+            .displayName("Group ID")
+            .description("A Group ID is used to identify consumers that are 
within the same consumer group. Corresponds to Kafka's 'group.id' property.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+
+    static final PropertyDescriptor AUTO_OFFSET_RESET = new 
PropertyDescriptor.Builder()
+            .name(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+            .displayName("Offset Reset")
+            .description("Allows you to manage the condition when there is no 
initial offset in Kafka or if the current offset does not exist any "
+                    + "more on the server (e.g. because that data has been 
deleted). Corresponds to Kafka's 'auto.offset.reset' property.")
+            .required(true)
+            .allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE)
+            .defaultValue(OFFSET_LATEST.getValue())
+            .build();
+
+    static final PropertyDescriptor MESSAGE_DEMARCATOR = new 
PropertyDescriptor.Builder()
+            .name("message-demarcator")
+            .displayName("Message Demarcator")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .description("Since KafkaConsumer receives messages in batches, 
you have an option to output FlowFiles which contains "
+                    + "all Kafka messages in a single batch for a given topic 
and partition and this property allows you to provide a string (interpreted as 
UTF-8) to use "
+                    + "for demarcating apart multiple Kafka messages. This is 
an optional property and if not provided each Kafka message received "
+                    + "will result in a single FlowFile which  "
+                    + "time it is triggered. To enter special character such 
as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
+            .build();
+    static final PropertyDescriptor MAX_POLL_RECORDS = new 
PropertyDescriptor.Builder()
+            .name("max.poll.records")
+            .displayName("Max Poll Records")
+            .description("Specifies the maximum number of records Kafka should 
return in a single poll.")
+            .required(false)
+            .defaultValue("10000")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles received from Kafka.  Depending on 
demarcation strategy it is a flow file per message or a bundle of messages 
grouped by topic and partition.")
+            .build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS;
+    static final Set<Relationship> RELATIONSHIPS;
+
+    private volatile byte[] demarcatorBytes = null;
+    private volatile ConsumerPool consumerPool = null;
+
+    static {
+        List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
+        descriptors.add(TOPICS);
+        descriptors.add(GROUP_ID);
+        descriptors.add(AUTO_OFFSET_RESET);
+        descriptors.add(MESSAGE_DEMARCATOR);
+        descriptors.add(MAX_POLL_RECORDS);
+        DESCRIPTORS = Collections.unmodifiableList(descriptors);
+        RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @OnScheduled
+    public void prepareProcessing(final ProcessContext context) {
+        this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet()
+                ? 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
+                : null;
+    }
+
+    @OnStopped
+    public void close() {
+        demarcatorBytes = null;
+        final ConsumerPool pool = consumerPool;
+        consumerPool = null;
+        if (pool != null) {
+            pool.close();
+        }
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for '" + 
propertyDescriptorName + "' Kafka Configuration.")
+                .name(propertyDescriptorName).addValidator(new 
KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)).dynamic(true)
+                .build();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        return KafkaProcessorUtils.validateCommonProperties(validationContext);
+    }
+
+    private synchronized ConsumerPool getConsumerPool(final ProcessContext 
context) {
+        ConsumerPool pool = consumerPool;
+        if (pool != null) {
+            return pool;
+        }
+
+        final Map<String, String> props = new HashMap<>();
+        KafkaProcessorUtils.buildCommonKafkaProperties(context, 
ConsumerConfig.class, props);
+        final String topicListing = 
context.getProperty(TOPICS).evaluateAttributeExpressions().getValue();
+        final List<String> topics = new ArrayList<>();
+        for (final String topic : topicListing.split(",", 100)) {
+            final String trimmedName = topic.trim();
+            if (!trimmedName.isEmpty()) {
+                topics.add(trimmedName);
+            }
+        }
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        return consumerPool = 
createConsumerPool(context.getMaxConcurrentTasks(), topics, props, getLogger());
+    }
+
+    protected ConsumerPool createConsumerPool(final int maxLeases, final 
List<String> topics, final Map<String, String> props, final ComponentLog log) {
+        return new ConsumerPool(maxLeases, topics, props, log);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        final long startTimeNanos = System.nanoTime();
+        final ConsumerPool pool = getConsumerPool(context);
+        if (pool == null) {
+            context.yield();
+            return;
+        }
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecordMap = new HashMap<>();
+
+        try (final ConsumerLease lease = pool.obtainConsumer()) {
+            try {
+                if (lease == null) {
+                    context.yield();
+                    return;
+                }
+
+                final boolean foundData = gatherDataFromKafka(lease, 
partitionRecordMap, context);
+                if (!foundData) {
+                    session.rollback();
+                    return;
+                }
+
+                writeSessionData(context, session, partitionRecordMap, 
startTimeNanos);
+                //At-least-once commit handling (if the order were reversed it 
would be at-most-once)
+                session.commit();
+                commitOffsets(lease, partitionRecordMap);
+            } catch (final KafkaException ke) {
+                lease.poison();
+                getLogger().error("Problem while accessing kafka consumer " + 
ke, ke);
+                context.yield();
+                session.rollback();
+            }
+        }
+    }
+
+    private void commitOffsets(final ConsumerLease lease, final 
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap) {
+        final Map<TopicPartition, OffsetAndMetadata> partOffsetMap = new 
HashMap<>();
+        partitionRecordMap.entrySet().stream()
+                .filter(entry -> !entry.getValue().isEmpty())
+                .forEach((entry) -> {
+                    long maxOffset = entry.getValue().stream()
+                            .mapToLong(record -> record.offset())
+                            .max()
+                            .getAsLong();
+                    partOffsetMap.put(entry.getKey(), new 
OffsetAndMetadata(maxOffset + 1L));
+                });
+        lease.commitOffsets(partOffsetMap);
+    }
+
+    private void writeSessionData(
+            final ProcessContext context, final ProcessSession session,
+            final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecordMap,
+            final long startTimeNanos) {
+        if (demarcatorBytes != null) {
+            partitionRecordMap.entrySet().stream()
+                    .filter(entry -> !entry.getValue().isEmpty())
+                    .forEach(entry -> {
+                        writeData(context, session, entry.getValue(), 
startTimeNanos);
+                    });
+        } else {
+            partitionRecordMap.entrySet().stream()
+                    .filter(entry -> !entry.getValue().isEmpty())
+                    .flatMap(entry -> entry.getValue().stream())
+                    .forEach(record -> {
+                        writeData(context, session, 
Collections.singletonList(record), startTimeNanos);
+                    });
+        }
+    }
+
+    private void writeData(final ProcessContext context, final ProcessSession 
session, final List<ConsumerRecord<byte[], byte[]>> records, final long 
startTimeNanos) {
+        final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
+        final String offset = String.valueOf(firstRecord.offset());
+        final String keyHex = (firstRecord.key() != null) ? 
DatatypeConverter.printHexBinary(firstRecord.key()) : null;
+        final String topic = firstRecord.topic();
+        final String partition = String.valueOf(firstRecord.partition());
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, out -> {
+            boolean useDemarcator = false;
+            for (final ConsumerRecord<byte[], byte[]> record : records) {
+                if (useDemarcator) {
+                    out.write(demarcatorBytes);
+                }
+                out.write(record.value());
+                useDemarcator = true;
+            }
+        });
+        final Map<String, String> kafkaAttrs = new HashMap<>();
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
+        if (keyHex != null && records.size() == 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY_HEX, keyHex);
+        }
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);
+        if (records.size() > 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, 
String.valueOf(records.size()));
+        }
+        flowFile = session.putAllAttributes(flowFile, kafkaAttrs);
+        final long executionDurationMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
+        final String transitUri = KafkaProcessorUtils.buildTransitURI(
+                context.getProperty(SECURITY_PROTOCOL).getValue(),
+                
context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(),
+                topic);
+        session.getProvenanceReporter().receive(flowFile, transitUri, 
executionDurationMillis);
+        this.getLogger().debug("Created {} containing {} messages from Kafka 
topic {}, partition {}, starting offset {} in {} millis",
+                new Object[]{flowFile, records.size(), topic, partition, 
offset, executionDurationMillis});
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+    /**
+     * Populates the given partitionRecordMap with new records until a poll
+     * returns no records or until we have enough data. It is important to
+     * ensure we keep items grouped by their topic and partition so that when 
we
+     * bundle them we bundle them intelligently and so that we can set offsets
+     * properly even across multiple poll calls.
+     */
+    private boolean gatherDataFromKafka(final ConsumerLease lease, final 
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, 
ProcessContext context) {
+        final long startNanos = System.nanoTime();
+        boolean foundData = false;
+        ConsumerRecords<byte[], byte[]> records;
+        final int maxRecords = 
context.getProperty(MAX_POLL_RECORDS).asInteger();
+
+        do {
+            records = lease.poll();
+
+            for (final TopicPartition partition : records.partitions()) {
+                List<ConsumerRecord<byte[], byte[]>> currList = 
partitionRecordMap.get(partition);
+                if (currList == null) {
+                    currList = new ArrayList<>();
+                    partitionRecordMap.put(partition, currList);
+                }
+                currList.addAll(records.records(partition));
+                if (currList.size() > 0) {
+                    foundData = true;
+                }
+            }
+            //If we received data and we still want to get more
+        } while (!records.isEmpty() && 
!checkIfGatheredEnoughData(partitionRecordMap, maxRecords, startNanos));
+        return foundData;
+    }
+
+    /**
+     * Determines if we have enough data as-is and should move on.
+     *
+     * @return true if we've been gathering for more than 500 ms; if we're
+     * demarcating, true once records have been gathered for more than 50
+     * topic/partitions; otherwise true once the total record size exceeds two
+     * megabytes or the total record count exceeds the configured max poll
+     * records; false otherwise
+     *
+     * Implementation note: 500 millis and 2 MB are magic numbers. These may
+     * need to be tuned. They get at how often offsets will get committed to
+     * kafka relative to how many records will get buffered into memory in a
+     * poll call before writing to repos.
+     */
+    private boolean checkIfGatheredEnoughData(final Map<TopicPartition, 
List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, final long 
maxRecords, final long startTimeNanos) {
+
+        final long durationMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
+
+        if (durationMillis > 500) {
+            return true;
+        }
+
+        int topicPartitionsFilled = 0;
+        int totalRecords = 0;
+        long totalRecordSize = 0;
+
+        for (final List<ConsumerRecord<byte[], byte[]>> recordList : 
partitionRecordMap.values()) {
+            if (!recordList.isEmpty()) {
+                topicPartitionsFilled++;
+            }
+            totalRecords += recordList.size();
+            for (final ConsumerRecord<byte[], byte[]> rec : recordList) {
+                totalRecordSize += rec.value().length;
+            }
+        }
+
+        if (demarcatorBytes != null && demarcatorBytes.length > 0) {
+            return topicPartitionsFilled > 50;
+        } else if (totalRecordSize > TWO_MB) {
+            return true;
+        } else {
+            return totalRecords > maxRecords;
+        }
+    }
+
+}
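
For context, here is a minimal configuration sketch for this processor using NiFi's mock
framework. It is a sketch only: it assumes the nifi-mock TestRunners/TestRunner API, a test
class living in the same org.apache.nifi.processors.kafka.pubsub package (the descriptors are
package-private), and illustrative property values. It validates configuration and does not
contact a broker.

    package org.apache.nifi.processors.kafka.pubsub;

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class ConsumeKafkaConfigSketch {

        public static void main(final String[] args) {
            // Build a mock runner around the processor and set the properties declared above.
            final TestRunner runner = TestRunners.newTestRunner(ConsumeKafka.class);
            runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:9092");
            runner.setProperty(ConsumeKafka.TOPICS, "topicA, topicB");  // comma separated topic names
            runner.setProperty(ConsumeKafka.GROUP_ID, "nifi-consumer-group");
            runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST.getValue());
            runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "\n");  // bundle messages per topic/partition
            runner.assertValid();
        }
    }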

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
new file mode 100644
index 0000000..b954eba
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * This class represents a lease to access a Kafka Consumer object. The lease 
is
+ * intended to be obtained from a ConsumerPool. The lease is closeable to allow
+ * for the clean model of a try w/resources whereby non-exceptional cases mean
+ * the lease will be returned to the pool for future use by others. A given
+ * lease may only belong to a single thread at a time.
+ */
+public interface ConsumerLease extends Closeable {
+
+    /**
+     * Executes a poll on the underlying Kafka Consumer.
+     *
+     * @return ConsumerRecords retrieved in the poll.
+     * @throws KafkaException if issue occurs talking to underlying resource.
+     */
+    ConsumerRecords<byte[], byte[]> poll() throws KafkaException;
+
+    /**
+     * Notifies Kafka to commit the offsets for the specified topic/partition
+     * pairs to the specified offsets w/the given metadata. Committing in bulk
+     * after a batch has been processed can offer higher performance than
+     * committing after every record, as it allows the kafka client to collect
+     * more data from Kafka before committing the offsets.
+     *
+     * @param offsets offsets
+     * @throws KafkaException if issue occurs talking to underlying resource.
+     */
+    void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) throws 
KafkaException;
+
+    /**
+     * Notifies that this lease is poisoned and should not be reused.
+     */
+    void poison();
+
+    /**
+     * Notifies that this lease is to be returned. The pool may optionally 
reuse
+     * this lease with another client. No further references by the caller
+     * should occur after calling close.
+     */
+    @Override
+    void close();
+
+}
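
The lease is intended for the try-with-resources pattern used by ConsumeKafka.onTrigger above.
A condensed, same-package sketch of that pattern follows (the helper class is illustrative);
on a KafkaException the lease is poisoned so the pool discards the underlying consumer.

    package org.apache.nifi.processors.kafka.pubsub;

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;

    final class ConsumerLeaseUsageSketch {

        // Polls once through a lease and commits the next offset for each partition seen.
        static void pollOnce(final ConsumerPool pool) {
            try (final ConsumerLease lease = pool.obtainConsumer()) {
                if (lease == null) {
                    return; // pool exhausted; caller should yield and retry later
                }
                try {
                    final ConsumerRecords<byte[], byte[]> records = lease.poll();
                    for (final TopicPartition tp : records.partitions()) {
                        long maxOffset = -1L;
                        for (final ConsumerRecord<byte[], byte[]> rec : records.records(tp)) {
                            maxOffset = Math.max(maxOffset, rec.offset());
                            // ... hand rec.value() to downstream processing here ...
                        }
                        if (maxOffset >= 0) {
                            lease.commitOffsets(Collections.singletonMap(tp, new OffsetAndMetadata(maxOffset + 1L)));
                        }
                    }
                } catch (final KafkaException ke) {
                    lease.poison(); // ensure the pooled consumer is not reused
                    throw ke;
                }
            }
        }
    }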

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
new file mode 100644
index 0000000..3f20b8f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * A pool of Kafka Consumers for the given topics. Consumers can be obtained by
+ * calling 'obtainConsumer'. Once closed, the pool is ready to be immediately
+ * used again.
+ */
+public class ConsumerPool implements Closeable {
+
+    private final AtomicInteger activeLeaseCount = new AtomicInteger(0);
+    private final int maxLeases;
+    private final Queue<ConsumerLease> consumerLeases;
+    private final List<String> topics;
+    private final Map<String, Object> kafkaProperties;
+    private final ComponentLog logger;
+
+    private final AtomicLong consumerCreatedCountRef = new AtomicLong();
+    private final AtomicLong consumerClosedCountRef = new AtomicLong();
+    private final AtomicLong leasesObtainedCountRef = new AtomicLong();
+    private final AtomicLong productivePollCountRef = new AtomicLong();
+    private final AtomicLong unproductivePollCountRef = new AtomicLong();
+
+    /**
+     * Creates a pool of KafkaConsumer objects that will grow up to the maximum
+     * indicated leases. Consumers are lazily initialized.
+     *
+     * @param maxLeases maximum number of active leases in the pool
+     * @param topics the topics to consume from
+     * @param kafkaProperties the properties for each consumer
+     * @param logger the logger to report any errors/warnings
+     */
+    public ConsumerPool(final int maxLeases, final List<String> topics, final 
Map<String, String> kafkaProperties, final ComponentLog logger) {
+        this.maxLeases = maxLeases;
+        if (maxLeases <= 0) {
+            throw new IllegalArgumentException("Max leases value must be 
greather than zero.");
+        }
+        this.logger = logger;
+        if (topics == null || topics.isEmpty()) {
+            throw new IllegalArgumentException("Must have a list of one or 
more topics");
+        }
+        this.topics = topics;
+        this.kafkaProperties = new HashMap<>(kafkaProperties);
+        this.consumerLeases = new ArrayDeque<>();
+    }
+
+    /**
+     * Obtains a consumer from the pool if one is available, or creates one if
+     * the maximum number of leases has not been reached.
+     *
+     * @return a lease on a consumer, or null if none is available and no more
+     * can be created
+     */
+    public ConsumerLease obtainConsumer() {
+        final ConsumerLease lease;
+        final int activeLeases;
+        synchronized (this) {
+            lease = consumerLeases.poll();
+            activeLeases = activeLeaseCount.get();
+        }
+        if (lease == null && activeLeases >= maxLeases) {
+            logger.warn("No available consumers and cannot create any as max 
consumer leases limit reached - verify pool settings");
+            return null;
+        }
+        leasesObtainedCountRef.incrementAndGet();
+        return (lease == null) ? createConsumer() : lease;
+    }
+
+    protected Consumer<byte[], byte[]> createKafkaConsumer() {
+        return new KafkaConsumer<>(kafkaProperties);
+    }
+
+    private ConsumerLease createConsumer() {
+        final Consumer<byte[], byte[]> kafkaConsumer = createKafkaConsumer();
+        consumerCreatedCountRef.incrementAndGet();
+        try {
+            kafkaConsumer.subscribe(topics);
+        } catch (final KafkaException kex) {
+            try {
+                kafkaConsumer.close();
+                consumerClosedCountRef.incrementAndGet();
+            } catch (final Exception ex) {
+                consumerClosedCountRef.incrementAndGet();
+                //ignore
+            }
+            throw kex;
+        }
+
+        final ConsumerLease lease = new ConsumerLease() {
+
+            private volatile boolean poisoned = false;
+            private volatile boolean closed = false;
+
+            @Override
+            public ConsumerRecords<byte[], byte[]> poll() {
+
+                if (poisoned) {
+                    throw new KafkaException("The consumer is poisoned and 
should no longer be used");
+                }
+
+                try {
+                    final ConsumerRecords<byte[], byte[]> records = 
kafkaConsumer.poll(50);
+                    if (records.isEmpty()) {
+                        unproductivePollCountRef.incrementAndGet();
+                    } else {
+                        productivePollCountRef.incrementAndGet();
+                    }
+                    return records;
+                } catch (final KafkaException kex) {
+                    logger.warn("Unable to poll from Kafka consumer so will 
poison and close this " + kafkaConsumer, kex);
+                    poison();
+                    close();
+                    throw kex;
+                }
+            }
+
+            @Override
+            public void commitOffsets(final Map<TopicPartition, 
OffsetAndMetadata> offsets) {
+
+                if (poisoned) {
+                    throw new KafkaException("The consumer is poisoned and 
should no longer be used");
+                }
+                try {
+                    kafkaConsumer.commitSync(offsets);
+                } catch (final KafkaException kex) {
+                    logger.warn("Unable to commit kafka consumer offsets so 
will poison and close this " + kafkaConsumer, kex);
+                    poison();
+                    close();
+                    throw kex;
+                }
+            }
+
+            @Override
+            public void close() {
+                if (closed) {
+                    return;
+                }
+                if (poisoned || activeLeaseCount.get() > maxLeases) {
+                    closeConsumer(kafkaConsumer);
+                    activeLeaseCount.decrementAndGet();
+                    closed = true;
+                } else {
+                    final boolean added;
+                    synchronized (ConsumerPool.this) {
+                        added = consumerLeases.offer(this);
+                    }
+                    if (!added) {
+                        closeConsumer(kafkaConsumer);
+                        activeLeaseCount.decrementAndGet();
+                    }
+                }
+            }
+
+            @Override
+            public void poison() {
+                poisoned = true;
+            }
+        };
+        activeLeaseCount.incrementAndGet();
+        return lease;
+    }
+
+    /**
+     * Closes all consumers in the pool. Can be safely called more than once.
+     */
+    @Override
+    public void close() {
+        final List<ConsumerLease> leases = new ArrayList<>();
+        synchronized (this) {
+            ConsumerLease lease = null;
+            while ((lease = consumerLeases.poll()) != null) {
+                leases.add(lease);
+            }
+        }
+        for (final ConsumerLease lease : leases) {
+            lease.poison();
+            lease.close();
+        }
+    }
+
+    private void closeConsumer(final Consumer consumer) {
+        try {
+            consumer.unsubscribe();
+        } catch (Exception e) {
+            logger.warn("Failed while unsubscribing " + consumer, e);
+        }
+
+        try {
+            consumer.close();
+            consumerClosedCountRef.incrementAndGet();
+        } catch (Exception e) {
+            consumerClosedCountRef.incrementAndGet();
+            logger.warn("Failed while closing " + consumer, e);
+        }
+    }
+
+    PoolStats getPoolStats() {
+        return new PoolStats(consumerCreatedCountRef.get(), 
consumerClosedCountRef.get(), leasesObtainedCountRef.get(), 
productivePollCountRef.get(), unproductivePollCountRef.get());
+    }
+
+    static final class PoolStats {
+
+        final long consumerCreatedCount;
+        final long consumerClosedCount;
+        final long leasesObtainedCount;
+        final long productivePollCount;
+        final long unproductivePollCount;
+
+        PoolStats(
+                final long consumerCreatedCount,
+                final long consumerClosedCount,
+                final long leasesObtainedCount,
+                final long productivePollCount,
+                final long unproductivePollCount
+        ) {
+            this.consumerCreatedCount = consumerCreatedCount;
+            this.consumerClosedCount = consumerClosedCount;
+            this.leasesObtainedCount = leasesObtainedCount;
+            this.productivePollCount = productivePollCount;
+            this.unproductivePollCount = unproductivePollCount;
+        }
+
+        @Override
+        public String toString() {
+            return "Created Consumers [" + consumerCreatedCount + "]\n"
+                    + "Closed Consumers  [" + consumerClosedCount + "]\n"
+                    + "Leases Obtained   [" + leasesObtainedCount + "]\n"
+                    + "Productive Polls  [" + productivePollCount + "]\n"
+                    + "Unproductive Polls  [" + unproductivePollCount + "]\n";
+        }
+
+    }
+
+}
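
Because createKafkaConsumer() is a protected factory method, the pool can be exercised without
a broker by substituting the kafka-clients MockConsumer test helper. The sketch below assumes
that helper is on the test classpath; class and topic names are illustrative, and the
MockConsumer still needs rebalance()/addRecord() priming before poll() returns data.

    package org.apache.nifi.processors.kafka.pubsub;

    import java.util.Collections;
    import java.util.HashMap;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.nifi.logging.ComponentLog;

    final class ConsumerPoolTestSketch {

        // A pool whose consumers are MockConsumers, so no broker connection is attempted.
        static ConsumerPool mockPool(final ComponentLog log) {
            return new ConsumerPool(2, Collections.singletonList("test-topic"), new HashMap<String, String>(), log) {
                @Override
                protected Consumer<byte[], byte[]> createKafkaConsumer() {
                    return new MockConsumer<>(OffsetResetStrategy.EARLIEST);
                }
            };
        }

        static void demonstrate(final ComponentLog log) {
            final ConsumerPool pool = mockPool(log);
            try (final ConsumerLease lease = pool.obtainConsumer()) {
                // lease.poll() here returns whatever the MockConsumer was primed with
            } finally {
                pool.close();
            }
        }
    }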

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
new file mode 100644
index 0000000..fd747fc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.CommonClientConfigs;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class KafkaProcessorUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaProcessorUtils.class);
+
+    private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
+
+    private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + 
"(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
+
+    static final Pattern HEX_KEY_PATTERN = 
Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
+
+    static final String KAFKA_KEY_HEX = "kafka.key.hex";
+    static final String KAFKA_TOPIC = "kafka.topic";
+    static final String KAFKA_PARTITION = "kafka.partition";
+    static final String KAFKA_OFFSET = "kafka.offset";
+    static final String KAFKA_COUNT = "kafka.count";
+    static final AllowableValue SEC_PLAINTEXT = new 
AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
+    static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", 
"SSL");
+    static final AllowableValue SEC_SASL_PLAINTEXT = new 
AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
+    static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", 
"SASL_SSL", "SASL_SSL");
+
+    static final PropertyDescriptor BOOTSTRAP_SERVERS = new 
PropertyDescriptor.Builder()
+            .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
+            .displayName("Kafka Brokers")
+            .description("A comma-separated list of known Kafka Brokers in the 
format <host>:<port>")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
+            .expressionLanguageSupported(true)
+            .defaultValue("localhost:9092")
+            .build();
+    static final PropertyDescriptor SECURITY_PROTOCOL = new 
PropertyDescriptor.Builder()
+            .name("security.protocol")
+            .displayName("Security Protocol")
+            .description("Protocol used to communicate with brokers. 
Corresponds to Kafka's 'security.protocol' property.")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, 
SEC_SASL_SSL)
+            .defaultValue(SEC_PLAINTEXT.getValue())
+            .build();
+    static final PropertyDescriptor KERBEROS_PRINCIPLE = new 
PropertyDescriptor.Builder()
+            .name("sasl.kerberos.service.name")
+            .displayName("Kerberos Service Name")
+            .description("The Kerberos principal name that Kafka runs as. This 
can be defined either in Kafka's JAAS config or in Kafka's config. "
+                    + "Corresponds to Kafka's 'security.protocol' property."
+                    + "It is ignored unless one of the SASL options of the 
<Security Protocol> are selected.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+
+    static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("ssl.context.service")
+            .displayName("SSL Context Service")
+            .description("Specifies the SSL Context Service to use for 
communicating with Kafka.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    static List<PropertyDescriptor> getCommonPropertyDescriptors() {
+        return Arrays.asList(
+                BOOTSTRAP_SERVERS,
+                SECURITY_PROTOCOL,
+                KERBEROS_PRINCIPLE,
+                SSL_CONTEXT_SERVICE
+        );
+    }
+
+    static Collection<ValidationResult> validateCommonProperties(final 
ValidationContext validationContext) {
+        List<ValidationResult> results = new ArrayList<>();
+
+        String securityProtocol = 
validationContext.getProperty(SECURITY_PROTOCOL).getValue();
+
+        /*
+         * validates that if one of the SASL (Kerberos) options is selected for
+         * the security protocol, then a Kerberos principal is provided as well
+         */
+        if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || 
SEC_SASL_SSL.getValue().equals(securityProtocol)) {
+            String kerberosPrincipal = 
validationContext.getProperty(KERBEROS_PRINCIPLE).getValue();
+            if (kerberosPrincipal == null || kerberosPrincipal.trim().length() 
== 0) {
+                results.add(new 
ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
+                        .explanation("The <" + 
KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
+                                + SECURITY_PROTOCOL.getDisplayName() + "> is 
configured as '"
+                                + SEC_SASL_PLAINTEXT.getValue() + "' or '" + 
SEC_SASL_SSL.getValue() + "'.")
+                        .build());
+            }
+        }
+
+        //If SSL or SASL_SSL then CS must be set.
+        final boolean sslProtocol = 
SEC_SSL.getValue().equals(securityProtocol) || 
SEC_SASL_SSL.getValue().equals(securityProtocol);
+        final boolean csSet = 
validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet();
+        if (csSet && !sslProtocol) {
+            results.add(new 
ValidationResult.Builder().subject(SECURITY_PROTOCOL.getDisplayName()).valid(false)
+                    .explanation("If you set the SSL Controller Service you 
should also choose an SSL based security protocol.").build());
+        }
+        if (!csSet && sslProtocol) {
+            results.add(new 
ValidationResult.Builder().subject(SSL_CONTEXT_SERVICE.getDisplayName()).valid(false)
+                    .explanation("If you set to an SSL based protocol you need 
to set the SSL Controller Service").build());
+        }
+
+        final String enableAutoCommit = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).build()).getValue();
+        if (enableAutoCommit != null && 
!enableAutoCommit.toLowerCase().equals("false")) {
+            results.add(new 
ValidationResult.Builder().subject(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                    .explanation("Enable auto commit must be false.  It is 
managed by the processor.").build());
+        }
+
+        final String keySerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()).getValue();
+        if (keySerializer != null && 
!ByteArraySerializer.class.getName().equals(keySerializer)) {
+            results.add(new 
ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
+                    .explanation("Key Serializer must be " + 
ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build());
+        }
+
+        final String valueSerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()).getValue();
+        if (valueSerializer != null && 
!ByteArraySerializer.class.getName().equals(valueSerializer)) {
+            results.add(new 
ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
+                    .explanation("Value Serializer must be " + 
ByteArraySerializer.class.getName() + "' was '" + valueSerializer + 
"'").build());
+        }
+
+        final String keyDeSerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()).getValue();
+        if (keyDeSerializer != null && 
!ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) {
+            results.add(new 
ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)
+                    .explanation("Key De-Serializer must be '" + 
ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + 
"'").build());
+        }
+
+        final String valueDeSerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()).getValue();
+        if (valueDeSerializer != null && 
!ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) {
+            results.add(new 
ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)
+                    .explanation("Value De-Serializer must be " + 
ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + 
"'").build());
+        }
+
+        return results;
+    }
+
+    static final class KafkaConfigValidator implements Validator {
+
+        final Class<?> classType;
+
+        public KafkaConfigValidator(final Class classType) {
+            this.classType = classType;
+        }
+
+        @Override
+        public ValidationResult validate(final String subject, final String 
value, final ValidationContext context) {
+            final boolean knownValue = 
KafkaProcessorUtils.isStaticStringFieldNamePresent(subject, classType, 
CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class);
+            return new 
ValidationResult.Builder().subject(subject).explanation("Must be a known 
configuration parameter for this kafka client").valid(knownValue).build();
+        }
+    };
+
+    /**
+     * Builds transit URI for provenance event. The transit URI will be in the
+     * form of &lt;security.protocol&gt;://&lt;bootstrap.servers&gt;/topic
+     */
+    static String buildTransitURI(String securityProtocol, String brokers, 
String topic) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(securityProtocol);
+        builder.append("://");
+        builder.append(brokers);
+        builder.append("/");
+        builder.append(topic);
+        return builder.toString();
+    }
+
+    static void buildCommonKafkaProperties(final ProcessContext context, final 
Class kafkaConfigClass, final Map<String, String> mapToPopulate) {
+        for (PropertyDescriptor propertyDescriptor : 
context.getProperties().keySet()) {
+            if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
+                // Translate SSLContext Service configuration into Kafka 
properties
+                final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+                if (sslContextService != null && 
sslContextService.isKeyStoreConfigured()) {
+                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
sslContextService.getKeyStoreFile());
+                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
sslContextService.getKeyStorePassword());
+                    final String keyPass = sslContextService.getKeyPassword() 
== null ? sslContextService.getKeyStorePassword() : 
sslContextService.getKeyPassword();
+                    mapToPopulate.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, 
keyPass);
+                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
sslContextService.getKeyStoreType());
+                }
+
+                if (sslContextService != null && 
sslContextService.isTrustStoreConfigured()) {
+                    
mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
sslContextService.getTrustStoreFile());
+                    
mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
sslContextService.getTrustStorePassword());
+                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
sslContextService.getTrustStoreType());
+                }
+            }
+
+            String pName = propertyDescriptor.getName();
+            String pValue = propertyDescriptor.isExpressionLanguageSupported()
+                    ? 
context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
+                    : context.getProperty(propertyDescriptor).getValue();
+            if (pValue != null) {
+                if (pName.endsWith(".ms")) { // kafka standard time notation
+                    pValue = 
String.valueOf(FormatUtils.getTimeDuration(pValue.trim(), 
TimeUnit.MILLISECONDS));
+                }
+                if (isStaticStringFieldNamePresent(pName, kafkaConfigClass, 
CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
+                    mapToPopulate.put(pName, pValue);
+                }
+            }
+        }
+    }
+
+    private static boolean isStaticStringFieldNamePresent(final String name, 
final Class... classes) {
+        return 
KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name);
+    }
+
+    private static Set<String> getPublicStaticStringFieldValues(final Class... 
classes) {
+        final Set<String> strings = new HashSet<>();
+        for (final Class classType : classes) {
+            for (final Field field : classType.getDeclaredFields()) {
+                if (Modifier.isPublic(field.getModifiers()) && 
Modifier.isStatic(field.getModifiers()) && 
field.getType().equals(String.class)) {
+                    try {
+                        strings.add(String.valueOf(field.get(null)));
+                    } catch (IllegalArgumentException | IllegalAccessException 
ex) {
+                        //ignore
+                    }
+                }
+            }
+        }
+        return strings;
+    }
+
+}
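
As a quick illustration of the helpers above, this same-package sketch shows the transit URI
format and the '*.ms' time-notation conversion performed in buildCommonKafkaProperties
(values are illustrative).

    package org.apache.nifi.processors.kafka.pubsub;

    import java.util.concurrent.TimeUnit;
    import org.apache.nifi.util.FormatUtils;

    final class KafkaProcessorUtilsSketch {

        public static void main(final String[] args) {
            // Provenance transit URI: <security.protocol>://<bootstrap.servers>/<topic>
            final String uri = KafkaProcessorUtils.buildTransitURI("SASL_SSL", "broker1:9093,broker2:9093", "events");
            System.out.println(uri); // SASL_SSL://broker1:9093,broker2:9093/events

            // Dynamic '*.ms' properties accept NiFi time notation and are converted to millis,
            // mirroring the FormatUtils call in buildCommonKafkaProperties.
            final long millis = FormatUtils.getTimeDuration("30 sec", TimeUnit.MILLISECONDS);
            System.out.println(millis); // 30000
        }
    }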

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
new file mode 100644
index 0000000..31a084f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.stream.io.util.StreamDemarcator;
+
+/**
+ * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor
+ * with sending contents of the {@link FlowFile}s to Kafka.
+ */
+class KafkaPublisher implements Closeable {
+
+    private final Producer<byte[], byte[]> kafkaProducer;
+
+    private volatile long ackWaitTime = 30000;
+
+    private final ComponentLog componentLog;
+
+    private final int ackCheckSize;
+
+    KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
+        this(kafkaProperties, 100, componentLog);
+    }
+
+    /**
+     * Creates an instance of this class as well as the instance of the
+     * corresponding Kafka {@link KafkaProducer} using provided Kafka
+     * configuration properties.
+     *
+     * @param kafkaProperties instance of {@link Properties} used to bootstrap
+     * {@link KafkaProducer}
+     */
+    KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog 
componentLog) {
+        this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
+        this.ackCheckSize = ackCheckSize;
+        this.componentLog = componentLog;
+    }
+
+    /**
+     * Publishes messages to a Kafka topic. It uses {@link StreamDemarcator} to
+     * determine how many messages will be sent to Kafka from a provided
+     * {@link InputStream} (see {@link PublishingContext#getContentStream()}).
+     * It supports two publishing modes:
+     * <ul>
+     * <li>Sending all messages constructed from
+     * {@link StreamDemarcator#nextToken()} operation.</li>
+     * <li>Sending only unacknowledged messages constructed from
+     * {@link StreamDemarcator#nextToken()} operation.</li>
+     * </ul>
+     * The unacknowledged messages are determined from the value of
+     * {@link PublishingContext#getLastAckedMessageIndex()}.
+     * <br>
+     * This method assumes content stream affinity where it is expected that 
the
+     * content stream that represents the same Kafka message(s) will remain the
+     * same across possible retries. This is required specifically for cases
+     * where delimiter is used and a single content stream may represent
+     * multiple Kafka messages. The
+     * {@link PublishingContext#getLastAckedMessageIndex()} will provide the
+     * index of the last ACKed message, so upon retry only messages with the
+     * higher index are sent.
+     *
+     * @param publishingContext instance of {@link PublishingContext} which 
hold
+     * context information about the message(s) to be sent.
+     * @return The index of the last successful offset.
+     */
+    KafkaPublisherResult publish(PublishingContext publishingContext) {
+        StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
+                publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
+
+        int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex();
+        List<Future<RecordMetadata>> resultFutures = new ArrayList<>();
+
+        byte[] messageBytes;
+        int tokenCounter = 0;
+        boolean continueSending = true;
+        KafkaPublisherResult result = null;
+        for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
+            if (prevLastAckedMessageIndex < tokenCounter) {
+                ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getKeyBytes(), messageBytes);
+                resultFutures.add(this.kafkaProducer.send(message));
+
+                if (tokenCounter % this.ackCheckSize == 0) {
+                    int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+                    resultFutures.clear();
+                    if (lastAckedMessageIndex % this.ackCheckSize != 0) {
+                        continueSending = false;
+                        result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+                    }
+                    prevLastAckedMessageIndex = lastAckedMessageIndex;
+                }
+            }
+        }
+
+        if (result == null) {
+            int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+            resultFutures.clear();
+            result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+        }
+        return result;
+    }
+
+    /**
+     * Sets the time this publisher will wait for the {@link Future#get()}
+     * operation (the Future returned by
+     * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing
+     * out.
+     *
+     * This value will also be used as a timeout when closing the underlying
+     * {@link KafkaProducer}. See {@link #close()}.
+     */
+    void setAckWaitTime(long ackWaitTime) {
+        this.ackWaitTime = ackWaitTime;
+    }
+
+    /**
+     * This operation will process ACKs from Kafka in the order in which
+     * {@link KafkaProducer#send(ProducerRecord)} invocations were made, returning
+     * the index of the last ACKed message. Within this operation, processing an ACK
+     * simply means a successful invocation of the {@code get()} operation on the
+     * {@link Future} returned by the {@link KafkaProducer#send(ProducerRecord)}
+     * operation. Upon encountering any type of error while interrogating such a
+     * {@link Future}, the ACK loop ends. Messages that were not ACKed are
+     * considered non-delivered and therefore may be resent at a later
+     * time.
+     *
+     * @param sendFutures list of {@link Future}s representing the results of
+     * publishing to Kafka
+     *
+     * @param lastAckMessageIndex the index of the last ACKed message. It is
+     * important to provide the last ACKed message, especially when re-trying, so
+     * that the proper index is maintained.
+     *
+     * @return the index of the last ACKed message
+     */
+    private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) {
+        boolean exceptionThrown = false;
+        for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) {
+            Future<RecordMetadata> future = sendFutures.get(segmentCounter);
+            try {
+                future.get(this.ackWaitTime, TimeUnit.MILLISECONDS);
+                lastAckMessageIndex++;
+            } catch (InterruptedException e) {
+                exceptionThrown = true;
+                Thread.currentThread().interrupt();
+                this.warnOrError("Interrupted while waiting for acks from Kafka", null);
+            } catch (ExecutionException e) {
+                exceptionThrown = true;
+                this.warnOrError("Failed while waiting for acks from Kafka", e);
+            } catch (TimeoutException e) {
+                exceptionThrown = true;
+                this.warnOrError("Timed out while waiting for acks from Kafka", null);
+            }
+        }
+
+        return lastAckMessageIndex;
+    }
+
+    /**
+     * Will close the underlying {@link KafkaProducer}, waiting if necessary for
+     * up to the duration supplied via {@link #setAckWaitTime(long)}.
+     */
+    @Override
+    public void close() {
+        this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Logs the given message as a warning when no exception is provided,
+     * otherwise logs it as an error together with the exception.
+     */
+    private void warnOrError(String message, Exception e) {
+        if (e == null) {
+            this.componentLog.warn(message);
+        } else {
+            this.componentLog.error(message, e);
+        }
+    }
+
+    /**
+     * Encapsulates the result received from publishing messages to Kafka
+     */
+    static class KafkaPublisherResult {
+
+        private final int messagesSent;
+        private final int lastMessageAcked;
+
+        KafkaPublisherResult(int messagesSent, int lastMessageAcked) {
+            this.messagesSent = messagesSent;
+            this.lastMessageAcked = lastMessageAcked;
+        }
+
+        public int getMessagesSent() {
+            return this.messagesSent;
+        }
+
+        public int getLastMessageAcked() {
+            return this.lastMessageAcked;
+        }
+
+        public boolean isAllAcked() {
+            return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked;
+        }
+
+        @Override
+        public String toString() {
+            return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked;
+        }
+    }
+}
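
The publish()/processAcks() pair above implements an ack-checkpointing pattern: records are sent in chunks, then the code blocks on each returned Future and records how far the acknowledgements got, so a retry can resume from the last ACKed index. A minimal standalone sketch of that pattern against the plain Kafka 0.9 producer API follows; the class name AckCheckpointSketch, the helper sendWithAckCheckpoint, the broker address and the topic name are illustrative assumptions, not part of this commit.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AckCheckpointSketch {

    /**
     * Sends messages[startAfterIndex + 1 ..] to 'topic', checking acknowledgements
     * every 'ackCheckSize' sends, and returns the index of the last message the
     * broker acknowledged. A result of messages.size() - 1 means everything was
     * ACKed; anything lower tells the caller where to resume on retry.
     */
    static int sendWithAckCheckpoint(KafkaProducer<byte[], byte[]> producer, String topic,
            List<byte[]> messages, int startAfterIndex, int ackCheckSize, long ackWaitMillis) {
        List<Future<RecordMetadata>> pending = new ArrayList<>();
        int lastAcked = startAfterIndex;
        for (int i = startAfterIndex + 1; i < messages.size(); i++) {
            pending.add(producer.send(new ProducerRecord<>(topic, messages.get(i))));
            if (pending.size() == ackCheckSize || i == messages.size() - 1) {
                for (Future<RecordMetadata> f : pending) {
                    try {
                        // blocks until this send completes or the timeout elapses
                        f.get(ackWaitMillis, TimeUnit.MILLISECONDS);
                        lastAcked++;
                    } catch (Exception e) {
                        // InterruptedException, ExecutionException or TimeoutException:
                        // stop here; the caller can retry from lastAcked + 1
                        return lastAcked;
                    }
                }
                pending.clear();
            }
        }
        return lastAcked;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            List<byte[]> messages = new ArrayList<>();
            messages.add("hello".getBytes(StandardCharsets.UTF_8));
            messages.add("world".getBytes(StandardCharsets.UTF_8));
            int lastAcked = sendWithAckCheckpoint(producer, "demo-topic", messages, -1, 100, 30000L);
            System.out.println("Last ACKed index: " + lastAcked);
        }
    }
}

As in processAcks(), the loop stops at the first Future that fails or times out, and the returned index tells the caller where to resume.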

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
new file mode 100644
index 0000000..64ab4ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+
+/**
+ * Collection of implementations of common Kafka {@link Partitioner}s.
+ */
+public final class Partitioners {
+
+    private Partitioners() {
+    }
+
+    /**
+     * {@link Partitioner} that implements a 'round-robin' mechanism, evenly
+     * distributing load across all available partitions.
+     */
+    public static class RoundRobinPartitioner implements Partitioner {
+
+        private volatile int index;
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            // noop
+        }
+
+        @Override
+        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+            return this.next(cluster.availablePartitionsForTopic(topic).size());
+        }
+
+        @Override
+        public void close() {
+            // noop
+        }
+
+        private synchronized int next(int numberOfPartitions) {
+            if (this.index >= numberOfPartitions) {
+                this.index = 0;
+            }
+            return index++;
+        }
+    }
+}
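
RoundRobinPartitioner is a standard Kafka 0.9 Partitioner implementation, so it can be registered on a producer through the partitioner.class property. The sketch below shows one way to wire it up; the broker address and topic name are placeholders, and it assumes the 0.9 client's ProducerConfig.PARTITIONER_CLASS_CONFIG constant for that property.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.nifi.processors.kafka.pubsub.Partitioners;

public class RoundRobinPartitionerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Register the round-robin partitioner so records are spread evenly
        // across the topic's available partitions.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                Partitioners.RoundRobinPartitioner.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                byte[] value = ("message-" + i).getBytes(StandardCharsets.UTF_8);
                // no key is supplied; the configured partitioner picks the partition
                producer.send(new ProducerRecord<>("demo-topic", value));
            }
        }
    }
}

With the partitioner registered, every record rotates across the topic's available partitions regardless of whether it carries a key.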
