http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
deleted file mode 100644
index 7660305..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.ConsumerTimeoutException;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-
-@SupportsBatching
-@InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Fetches messages from Apache Kafka")
-@Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
-@WritesAttributes({
-        @WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"),
-        @WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If"
-                + " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"),
-        @WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"),
-        @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1")})
-@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
-            description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
-        + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be"
-        + " overriden with warning message describing the override."
-        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
-public class GetKafka extends AbstractProcessor {
-
-    public static final String SMALLEST = "smallest";
-    public static final String LARGEST = "largest";
-
-    public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder()
-            .name("ZooKeeper Connection String")
-            .description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port>"
-                    + " combinations. For example, host1:2181,host2:2181,host3:2188")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .build();
-    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
-            .name("Topic Name")
-            .description("The Kafka Topic to pull messages from")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .build();
-    public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder()
-            .name("Zookeeper Commit Frequency")
-            .description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period will"
-                    + " result in better overall performance but can result in more data duplication if a NiFi node is lost")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .defaultValue("60 secs")
-            .build();
-    public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new PropertyDescriptor.Builder()
-            .name("ZooKeeper Communications Timeout")
-            .description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .defaultValue("30 secs")
-            .build();
-    public static final PropertyDescriptor KAFKA_TIMEOUT = new PropertyDescriptor.Builder()
-            .name("Kafka Communications Timeout")
-            .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .defaultValue("30 secs")
-            .build();
-    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("Batch Size")
-            .description("Specifies the maximum number of messages to combine into a single FlowFile. These messages will be "
-                    + "concatenated together with the <Message Demarcator> string placed between the content of each message. "
-                    + "If the messages from Kafka should not be concatenated together, leave this value at 1.")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .defaultValue("1")
-            .build();
-    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
-            .name("Message Demarcator")
-            .description("Specifies the characters to use in order to demarcate multiple messages from Kafka. If the <Batch Size> "
-                    + "property is set to 1, this value is ignored. Otherwise, for each two subsequent messages in the batch, "
-                    + "this value will be placed in between them.")
-            .required(true)
-            .addValidator(Validator.VALID) // accept anything as a demarcator, including empty string
-            .expressionLanguageSupported(false)
-            .defaultValue("\\n")
-            .build();
-
-    public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
-            .name("Client Name")
-            .description("Client Name to use when communicating with Kafka")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .build();
-    public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
-            .name("Group ID")
-            .description("A Group ID is used to identify consumers that are within the same consumer group")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .build();
-
-    public static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
-            .name("Auto Offset Reset")
-            .description("Automatically reset the offset to the smallest or largest offset available on the broker")
-            .required(true)
-            .allowableValues(SMALLEST, LARGEST)
-            .defaultValue(LARGEST)
-            .build();
-
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("All FlowFiles that are created are routed to this relationship")
-            .build();
-
-    private final BlockingQueue<ConsumerIterator<byte[], byte[]>> streamIterators = new LinkedBlockingQueue<>();
-    private volatile ConsumerConnector consumer;
-
-    private final AtomicBoolean consumerStreamsReady = new AtomicBoolean();
-
-    private volatile long deadlockTimeout;
-
-    private volatile ExecutorService executor;
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder()
-                .fromPropertyDescriptor(CLIENT_NAME)
-                .defaultValue("NiFi-" + getIdentifier())
-                .build();
-        final PropertyDescriptor groupIdWithDefault = new PropertyDescriptor.Builder()
-                .fromPropertyDescriptor(GROUP_ID)
-                .defaultValue(getIdentifier())
-                .build();
-
-        final List<PropertyDescriptor> props = new ArrayList<>();
-        props.add(ZOOKEEPER_CONNECTION_STRING);
-        props.add(TOPIC);
-        props.add(ZOOKEEPER_COMMIT_DELAY);
-        props.add(BATCH_SIZE);
-        props.add(MESSAGE_DEMARCATOR);
-        props.add(clientNameWithDefault);
-        props.add(groupIdWithDefault);
-        props.add(KAFKA_TIMEOUT);
-        props.add(ZOOKEEPER_TIMEOUT);
-        props.add(AUTO_OFFSET_RESET);
-        return props;
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>(1);
-        relationships.add(REL_SUCCESS);
-        return relationships;
-    }
-
-    public void createConsumers(final ProcessContext context) {
-        final String topic = context.getProperty(TOPIC).getValue();
-
-        final Properties props = new Properties();
-        props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
-        props.setProperty("group.id", context.getProperty(GROUP_ID).getValue());
-        props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
-        props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
-        props.setProperty("auto.offset.reset", context.getProperty(AUTO_OFFSET_RESET).getValue());
-        props.setProperty("zookeeper.connection.timeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
-        props.setProperty("socket.timeout.ms", context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
-
-        for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
-            PropertyDescriptor descriptor = entry.getKey();
-            if (descriptor.isDynamic()) {
-                if (props.containsKey(descriptor.getName())) {
-                    this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '"
-                        + props.getProperty(descriptor.getName()) + "' with dynamically set value '" + entry.getValue() + "'.");
-                }
-                props.setProperty(descriptor.getName(), entry.getValue());
-            }
-        }
-
-        /*
-         * Unless user sets it to some explicit value we are setting it to the
-         * lowest possible value of 1 millisecond to ensure the
-         * consumerStream.hasNext() doesn't block. See
-         * http://kafka.apache.org/documentation.html#configuration) as well as
-         * comment in 'catch ConsumerTimeoutException' in onTrigger() for more
-         * explanation as to the reasoning behind it.
-         */
-        if (!props.containsKey("consumer.timeout.ms")) {
-            this.getLogger().info("Setting 'consumer.timeout.ms' to 1 milliseconds to avoid consumer"
-                            + " block in the event when no events are present in Kafka topic. If you wish to change this value "
-                            + " set it as dynamic property. If you wish to explicitly enable consumer block (at your own risk)"
-                            + " set its value to -1.");
-            props.setProperty("consumer.timeout.ms", "1");
-        }
-
-        int partitionCount = KafkaUtils.retrievePartitionCountForTopic(
-                context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
-
-        final ConsumerConfig consumerConfig = new ConsumerConfig(props);
-        consumer = Consumer.createJavaConsumerConnector(consumerConfig);
-
-        final Map<String, Integer> topicCountMap = new HashMap<>(1);
-
-        int concurrentTaskToUse = context.getMaxConcurrentTasks();
-        if (context.getMaxConcurrentTasks() < partitionCount){
-            this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
-                    + "this processor is less than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. "
-                + "Consider making it equal to the amount of partition count for most efficient event consumption.");
-        } else if (context.getMaxConcurrentTasks() > partitionCount){
-            concurrentTaskToUse = partitionCount;
-            this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
-                    + "this processor is greater than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. "
-                + "Therefore those tasks would never see a message. To avoid that the '" + partitionCount + "'(partition count) will be used to consume events");
-        }
-
-        topicCountMap.put(topic, concurrentTaskToUse);
-
-        final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
-        final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
-
-        this.streamIterators.clear();
-
-        for (final KafkaStream<byte[], byte[]> stream : streams) {
-            streamIterators.add(stream.iterator());
-        }
-        this.consumerStreamsReady.set(true);
-    }
-
-    @OnStopped
-    public void shutdownConsumer() {
-        this.consumerStreamsReady.set(false);
-        if (consumer != null) {
-            try {
-                consumer.commitOffsets();
-            } finally {
-                consumer.shutdown();
-            }
-        }
-        if (this.executor != null) {
-            this.executor.shutdown();
-            try {
-                if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) {
-                    this.executor.shutdownNow();
-                    getLogger().warn("Executor did not stop in 30 sec. Terminated.");
-                }
-                this.executor = null;
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
-                .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
-                .build();
-    }
-
-    @OnScheduled
-    public void schedule(ProcessContext context) {
-        this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2;
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        /*
-         * Will ensure that consumer streams are ready upon the first invocation
-         * of onTrigger. Will be reset to 'false' in the event of exception
-         */
-        synchronized (this.consumerStreamsReady) {
-            if (this.executor == null || this.executor.isShutdown()) {
-                this.executor = Executors.newCachedThreadPool();
-            }
-            if (!this.consumerStreamsReady.get()) {
-                Future<Void> f = this.executor.submit(new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        createConsumers(context);
-                        return null;
-                    }
-                });
-                try {
-                    f.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    shutdownConsumer();
-                    f.cancel(true);
-                    Thread.currentThread().interrupt();
-                    getLogger().warn("Interrupted while waiting to get connection", e);
-                } catch (ExecutionException e) {
-                    throw new IllegalStateException(e);
-                } catch (TimeoutException e) {
-                    shutdownConsumer();
-                    f.cancel(true);
-                    getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while waiting to get connection", e);
-                }
-            }
-        }
-        //===
-        if (this.consumerStreamsReady.get()) {
-            Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
-                    if (iterator != null) {
-                        consumeFromKafka(context, session, iterator);
-                    }
-                    return null;
-                }
-            });
-            try {
-                consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                shutdownConsumer();
-                consumptionFuture.cancel(true);
-                Thread.currentThread().interrupt();
-                getLogger().warn("Interrupted while consuming messages", e);
-            } catch (ExecutionException e) {
-                throw new IllegalStateException(e);
-            } catch (TimeoutException e) {
-                shutdownConsumer();
-                consumptionFuture.cancel(true);
-                getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while consuming messages", e);
-            }
-        }
-    }
-
-    protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
-        return this.streamIterators.poll();
-    }
-
-
-    private void consumeFromKafka(final ProcessContext context, final ProcessSession session,
-            ConsumerIterator<byte[], byte[]> iterator) throws ProcessException {
-
-        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
-        final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
-        final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8);
-        final String topic = context.getProperty(TOPIC).getValue();
-
-        FlowFile flowFile = session.create();
-
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put("kafka.topic", topic);
-        final long start = System.nanoTime();
-        int msgCount = 0;
-
-        try {
-            for (; msgCount < batchSize && iterator.hasNext(); msgCount++) {
-                final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
-
-                if (batchSize == 1) {
-                    final byte[] key = mam.key();
-                    // the kafka.key, kafka.offset, and kafka.partition attributes are added only
-                    // for a batch size of 1.
-                    if (key != null) {
-                        attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
-                    }
-
-                    attributes.put("kafka.offset", String.valueOf(mam.offset()));
-                    attributes.put("kafka.partition", String.valueOf(mam.partition()));
-                }
-
-                // add the message to the FlowFile's contents
-                final boolean firstMessage = msgCount == 0;
-                flowFile = session.append(flowFile, new OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream out) throws IOException {
-                        if (!firstMessage) {
-                            out.write(demarcatorBytes);
-                        }
-                        out.write(mam.message());
-                    }
-                });
-            }
-            this.releaseFlowFile(flowFile, session, attributes, start, topic, msgCount);
-        } catch (ConsumerTimeoutException e) {
-            /*
-             * By default Kafka blocks indefinitely if topic is empty via
-             * stream.hasNext(). If 'consumer.timeout.ms' property is set (see
-             * http://kafka.apache.org/documentation.html#configuration) the
-             * hasNext() will fail with this exception. To this processor it
-             * simply means there are no messages and current task should exit
-             * in non-failure releasing the flow file if it was able to
-             * accumulate any events.
-             */
-            this.releaseFlowFile(flowFile, session, attributes, start, topic, msgCount);
-        } catch (final Exception e) {
-            this.shutdownConsumer();
-            getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[]{e});
-            if (flowFile != null) {
-                session.remove(flowFile);
-            }
-        } finally {
-            // Add the iterator back to the queue
-            if (iterator != null) {
-                streamIterators.offer(iterator);
-            }
-        }
-    }
-
-    /**
-     * Will release flow file. Releasing of the flow file in the context of this
-     * operation implies the following:
-     *
-     * If Empty then remove from session and return
-     * If has something then transfer to REL_SUCCESS
-     */
-    private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<String, String> attributes, long start, String topic, int msgCount){
-        if (flowFile.getSize() == 0L) {
-            session.remove(flowFile);
-        } else {
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-            session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, "Received " + msgCount + " Kafka messages", millis);
-            getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[]{flowFile, msgCount, millis});
-            session.transfer(flowFile, REL_SUCCESS);
-        }
-    }
-}
\ No newline at end of file
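
For context, the removed GetKafka processor consumed through the old Kafka 0.8 high-level consumer API. The stand-alone sketch below mirrors the setup performed by createConsumers() and the read loop in consumeFromKafka(), including the 1 ms consumer.timeout.ms that keeps hasNext() from blocking on an empty topic. The ZooKeeper address, group id, client id, and topic name are placeholders, not values taken from the commit.

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.ConsumerTimeoutException;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class GetKafkaStyleConsumerSketch {
        public static void main(String[] args) {
            final Properties props = new Properties();
            props.setProperty("zookeeper.connect", "localhost:2181"); // placeholder ZooKeeper
            props.setProperty("group.id", "nifi-example-group");      // placeholder group id
            props.setProperty("client.id", "NiFi-example");           // placeholder client name
            props.setProperty("auto.offset.reset", "largest");
            // Same trick as the removed processor: a 1 ms consumer timeout makes
            // iterator.hasNext() throw ConsumerTimeoutException instead of blocking forever.
            props.setProperty("consumer.timeout.ms", "1");

            final ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            final Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    consumer.createMessageStreams(Collections.singletonMap("example-topic", 1));
            final ConsumerIterator<byte[], byte[]> iterator = streams.get("example-topic").get(0).iterator();

            try {
                while (iterator.hasNext()) {
                    final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
                    System.out.println(mam.partition() + "/" + mam.offset() + ": "
                            + new String(mam.message(), StandardCharsets.UTF_8));
                }
            } catch (ConsumerTimeoutException e) {
                // No messages currently available; the processor treated this as a normal, non-failure exit.
            } finally {
                consumer.commitOffsets();
                consumer.shutdown();
            }
        }
    }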

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
deleted file mode 100644
index 5bc0e0e..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka;
-
-import java.io.Closeable;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.stream.io.util.StreamDemarcator;
-
-import kafka.producer.Partitioner;
-
-/**
- * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor
- * with sending contents of the {@link FlowFile}s to Kafka.
- */
-class KafkaPublisher implements Closeable {
-
-    private final Producer<byte[], byte[]> kafkaProducer;
-
-    private long ackWaitTime = 30000;
-
-    private final ComponentLog componentLog;
-
-    private final Partitioner partitioner;
-
-    private final int ackCheckSize;
-
-    KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
-        this(kafkaProperties, 100, componentLog);
-    }
-
-    /**
-     * Creates an instance of this class as well as the instance of the
-     * corresponding Kafka {@link KafkaProducer} using provided Kafka
-     * configuration properties.
-     *
-     * @param kafkaProperties
-     *            instance of {@link Properties} used to bootstrap
-     *            {@link KafkaProducer}
-     */
-    KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) {
-        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
-        this.ackCheckSize = ackCheckSize;
-        try {
-            if (kafkaProperties.containsKey("partitioner.class")) {
-                this.partitioner = (Partitioner) Class.forName(kafkaProperties.getProperty("partitioner.class")).newInstance();
-            } else {
-                this.partitioner = null;
-            }
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to create partitioner", e);
-        }
-        this.componentLog = componentLog;
-    }
-
-    /**
-     * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to
-     * determine how many messages to Kafka will be sent from a provided
-     * {@link InputStream} (see {@link PublishingContext#getContentStream()}).
-     * It supports two publishing modes:
-     * <ul>
-     * <li>Sending all messages constructed from
-     * {@link StreamDemarcator#nextToken()} operation.</li>
-     * <li>Sending only unacknowledged messages constructed from
-     * {@link StreamDemarcator#nextToken()} operation.</li>
-     * </ul>
-     * The unacknowledged messages are determined from the value of
-     * {@link PublishingContext#getLastAckedMessageIndex()}.
-     * <br>
-     * This method assumes content stream affinity where it is expected that the
-     * content stream that represents the same Kafka message(s) will remain the
-     * same across possible retries. This is required specifically for cases
-     * where delimiter is used and a single content stream may represent
-     * multiple Kafka messages. The
-     * {@link PublishingContext#getLastAckedMessageIndex()} will provide the
-     * index of the last ACKed message, so upon retry only messages with the
-     * higher index are sent.
-     *
-     * @param publishingContext
-     *            instance of {@link PublishingContext} which hold context
-     *            information about the message(s) to be sent.
-     * @return The index of the last successful offset.
-     */
-    KafkaPublisherResult publish(PublishingContext publishingContext) {
-        StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
-            publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
-
-        int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex();
-        List<Future<RecordMetadata>> resultFutures = new ArrayList<>();
-
-        byte[] messageBytes;
-        int tokenCounter = 0;
-        boolean continueSending = true;
-        KafkaPublisherResult result = null;
-        for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
-            if (prevLastAckedMessageIndex < tokenCounter) {
-                Integer partitionId = publishingContext.getPartitionId();
-                if (partitionId == null && publishingContext.getKeyBytes() != null) {
-                    partitionId = this.getPartition(publishingContext.getKeyBytes(), publishingContext.getTopic());
-                }
-                ProducerRecord<byte[], byte[]> message =
-                    new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getPartitionId(), publishingContext.getKeyBytes(), messageBytes);
-                resultFutures.add(this.kafkaProducer.send(message));
-
-                if (tokenCounter % this.ackCheckSize == 0) {
-                    int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
-                    resultFutures.clear();
-                    if (lastAckedMessageIndex % this.ackCheckSize != 0) {
-                        continueSending = false;
-                        result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
-                    }
-                    prevLastAckedMessageIndex = lastAckedMessageIndex;
-                }
-            }
-        }
-
-        if (result == null) {
-            int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
-            resultFutures.clear();
-            result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
-        }
-        return result;
-    }
-
-    /**
-     * Sets the time this publisher will wait for the {@link Future#get()}
-     * operation (the Future returned by
-     * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing
-     * out.
-     *
-     * This value will also be used as a timeout when closing the underlying
-     * {@link KafkaProducer}. See {@link #close()}.
-     */
-    void setAckWaitTime(long ackWaitTime) {
-        this.ackWaitTime = ackWaitTime;
-    }
-
-    /**
-     * This operation will process ACKs from Kafka in the order in which
-     * {@link KafkaProducer#send(ProducerRecord)} invocation were made returning
-     * the index of the last ACKed message. Within this operation processing ACK
-     * simply means successful invocation of 'get()' operation on the
-     * {@link Future} returned by {@link KafkaProducer#send(ProducerRecord)}
-     * operation. Upon encountering any type of error while interrogating such
-     * {@link Future} the ACK loop will end. Messages that were not ACKed would
-     * be considered non-delivered and therefore could be resent at the later
-     * time.
-     *
-     * @param sendFutures
-     *            list of {@link Future}s representing results of publishing to
-     *            Kafka
-     *
-     * @param lastAckMessageIndex
-     *            the index of the last ACKed message. It is important to
-     *            provide the last ACKed message especially while re-trying so
-     *            the proper index is maintained.
-     */
-    private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) {
-        boolean exceptionThrown = false;
-        for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) {
-            Future<RecordMetadata> future = sendFutures.get(segmentCounter);
-            try {
-                future.get(this.ackWaitTime, TimeUnit.MILLISECONDS);
-                lastAckMessageIndex++;
-            } catch (InterruptedException e) {
-                exceptionThrown = true;
-                Thread.currentThread().interrupt();
-                this.warnOrError("Interrupted while waiting for acks from Kafka", null);
-            } catch (ExecutionException e) {
-                exceptionThrown = true;
-                this.warnOrError("Failed while waiting for acks from Kafka", e);
-            } catch (TimeoutException e) {
-                exceptionThrown = true;
-                this.warnOrError("Timed out while waiting for acks from Kafka", null);
-            }
-        }
-
-        return lastAckMessageIndex;
-    }
-
-    /**
-     * Will close the underlying {@link KafkaProducer}
-     */
-    @Override
-    public void close() {
-        this.kafkaProducer.close();
-    }
-
-    /**
-     *
-     */
-    private void warnOrError(String message, Exception e) {
-        if (e == null) {
-            this.componentLog.warn(message);
-        } else {
-            this.componentLog.error(message);
-        }
-    }
-
-    static class KafkaPublisherResult {
-        private final int messagesSent;
-        private final int lastMessageAcked;
-        KafkaPublisherResult(int messagesSent, int lastMessageAcked) {
-            this.messagesSent = messagesSent;
-            this.lastMessageAcked = lastMessageAcked;
-        }
-
-        public int getMessagesSent() {
-            return this.messagesSent;
-        }
-
-        public int getLastMessageAcked() {
-            return this.lastMessageAcked;
-        }
-
-        public boolean isAllAcked() {
-            return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked;
-        }
-
-        @Override
-        public String toString() {
-            return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked;
-        }
-    }
-
-    /**
-     *
-     */
-    private int getPartition(Object key, String topicName) {
-        if (this.partitioner != null) {
-            int partSize = this.kafkaProducer.partitionsFor(topicName).size();
-            return this.partitioner.partition(key, partSize);
-        }
-        return 0;
-    }
-}
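
The removed KafkaPublisher tracked acknowledgements by message index so that a retry only resends messages after the last ACKed one. Below is a minimal sketch of that idea written directly against the new-producer API used above (KafkaProducer, ProducerRecord, Future.get()); the broker address, topic, ack timeout, and sample messages are placeholders.

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class AckTrackingPublishSketch {
        public static void main(String[] args) {
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

            final String[] messages = {"one", "two", "three"}; // stands in for the demarcated FlowFile content
            int lastAckedMessageIndex = -1;                     // -1 means nothing has been acknowledged yet

            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                final List<Future<RecordMetadata>> futures = new ArrayList<>();
                for (int i = 0; i < messages.length; i++) {
                    if (i > lastAckedMessageIndex) { // skip messages acknowledged on a previous attempt
                        futures.add(producer.send(new ProducerRecord<>("example-topic", null, null,
                                messages[i].getBytes(StandardCharsets.UTF_8))));
                    }
                }
                // Process acks in send order and stop at the first failure, so only a
                // contiguous prefix of messages is ever counted as delivered.
                for (final Future<RecordMetadata> future : futures) {
                    try {
                        future.get(30000, TimeUnit.MILLISECONDS);
                        lastAckedMessageIndex++;
                    } catch (Exception e) {
                        break;
                    }
                }
                final boolean allAcked = lastAckedMessageIndex == messages.length - 1;
                System.out.println("Sent:" + messages.length + "; Last ACK:" + lastAckedMessageIndex
                        + "; all acked=" + allAcked);
            }
        }
    }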

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
deleted file mode 100644
index 8ddea61..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.nifi.processors.kafka;
-
-import java.util.Collections;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import kafka.admin.AdminUtils;
-import kafka.api.TopicMetadata;
-import kafka.utils.ZKStringSerializer;
-import scala.collection.JavaConversions;
-
-/**
- * Utility class to support interruction with Kafka internals.
- *
- */
-class KafkaUtils {
-
-
-    /**
-     * Will retrieve the amount of partitions for a given Kafka topic.
-     */
-    static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) {
-        ZkClient zkClient = null;
-
-        try {
-            zkClient = new ZkClient(zookeeperConnectionString);
-            zkClient.setZkSerializer(new ZkSerializer() {
-                @Override
-                public byte[] serialize(Object o) throws ZkMarshallingError {
-                    return ZKStringSerializer.serialize(o);
-                }
-
-                @Override
-                public Object deserialize(byte[] bytes) throws ZkMarshallingError {
-                    return ZKStringSerializer.deserialize(bytes);
-                }
-            });
-            scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
-                    .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
-            if (topicMetadatas != null && topicMetadatas.size() > 0) {
-                return JavaConversions.asJavaSet(topicMetadatas).iterator().next().partitionsMetadata().size();
-            } else {
-                throw new IllegalStateException("Failed to get metadata for topic " + topicName);
-            }
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to retrieve partitions for topic " + topicName, e);
-        } finally {
-            try {
-                zkClient.close();
-            } catch (Exception e2) {
-                // ignore
-            }
-        }
-    }
-
-}
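
A hypothetical call site for the removed helper, assuming a reachable ZooKeeper and Kafka and a caller compiled into the same package (retrievePartitionCountForTopic is package-private); the connect string and topic name are placeholders.

    package org.apache.nifi.processors.kafka; // KafkaUtils is package-private, so the caller must live in this package

    public class PartitionCountExample {
        public static void main(String[] args) {
            // Placeholder ZooKeeper connect string and topic name.
            final int partitions = KafkaUtils.retrievePartitionCountForTopic("localhost:2181", "example-topic");
            System.out.println("example-topic has " + partitions + " partitions");
        }
    }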

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java
deleted file mode 100644
index 32d3606..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka;
-
-import java.util.Random;
-
-import kafka.producer.Partitioner;
-
-/**
- * Collection of implementation of common Kafka {@link Partitioner}s.
- */
-final public class Partitioners {
-
-    private Partitioners() {
-    }
-    /**
-     * {@link Partitioner} that implements 'round-robin' mechanism which evenly
-     * distributes load between all available partitions.
-     */
-    public static class RoundRobinPartitioner implements Partitioner {
-        private volatile int index;
-
-        @Override
-        public int partition(Object key, int numberOfPartitions) {
-            int partitionIndex = this.next(numberOfPartitions);
-            return partitionIndex;
-        }
-
-        private synchronized int next(int numberOfPartitions) {
-            if (this.index >= numberOfPartitions) {
-                this.index = 0;
-            }
-            return index++;
-        }
-    }
-
-    /**
-     * {@link Partitioner} that implements 'random' mechanism which randomly
-     * distributes the load between all available partitions.
-     */
-    public static class RandomPartitioner implements Partitioner {
-        private final Random random;
-
-        public RandomPartitioner() {
-            this.random = new Random();
-        }
-
-        @Override
-        public int partition(Object key, int numberOfPartitions) {
-            return this.random.nextInt(numberOfPartitions);
-        }
-    }
-
-    /**
-     * {@link Partitioner} that implements 'key hash' mechanism which
-     * distributes the load between all available partitions based on hashing
-     * the value of the key.
-     */
-    public static class HashPartitioner implements Partitioner {
-
-        @Override
-        public int partition(Object key, int numberOfPartitions) {
-            if (key != null) {
-                return (key.hashCode() & Integer.MAX_VALUE) % numberOfPartitions;
-            }
-            return 0;
-        }
-    }
-}
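
A small illustrative driver, not part of the commit, showing how the removed partitioners behave when invoked directly with an assumed partition count of 4.

    import kafka.producer.Partitioner;

    import org.apache.nifi.processors.kafka.Partitioners;

    public class PartitionersDemo {
        public static void main(String[] args) {
            final int numberOfPartitions = 4; // assumed partition count

            final Partitioner roundRobin = new Partitioners.RoundRobinPartitioner();
            final Partitioner hash = new Partitioners.HashPartitioner();

            // Round-robin ignores the key and simply cycles 0, 1, 2, 3, 0, ...
            for (int i = 0; i < 6; i++) {
                System.out.print(roundRobin.partition("ignored", numberOfPartitions) + " ");
            }
            System.out.println();

            // Key hashing is stable: the same key always maps to the same partition.
            System.out.println(hash.partition("order-42", numberOfPartitions));
            System.out.println(hash.partition("order-42", numberOfPartitions));
        }
    }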

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java
deleted file mode 100644
index 914ac1a..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka;
-
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-
-/**
- * Holder of context information used by {@link KafkaPublisher} required to
- * publish messages to Kafka.
- */
-class PublishingContext {
-
-    private final InputStream contentStream;
-
-    private final String topic;
-
-    private final int lastAckedMessageIndex;
-
-    private volatile Integer partitionId;
-
-    /*
-     * We're using the default value from Kafka. We are using it to control the
-     * message size before it goes to to Kafka thus limiting possibility of a
-     * late failures in Kafka client.
-     */
-    private volatile int maxRequestSize = 1048576; // kafka default
-
-    private volatile boolean maxRequestSizeSet;
-
-    private volatile byte[] keyBytes;
-
-    private volatile byte[] delimiterBytes;
-
-
-
-    PublishingContext(InputStream contentStream, String topic) {
-        this(contentStream, topic, -1);
-    }
-
-    PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex) {
-        this.validateInput(contentStream, topic, lastAckedMessageIndex);
-        this.contentStream = contentStream;
-        this.topic = topic;
-        this.lastAckedMessageIndex = lastAckedMessageIndex;
-    }
-
-    @Override
-    public String toString() {
-        return "topic: '" + this.topic + "'; delimiter: '" + new String(this.delimiterBytes, StandardCharsets.UTF_8) + "'";
-    }
-
-    int getLastAckedMessageIndex() {
-        return this.lastAckedMessageIndex;
-    }
-
-    int getMaxRequestSize() {
-        return this.maxRequestSize;
-    }
-
-    byte[] getKeyBytes() {
-        return this.keyBytes;
-    }
-
-    Integer getPartitionId() {
-        return partitionId;
-    }
-
-    public void setPartitionId(Integer partitionId) {
-        this.partitionId = partitionId;
-    }
-
-    byte[] getDelimiterBytes() {
-        return this.delimiterBytes;
-    }
-
-    InputStream getContentStream() {
-        return this.contentStream;
-    }
-
-    String getTopic() {
-        return this.topic;
-    }
-
-    void setKeyBytes(byte[] keyBytes) {
-        if (this.keyBytes == null) {
-            if (keyBytes != null) {
-                this.assertBytesValid(keyBytes);
-                this.keyBytes = keyBytes;
-            }
-        } else {
-            throw new IllegalArgumentException("'keyBytes' can only be set once per instance");
-        }
-    }
-
-    void setDelimiterBytes(byte[] delimiterBytes) {
-        if (this.delimiterBytes == null) {
-            if (delimiterBytes != null) {
-                this.assertBytesValid(delimiterBytes);
-                this.delimiterBytes = delimiterBytes;
-            }
-        } else {
-            throw new IllegalArgumentException("'delimiterBytes' can only be set once per instance");
-        }
-    }
-
-    void setMaxRequestSize(int maxRequestSize) {
-        if (!this.maxRequestSizeSet) {
-            if (maxRequestSize > 0) {
-                this.maxRequestSize = maxRequestSize;
-                this.maxRequestSizeSet = true;
-            } else {
-                throw new IllegalArgumentException("'maxRequestSize' must be > 0");
-            }
-        } else {
-            throw new IllegalArgumentException("'maxRequestSize' can only be set once per instance");
-        }
-    }
-
-    private void assertBytesValid(byte[] bytes) {
-        if (bytes != null) {
-            if (bytes.length == 0) {
-                throw new IllegalArgumentException("'bytes' must not be empty");
-            }
-        }
-    }
-
-    private void validateInput(InputStream contentStream, String topic, int lastAckedMessageIndex) {
-        if (contentStream == null) {
-            throw new IllegalArgumentException("'contentStream' must not be null");
-        } else if (topic == null || topic.trim().length() == 0) {
-            throw new IllegalArgumentException("'topic' must not be null or empty");
-        } else if (lastAckedMessageIndex < -1) {
-            throw new IllegalArgumentException("'lastAckedMessageIndex' must be >= -1");
-        }
-    }
-}
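
PublishingContext and KafkaPublisher (both removed above) were designed to be used together: the context carries the content stream, delimiter, and last-ACKed index, and the publisher resumes sending from that index on a retry. The rough same-package sketch below wires them up under those assumptions; the broker address and topic are placeholders.

    package org.apache.nifi.processors.kafka; // both classes above are package-private

    import java.io.ByteArrayInputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;

    public class PublishingContextExample {
        public static void main(String[] args) {
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

            final byte[] content = "first\nsecond\nthird".getBytes(StandardCharsets.UTF_8);

            // -1 = nothing acknowledged yet; on a retry the index reported by the previous
            // KafkaPublisherResult would be passed instead so already-ACKed messages are skipped.
            final PublishingContext context = new PublishingContext(new ByteArrayInputStream(content), "example-topic", -1);
            context.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));

            // null stands in for the processor's ComponentLog; a real caller would pass getLogger().
            try (KafkaPublisher publisher = new KafkaPublisher(props, null)) {
                final KafkaPublisher.KafkaPublisherResult result = publisher.publish(context);
                System.out.println(result + "; all acked=" + result.isAllAcked());
            }
        }
    }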

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
deleted file mode 100644
index 4dc8d18..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Pattern;
-
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult;
-
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
-@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a "
-        + "user-specified delimiter, such as a new-line.")
-@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
-                 description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
-        + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be"
-        + " overriden with warning message describing the override."
-        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
-public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
-
-    private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
-
-    private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
-
-    public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
-            "FlowFile will be routed to"
-                    + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
-    public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
-            "FlowFile will be routed"
-                    + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than"
-                    + " <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
-    public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
-            "FlowFile will be routed to success after"
-                    + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result"
-                    + " in data loss.");
-
-    /**
-     * AllowableValue for sending messages to Kafka without compression
-     */
-    public static final AllowableValue COMPRESSION_CODEC_NONE = new AllowableValue("none", "None",
-            "Compression will not be used for any topic.");
-
-    /**
-     * AllowableValue for sending messages to Kafka with GZIP compression
-     */
-    public static final AllowableValue COMPRESSION_CODEC_GZIP = new 
AllowableValue("gzip", "GZIP",
-            "Compress messages using GZIP");
-
-    /**
-     * AllowableValue for sending messages to Kafka with Snappy compression
-     */
-    public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new 
AllowableValue("snappy", "Snappy",
-            "Compress messages using Snappy");
-
-    static final AllowableValue ROUND_ROBIN_PARTITIONING = new 
AllowableValue("Round Robin", "Round Robin",
-            "Messages will be assigned partitions in a round-robin fashion, 
sending the first message to Partition 1, "
-                    + "the next message to Partition 2, and so on, wrapping as necessary.");
-    static final AllowableValue RANDOM_PARTITIONING = new 
AllowableValue("Random Robin", "Random",
-            "Messages will be assigned to random partitions.");
-    static final AllowableValue USER_DEFINED_PARTITIONING = new 
AllowableValue("User-Defined", "User-Defined",
-            "The <Partition> property will be used to determine the partition. 
All messages within the same FlowFile will be "
-                    + "assigned to the same partition.");
-
-    public static final PropertyDescriptor SEED_BROKERS = new 
PropertyDescriptor.Builder()
-            .name("Known Brokers")
-            .description("A comma-separated list of known Kafka Brokers in the 
format <host>:<port>")
-            .required(true)
-            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
-            .expressionLanguageSupported(false)
-            .build();
-    public static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
-            .name("Topic Name")
-            .description("The Kafka Topic of interest")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
-    static final PropertyDescriptor PARTITION_STRATEGY = new 
PropertyDescriptor.Builder()
-            .name("Partition Strategy")
-            .description("Specifies how messages should be partitioned when 
sent to Kafka")
-            .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, 
USER_DEFINED_PARTITIONING)
-            .defaultValue(ROUND_ROBIN_PARTITIONING.getValue())
-            .required(true)
-            .build();
-    public static final PropertyDescriptor PARTITION = new 
PropertyDescriptor.Builder()
-            .name("Partition")
-            .description("Specifies which Kafka Partition to add the message 
to. If using a message delimiter, all messages "
-                            + "in the same FlowFile will be sent to the same 
partition. If a partition is specified but is not valid, "
-                            + "then all messages within the same FlowFile will 
use the same partition but it remains undefined which partition is used.")
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .required(false)
-            .build();
-    public static final PropertyDescriptor KEY = new 
PropertyDescriptor.Builder()
-            .name("Kafka Key")
-            .description("The Key to use for the Message")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
-    public static final PropertyDescriptor DELIVERY_GUARANTEE = new 
PropertyDescriptor.Builder()
-            .name("Delivery Guarantee")
-            .description("Specifies the requirement for guaranteeing that a 
message is sent to Kafka").required(true)
-            .expressionLanguageSupported(false)
-            .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, 
DELIVERY_REPLICATED)
-            .defaultValue(DELIVERY_BEST_EFFORT.getValue())
-            .build();
-    public static final PropertyDescriptor MESSAGE_DELIMITER = new 
PropertyDescriptor.Builder()
-            .name("Message Delimiter")
-            .description("Specifies the delimiter (interpreted in its UTF-8 
byte representation) to use for splitting apart multiple messages within a 
single FlowFile. "
-                            + "If not specified, the entire content of the 
FlowFile will be used as a single message. If specified, "
-                            + "the contents of the FlowFile will be split on 
this delimiter and each section sent as a separate Kafka "
-                            + "message. Note that if messages are delimited 
and some messages for a given FlowFile are transferred "
-                            + "successfully while others are not, the messages 
will be split into individual FlowFiles, such that those "
-                            + "messages that were successfully sent are routed 
to the 'success' relationship while other messages are "
-                            + "sent to the 'failure' relationship.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
-    public static final PropertyDescriptor MAX_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Max Buffer Size")
-            .description("The maximum amount of data to buffer in memory 
before sending to Kafka")
-            .required(true)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .defaultValue("5 MB")
-            .build();
-    static final PropertyDescriptor MAX_RECORD_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Max Record Size")
-            .description("The maximum size that any individual record can be.")
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .required(true)
-            .defaultValue("1 MB")
-            .build();
-    public static final PropertyDescriptor TIMEOUT = new 
PropertyDescriptor.Builder()
-            .name("Communications Timeout")
-            .description("The amount of time to wait for a response from Kafka 
before determining that there is a communications error")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .defaultValue("30 secs").build();
-    public static final PropertyDescriptor CLIENT_NAME = new 
PropertyDescriptor.Builder()
-            .name("Client Name")
-            .description("Client Name to use when communicating with Kafka")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .build();
-    public static final PropertyDescriptor BATCH_NUM_MESSAGES = new 
PropertyDescriptor.Builder()
-            .name("Async Batch Size")
-            .displayName("Batch Size")
-            .description("This configuration controls the default batch size in bytes. The producer will attempt to batch records together into "
-                    + "fewer requests whenever multiple records are being sent 
to the same partition. This helps performance on both the client "
-                    + "and the server.")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .defaultValue("16384") // Kafka default
-            .build();
-    public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new 
PropertyDescriptor.Builder()
-            .name("Queue Buffering Max Time")
-            .description("Maximum time to buffer data before sending to Kafka. 
For example a setting of 100 ms"
-                    + " will try to batch together 100 milliseconds' worth of 
messages to send at once. This will improve"
-                    + " throughput but adds message delivery latency due to 
the buffering.")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor COMPRESSION_CODEC = new 
PropertyDescriptor.Builder()
-            .name("Compression Codec")
-            .description("This parameter allows you to specify the compression 
codec for all"
-                    + " data generated by this producer.")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, 
COMPRESSION_CODEC_SNAPPY)
-            .defaultValue(COMPRESSION_CODEC_NONE.getValue())
-            .build();
-
-    // Relationships
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Any FlowFile that is successfully sent to Kafka will 
be routed to this Relationship")
-            .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("Any FlowFile that cannot be sent to Kafka will be 
routed to this Relationship")
-            .build();
-
-    protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id";
-
-    protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx";
-
-    protected static final String FAILED_TOPIC_ATTR = "failed.topic";
-
-    protected static final String FAILED_KEY_ATTR = "failed.key";
-
-    protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter";
-
-    private static final List<PropertyDescriptor> propertyDescriptors;
-
-    private static final Set<Relationship> relationships;
-
-    static {
-        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
-        _propertyDescriptors.add(SEED_BROKERS);
-        _propertyDescriptors.add(TOPIC);
-        _propertyDescriptors.add(PARTITION_STRATEGY);
-        _propertyDescriptors.add(PARTITION);
-        _propertyDescriptors.add(KEY);
-        _propertyDescriptors.add(DELIVERY_GUARANTEE);
-        _propertyDescriptors.add(MESSAGE_DELIMITER);
-        _propertyDescriptors.add(MAX_BUFFER_SIZE);
-        _propertyDescriptors.add(MAX_RECORD_SIZE);
-        _propertyDescriptors.add(TIMEOUT);
-        _propertyDescriptors.add(BATCH_NUM_MESSAGES);
-        _propertyDescriptors.add(QUEUE_BUFFERING_MAX);
-        _propertyDescriptors.add(COMPRESSION_CODEC);
-        _propertyDescriptors.add(CLIENT_NAME);
-        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
-
-        Set<Relationship> _relationships = new HashSet<>();
-        _relationships.add(REL_SUCCESS);
-        _relationships.add(REL_FAILURE);
-        relationships = Collections.unmodifiableSet(_relationships);
-    }
-
-
-    /**
-     * Will rendezvous with Kafka if the {@link ProcessSession} contains a {@link FlowFile},
-     * producing a result {@link FlowFile}.
-     * <br>
-     * The result {@link FlowFile} that is successful is then transferred to {@link #REL_SUCCESS}.
-     * <br>
-     * The result {@link FlowFile} that failed is then transferred to {@link #REL_FAILURE}.
-     *
-     */
-    @Override
-    protected boolean rendezvousWithKafka(ProcessContext context, 
ProcessSession session) throws ProcessException {
-        boolean processed = false;
-        FlowFile flowFile = session.get();
-        if (flowFile != null) {
-            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
-            if (!this.isFailedFlowFile(flowFile)) {
-                session.getProvenanceReporter().send(flowFile,
-                        context.getProperty(SEED_BROKERS).getValue() + "/"
-                        + 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue());
-                session.transfer(flowFile, REL_SUCCESS);
-            } else {
-                session.transfer(session.penalize(flowFile), REL_FAILURE);
-            }
-            processed = true;
-        }
-        return processed;
-    }
-
-    /**
-     * Will rendezvous with {@link KafkaPublisher} after building
-     * {@link PublishingContext} and will produce the resulting {@link 
FlowFile}.
-     * The resulting FlowFile contains all required information to determine
-     * whether the message publishing that originated from the provided FlowFile
-     * succeeded fully, partially, or failed completely (see
-     * {@link #isFailedFlowFile(FlowFile)}).
-     */
-    private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final 
ProcessContext context, final ProcessSession session) {
-        final AtomicReference<KafkaPublisherResult> publishResultRef = new 
AtomicReference<>();
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(InputStream contentStream) throws IOException {
-                PublishingContext publishingContext = 
PutKafka.this.buildPublishingContext(flowFile, context, contentStream);
-                KafkaPublisherResult result = 
PutKafka.this.kafkaResource.publish(publishingContext);
-                publishResultRef.set(result);
-            }
-        });
-
-        FlowFile resultFile = publishResultRef.get().isAllAcked()
-                ? this.cleanUpFlowFileIfNecessary(flowFile, session)
-                : session.putAllAttributes(flowFile, 
this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(),
 flowFile, context));
-
-        return resultFile;
-    }
-
-    /**
-     * Builds {@link PublishingContext} for message(s) to be sent to Kafka.
-     * {@link PublishingContext} contains all contextual information required 
by
-     * {@link KafkaPublisher} to publish to Kafka. Such information contains
-     * things like topic name, content stream, delimiter, key and last ACKed
-     * message for cases where provided FlowFile is being retried (failed in 
the
-     * past). <br>
-     * For a clean FlowFile (a file that is being sent for the first time), the
-     * PublishingContext will be built from the {@link ProcessContext} associated
-     * with this invocation. <br>
-     * For a failed FlowFile, the {@link PublishingContext} will be built from
-     * attributes of that FlowFile, which by then will already contain the required
-     * information (e.g., topic, key, delimiter etc.). This is required to
-     * ensure the affinity of the retry in the event where the processor
-     * configuration has changed. However, keep in mind that a failed FlowFile is
-     * only considered a failed FlowFile if it is being re-processed by the same
-     * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see
-     * {@link #isFailedFlowFile(FlowFile)}). If a failed FlowFile is being sent to
-     * another PublishKafka processor, it is treated as a fresh FlowFile
-     * regardless of whether it has the FAILED_* attributes set.
-     */
-    private PublishingContext buildPublishingContext(FlowFile flowFile, 
ProcessContext context,
-            InputStream contentStream) {
-        String topicName;
-        byte[] keyBytes;
-        byte[] delimiterBytes = null;
-        int lastAckedMessageIndex = -1;
-        if (this.isFailedFlowFile(flowFile)) {
-            lastAckedMessageIndex = 
Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
-            topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
-            keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null
-                    ? 
flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null;
-            delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != 
null
-                    ? 
flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : 
null;
-
-        } else {
-            topicName = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-            String _key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-            keyBytes = _key == null ? null : 
_key.getBytes(StandardCharsets.UTF_8);
-            delimiterBytes = context.getProperty(MESSAGE_DELIMITER).isSet() ? 
context.getProperty(MESSAGE_DELIMITER)
-                    
.evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8)
 : null;
-        }
-
-        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName, lastAckedMessageIndex);
-        publishingContext.setKeyBytes(keyBytes);
-        publishingContext.setDelimiterBytes(delimiterBytes);
-        publishingContext.setPartitionId(this.determinePartition(context, 
flowFile));
-        return publishingContext;
-    }
-
-    /**
-     * Returns 'true' if provided FlowFile is a failed FlowFile. A failed
-     * FlowFile contains {@link #FAILED_PROC_ID_ATTR}.
-     */
-    private boolean isFailedFlowFile(FlowFile flowFile) {
-        return 
this.getIdentifier().equals(flowFile.getAttribute(FAILED_PROC_ID_ATTR));
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected KafkaPublisher buildKafkaResource(ProcessContext context, 
ProcessSession session)
-            throws ProcessException {
-        KafkaPublisher kafkaPublisher = new 
KafkaPublisher(this.buildKafkaConfigProperties(context), this.getLogger());
-        return kafkaPublisher;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return propertyDescriptors;
-    }
-
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .description("Specifies the value for '" + 
propertyDescriptorName + "' Kafka Configuration.")
-                
.name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
-                .build();
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
-        final List<ValidationResult> results = new ArrayList<>();
-
-        final String partitionStrategy = 
validationContext.getProperty(PARTITION_STRATEGY).getValue();
-        if 
(partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())
-                && !validationContext.getProperty(PARTITION).isSet()) {
-            results.add(new 
ValidationResult.Builder().subject("Partition").valid(false)
-                    .explanation("The <Partition> property must be set when 
configured to use the User-Defined Partitioning Strategy")
-                    .build());
-        }
-        return results;
-    }
-
-    /**
-     * Removes the FAILED_* bookkeeping attributes from a previously failed
-     * FlowFile once it has been published successfully.
-     */
-    private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, 
ProcessSession session) {
-        if (this.isFailedFlowFile(flowFile)) {
-            Set<String> keysToRemove = new HashSet<>();
-            keysToRemove.add(FAILED_DELIMITER_ATTR);
-            keysToRemove.add(FAILED_KEY_ATTR);
-            keysToRemove.add(FAILED_TOPIC_ATTR);
-            keysToRemove.add(FAILED_PROC_ID_ATTR);
-            keysToRemove.add(FAILED_LAST_ACK_IDX);
-            flowFile = session.removeAllAttributes(flowFile, keysToRemove);
-        }
-        return flowFile;
-    }
-
-    /**
-     * Determines the target partition from the 'Partition' property when the
-     * User-Defined partitioning strategy is selected; otherwise returns null.
-     */
-    private Integer determinePartition(ProcessContext context, FlowFile 
flowFile) {
-        String partitionStrategy = 
context.getProperty(PARTITION_STRATEGY).getValue();
-        Integer partitionValue = null;
-        if 
(partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) {
-            String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
-            if (pv != null) {
-                partitionValue = Integer.parseInt(pv);
-            }
-        }
-        return partitionValue;
-    }
-
-    /**
-     * Builds a {@link Map} of FAILED_* attributes
-     *
-     * @see #FAILED_PROC_ID_ATTR
-     * @see #FAILED_LAST_ACK_IDX
-     * @see #FAILED_TOPIC_ATTR
-     * @see #FAILED_KEY_ATTR
-     * @see #FAILED_DELIMITER_ATTR
-     */
-    private Map<String, String> buildFailedFlowFileAttributes(int 
lastAckedMessageIndex, FlowFile sourceFlowFile,
-            ProcessContext context) {
-        Map<String, String> attributes = new HashMap<>();
-        attributes.put(FAILED_PROC_ID_ATTR, this.getIdentifier());
-        attributes.put(FAILED_LAST_ACK_IDX, 
String.valueOf(lastAckedMessageIndex));
-        attributes.put(FAILED_TOPIC_ATTR, 
context.getProperty(TOPIC).evaluateAttributeExpressions(sourceFlowFile).getValue());
-        attributes.put(FAILED_KEY_ATTR, 
context.getProperty(KEY).evaluateAttributeExpressions(sourceFlowFile).getValue());
-        attributes.put(FAILED_DELIMITER_ATTR, 
context.getProperty(MESSAGE_DELIMITER).isSet()
-                ? 
context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(sourceFlowFile).getValue()
-                : null);
-        return attributes;
-    }
-
-    /**
-     * Maps this processor's property values onto Kafka producer configuration
-     * properties, then applies any dynamic properties on top of them.
-     */
-    private Properties buildKafkaConfigProperties(final ProcessContext 
context) {
-        Properties properties = new Properties();
-        String timeout = 
String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-        properties.setProperty("bootstrap.servers", 
context.getProperty(SEED_BROKERS).getValue());
-        properties.setProperty("acks", 
context.getProperty(DELIVERY_GUARANTEE).getValue());
-        properties.setProperty("buffer.memory", 
String.valueOf(context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue()));
-        properties.setProperty("compression.type", 
context.getProperty(COMPRESSION_CODEC).getValue());
-        properties.setProperty("batch.size", 
context.getProperty(BATCH_NUM_MESSAGES).getValue());
-
-        properties.setProperty("client.id", 
context.getProperty(CLIENT_NAME).getValue());
-        Long queueBufferingMillis = 
context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
-        if (queueBufferingMillis != null) {
-            properties.setProperty("linger.ms", 
String.valueOf(queueBufferingMillis));
-        }
-        properties.setProperty("max.request.size", 
String.valueOf(context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).longValue()));
-        properties.setProperty("timeout.ms", timeout);
-        properties.setProperty("metadata.fetch.timeout.ms", timeout);
-
-        String partitionStrategy = 
context.getProperty(PARTITION_STRATEGY).getValue();
-        String partitionerClass = null;
-        if 
(partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) {
-            partitionerClass = 
Partitioners.RoundRobinPartitioner.class.getName();
-        } else if 
(partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) {
-            partitionerClass = Partitioners.RandomPartitioner.class.getName();
-        }
-        if (partitionerClass != null) {
-            properties.setProperty("partitioner.class", partitionerClass);
-        }
-
-        // Set Dynamic Properties
-        for (final Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
-            PropertyDescriptor descriptor = entry.getKey();
-            if (descriptor.isDynamic()) {
-                if (properties.containsKey(descriptor.getName())) {
-                    this.getLogger().warn("Overriding existing property '" + 
descriptor.getName() + "' which had value of '"
-                                    + 
properties.getProperty(descriptor.getName()) + "' with dynamically set value '"
-                                    + entry.getValue() + "'.");
-                }
-                properties.setProperty(descriptor.getName(), entry.getValue());
-            }
-        }
-        return properties;
-    }
-}
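
For reference, the producer settings assembled by the deleted buildKafkaConfigProperties() above map directly onto plain Kafka client configuration. The following is a minimal standalone sketch of that mapping, not the NiFi implementation; the broker address, topic, key, value, and client id are placeholder values:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class PutKafkaConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // <Known Brokers>, placeholder
        props.setProperty("acks", "0");                           // <Delivery Guarantee> = Best Effort
        props.setProperty("buffer.memory", "5242880");            // <Max Buffer Size> = 5 MB
        props.setProperty("compression.type", "none");            // <Compression Codec>
        props.setProperty("batch.size", "16384");                 // <Batch Size>
        props.setProperty("linger.ms", "100");                    // <Queue Buffering Max Time>
        props.setProperty("max.request.size", "1048576");         // <Max Record Size> = 1 MB
        props.setProperty("client.id", "nifi-putkafka-sketch");   // <Client Name>, placeholder

        try (KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            byte[] key = "my-key".getBytes(StandardCharsets.UTF_8);       // <Kafka Key>, placeholder
            byte[] value = "hello kafka".getBytes(StandardCharsets.UTF_8);
            producer.send(new ProducerRecord<>("my-topic", key, value));  // <Topic Name>, placeholder
        }
    }
}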

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
deleted file mode 100644
index b478d9f..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.nifi.processors.kafka.GetKafka
-org.apache.nifi.processors.kafka.PutKafka
\ No newline at end of file
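
This services file is what made GetKafka and PutKafka discoverable: NiFi locates Processor implementations through Java's standard ServiceLoader mechanism, which reads META-INF/services/org.apache.nifi.processor.Processor entries from the classpath. A minimal sketch of that lookup follows; NiFi's real loading additionally goes through per-NAR class loaders, so this only shows the underlying idea:

import java.util.ServiceLoader;

import org.apache.nifi.processor.Processor;

public class ListRegisteredProcessors {
    public static void main(String[] args) {
        // Instantiates every class listed in a META-INF/services/org.apache.nifi.processor.Processor
        // file found on the classpath and prints its name.
        for (Processor processor : ServiceLoader.load(Processor.class)) {
            System.out.println(processor.getClass().getName());
        }
    }
}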

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html
deleted file mode 100644
index 10c7082..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html
+++ /dev/null
@@ -1,45 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-    <!--
-      Licensed to the Apache Software Foundation (ASF) under one or more
-      contributor license agreements.  See the NOTICE file distributed with
-      this work for additional information regarding copyright ownership.
-      The ASF licenses this file to You under the Apache License, Version 2.0
-      (the "License"); you may not use this file except in compliance with
-      the License.  You may obtain a copy of the License at
-          http://www.apache.org/licenses/LICENSE-2.0
-      Unless required by applicable law or agreed to in writing, software
-      distributed under the License is distributed on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-      See the License for the specific language governing permissions and
-      limitations under the License.
-    -->
-    <head>
-        <meta charset="utf-8" />
-        <title>GetKafka</title>
-        <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
-    </head>
-
-    <body>
-        <!-- Processor Documentation 
================================================== -->
-        <h2>Description:</h2>
-        <p>
-            This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
-            for data. When a message is received from Kafka, this Processor 
emits a FlowFile
-            where the content of the FlowFile is the value of the Kafka 
message. If the
-            message has a key associated with it, an attribute named 
<code>kafka.key</code>
-            will be added to the FlowFile, with the value being the UTF-8 
Encoded value
-            of the Message's Key.
-        </p>
-        <p>
-            Kafka supports the notion of a Consumer Group when pulling 
messages in order to
-            provide scalability while still offering a publish-subscribe 
interface. Each
-            Consumer Group must have a unique identifier. The Consumer Group 
identifier that
-            is used by NiFi is the UUID of the Processor. This means that all 
of the nodes
-            within a cluster will use the same Consumer Group Identifier so 
that they do
-            not receive duplicate data but multiple GetKafka Processors can be 
used to pull
-            from multiple Topics, as each Processor will receive a different 
Processor UUID 
-            and therefore a different Consumer Group Identifier.
-        </p>
-    </body>
-</html>
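
To make the Consumer Group behavior described above concrete: the deleted GetKafka built its high-level consumer with the processor's UUID as the group id. The following is a simplified, self-contained sketch under that assumption, using the old kafka.consumer high-level API that GetKafka relied on; the ZooKeeper address is a placeholder and UUID.randomUUID() stands in for the processor's getIdentifier():

import java.util.Properties;
import java.util.UUID;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerGroupSketch {
    public static void main(String[] args) {
        // In GetKafka the group id is the processor's UUID, so every node in a
        // NiFi cluster joins the same Consumer Group for that processor.
        String processorUuid = UUID.randomUUID().toString();

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181"); // placeholder address
        props.setProperty("group.id", processorUuid);
        props.setProperty("auto.commit.enable", "true");

        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        consumer.shutdown();
    }
}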

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
deleted file mode 100644
index d51ce95..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
+++ /dev/null
@@ -1,45 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-    <!--
-      Licensed to the Apache Software Foundation (ASF) under one or more
-      contributor license agreements.  See the NOTICE file distributed with
-      this work for additional information regarding copyright ownership.
-      The ASF licenses this file to You under the Apache License, Version 2.0
-      (the "License"); you may not use this file except in compliance with
-      the License.  You may obtain a copy of the License at
-          http://www.apache.org/licenses/LICENSE-2.0
-      Unless required by applicable law or agreed to in writing, software
-      distributed under the License is distributed on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-      See the License for the specific language governing permissions and
-      limitations under the License.
-    -->
-    <head>
-        <meta charset="utf-8" />
-        <title>PutKafka</title>
-        <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
-    </head>
-
-    <body>
-        <!-- Processor Documentation 
================================================== -->
-        <h2>Description:</h2>
-        <p>
-            This Processor puts the contents of a FlowFile to a Topic in
-            <a href="http://kafka.apache.org/">Apache Kafka</a>. The full contents of
-            a FlowFile become the contents of a single message in Kafka.
-            This message is optionally assigned a key by using the
-            &lt;Kafka Key&gt; Property.
-        </p>
-
-        <p>
-            The Processor allows the user to configure an optional Message 
Delimiter that
-            can be used to send many messages per FlowFile. For example, a \n 
could be used
-            to indicate that the contents of the FlowFile should be used to 
send one message
-            per line of text. If the property is not set, the entire contents 
of the FlowFile
-            will be sent as a single message. When using the delimiter, if 
some messages are
-            successfully sent but other messages fail to send, the FlowFile 
will be FORKed into
-            two child FlowFiles, with the successfully sent messages being 
routed to 'success'
-            and the messages that could not be sent going to 'failure'.
-        </p>
-    </body>
-</html>
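
The delimiter behavior described above amounts to splitting the FlowFile content on the delimiter's UTF-8 bytes and sending each chunk as a separate Kafka message. Below is a simplified, in-memory sketch of just that splitting step; the real KafkaPublisher streams the content and tracks the index of the last acknowledged message so a partial failure can be resumed:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DelimiterSplitSketch {

    /** Splits content on every occurrence of the delimiter byte sequence. */
    static List<byte[]> split(byte[] content, byte[] delimiter) {
        List<byte[]> messages = new ArrayList<>();
        int start = 0;
        for (int i = 0; i <= content.length - delimiter.length; i++) {
            boolean match = true;
            for (int j = 0; j < delimiter.length; j++) {
                if (content[i + j] != delimiter[j]) {
                    match = false;
                    break;
                }
            }
            if (match) {
                messages.add(Arrays.copyOfRange(content, start, i));
                start = i + delimiter.length;
                i = start - 1; // resume scanning just past the delimiter
            }
        }
        messages.add(Arrays.copyOfRange(content, start, content.length));
        return messages;
    }

    public static void main(String[] args) {
        byte[] content = "line one\nline two\nline three".getBytes(StandardCharsets.UTF_8);
        byte[] delimiter = "\n".getBytes(StandardCharsets.UTF_8);
        for (byte[] message : split(content, delimiter)) {
            System.out.println(new String(message, StandardCharsets.UTF_8));
        }
    }
}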
