http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java new file mode 100644 index 0000000..7660305 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; + +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; 
+import kafka.consumer.ConsumerIterator; +import kafka.consumer.ConsumerTimeoutException; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.MessageAndMetadata; + +@SupportsBatching +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@CapabilityDescription("Fetches messages from Apache Kafka") +@Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"}) +@WritesAttributes({ + @WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"), + @WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If" + + " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"), + @WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"), + @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1")}) +@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", + description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + + " In the event a dynamic property represents a property that was already set as part of the static properties, its value will be" + + " overridden with a warning message describing the override." + + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.") +public class GetKafka extends AbstractProcessor { + + public static final String SMALLEST = "smallest"; + public static final String LARGEST = "largest"; + + public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder() + .name("ZooKeeper Connection String") + .description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port>" + + " combinations. For example, host1:2181,host2:2181,host3:2188") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() + .name("Topic Name") + .description("The Kafka Topic to pull messages from") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder() + .name("Zookeeper Commit Frequency") + .description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled.
A longer time period will" + + " result in better overall performance but can result in more data duplication if a NiFi node is lost") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("60 secs") + .build(); + public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new PropertyDescriptor.Builder() + .name("ZooKeeper Communications Timeout") + .description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("30 secs") + .build(); + public static final PropertyDescriptor KAFKA_TIMEOUT = new PropertyDescriptor.Builder() + .name("Kafka Communications Timeout") + .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("30 secs") + .build(); + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("Specifies the maximum number of messages to combine into a single FlowFile. These messages will be " + + "concatenated together with the <Message Demarcator> string placed between the content of each message. " + + "If the messages from Kafka should not be concatenated together, leave this value at 1.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("1") + .build(); + public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() + .name("Message Demarcator") + .description("Specifies the characters to use in order to demarcate multiple messages from Kafka. If the <Batch Size> " + + "property is set to 1, this value is ignored. 
Otherwise, this value will be placed between each pair of subsequent messages in the batch.") + .required(true) + .addValidator(Validator.VALID) // accept anything as a demarcator, including empty string + .expressionLanguageSupported(false) + .defaultValue("\\n") + .build(); + + public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() + .name("Client Name") + .description("Client Name to use when communicating with Kafka") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder() + .name("Group ID") + .description("A Group ID is used to identify consumers that are within the same consumer group") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + public static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() + .name("Auto Offset Reset") + .description("Automatically reset the offset to the smallest or largest offset available on the broker") + .required(true) + .allowableValues(SMALLEST, LARGEST) + .defaultValue(LARGEST) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that are created are routed to this relationship") + .build(); + + private final BlockingQueue<ConsumerIterator<byte[], byte[]>> streamIterators = new LinkedBlockingQueue<>(); + private volatile ConsumerConnector consumer; + + private final AtomicBoolean consumerStreamsReady = new AtomicBoolean(); + + private volatile long deadlockTimeout; + + private volatile ExecutorService executor; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(CLIENT_NAME) + .defaultValue("NiFi-" + getIdentifier()) + .build(); + final PropertyDescriptor groupIdWithDefault = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(GROUP_ID) + .defaultValue(getIdentifier()) + .build(); + + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(ZOOKEEPER_CONNECTION_STRING); + props.add(TOPIC); + props.add(ZOOKEEPER_COMMIT_DELAY); + props.add(BATCH_SIZE); + props.add(MESSAGE_DEMARCATOR); + props.add(clientNameWithDefault); + props.add(groupIdWithDefault); + props.add(KAFKA_TIMEOUT); + props.add(ZOOKEEPER_TIMEOUT); + props.add(AUTO_OFFSET_RESET); + return props; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(1); + relationships.add(REL_SUCCESS); + return relationships; + } + + public void createConsumers(final ProcessContext context) { + final String topic = context.getProperty(TOPIC).getValue(); + + final Properties props = new Properties(); + props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue()); + props.setProperty("group.id", context.getProperty(GROUP_ID).getValue()); + props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue()); + props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS))); + props.setProperty("auto.offset.reset", context.getProperty(AUTO_OFFSET_RESET).getValue()); + props.setProperty("zookeeper.connection.timeout.ms",
context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString()); + props.setProperty("socket.timeout.ms", context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString()); + + for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + PropertyDescriptor descriptor = entry.getKey(); + if (descriptor.isDynamic()) { + if (props.containsKey(descriptor.getName())) { + this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '" + + props.getProperty(descriptor.getName()) + "' with dynamically set value '" + entry.getValue() + "'."); + } + props.setProperty(descriptor.getName(), entry.getValue()); + } + } + + /* + * Unless the user sets it to some explicit value, we set it to the + * lowest possible value of 1 millisecond to ensure that + * consumerStream.hasNext() doesn't block. See + * http://kafka.apache.org/documentation.html#configuration as well as the + * comment in 'catch ConsumerTimeoutException' in onTrigger() for more + * explanation of the reasoning behind it. + */ + if (!props.containsKey("consumer.timeout.ms")) { + this.getLogger().info("Setting 'consumer.timeout.ms' to 1 millisecond to avoid blocking the consumer" + + " when no events are present in the Kafka topic. If you wish to change this value," + + " set it as a dynamic property. If you wish to explicitly enable consumer blocking (at your own risk)," + + " set its value to -1."); + props.setProperty("consumer.timeout.ms", "1"); + } + + int partitionCount = KafkaUtils.retrievePartitionCountForTopic( + context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue()); + + final ConsumerConfig consumerConfig = new ConsumerConfig(props); + consumer = Consumer.createJavaConsumerConnector(consumerConfig); + + final Map<String, Integer> topicCountMap = new HashMap<>(1); + + int concurrentTaskToUse = context.getMaxConcurrentTasks(); + if (context.getMaxConcurrentTasks() < partitionCount){ + this.getLogger().warn("The number of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for " + + "this processor is less than the number of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. " + + "Consider making it equal to the partition count for the most efficient event consumption."); + } else if (context.getMaxConcurrentTasks() > partitionCount){ + concurrentTaskToUse = partitionCount; + this.getLogger().warn("The number of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for " + + "this processor is greater than the number of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. " + + "Therefore those tasks would never see a message.
To avoid this, the partition count ('" + partitionCount + "') will be used to consume events."); + } + + topicCountMap.put(topic, concurrentTaskToUse); + + final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); + final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); + + this.streamIterators.clear(); + + for (final KafkaStream<byte[], byte[]> stream : streams) { + streamIterators.add(stream.iterator()); + } + this.consumerStreamsReady.set(true); + } + + @OnStopped + public void shutdownConsumer() { + this.consumerStreamsReady.set(false); + if (consumer != null) { + try { + consumer.commitOffsets(); + } finally { + consumer.shutdown(); + } + } + if (this.executor != null) { + this.executor.shutdown(); + try { + if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) { + this.executor.shutdownNow(); + getLogger().warn("Executor did not stop in 30 seconds. Terminating."); + } + this.executor = null; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") + .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true) + .build(); + } + + @OnScheduled + public void schedule(ProcessContext context) { + this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + /* + * Will ensure that consumer streams are ready upon the first invocation + * of onTrigger.
Will be reset to 'false' in the event of an exception. + */ + synchronized (this.consumerStreamsReady) { + if (this.executor == null || this.executor.isShutdown()) { + this.executor = Executors.newCachedThreadPool(); + } + if (!this.consumerStreamsReady.get()) { + Future<Void> f = this.executor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + createConsumers(context); + return null; + } + }); + try { + f.get(this.deadlockTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + shutdownConsumer(); + f.cancel(true); + Thread.currentThread().interrupt(); + getLogger().warn("Interrupted while waiting to get connection", e); + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } catch (TimeoutException e) { + shutdownConsumer(); + f.cancel(true); + getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while waiting to get connection", e); + } + } + } + + if (this.consumerStreamsReady.get()) { + Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + ConsumerIterator<byte[], byte[]> iterator = getStreamIterator(); + if (iterator != null) { + consumeFromKafka(context, session, iterator); + } + return null; + } + }); + try { + consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + shutdownConsumer(); + consumptionFuture.cancel(true); + Thread.currentThread().interrupt(); + getLogger().warn("Interrupted while consuming messages", e); + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } catch (TimeoutException e) { + shutdownConsumer(); + consumptionFuture.cancel(true); + getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while consuming messages", e); + } + } + } + + protected ConsumerIterator<byte[], byte[]> getStreamIterator() { + return this.streamIterators.poll(); + } + + + private void consumeFromKafka(final ProcessContext context, final ProcessSession session, + ConsumerIterator<byte[], byte[]> iterator) throws ProcessException { + + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); + final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8); + final String topic = context.getProperty(TOPIC).getValue(); + + FlowFile flowFile = session.create(); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("kafka.topic", topic); + final long start = System.nanoTime(); + int msgCount = 0; + + try { + for (; msgCount < batchSize && iterator.hasNext(); msgCount++) { + final MessageAndMetadata<byte[], byte[]> mam = iterator.next(); + + if (batchSize == 1) { + final byte[] key = mam.key(); + // the kafka.key, kafka.offset, and kafka.partition attributes are added only + // for a batch size of 1.
+ if (key != null) { + attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8)); + } + + attributes.put("kafka.offset", String.valueOf(mam.offset())); + attributes.put("kafka.partition", String.valueOf(mam.partition())); + } + + // add the message to the FlowFile's contents + final boolean firstMessage = msgCount == 0; + flowFile = session.append(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + if (!firstMessage) { + out.write(demarcatorBytes); + } + out.write(mam.message()); + } + }); + } + this.releaseFlowFile(flowFile, session, attributes, start, topic, msgCount); + } catch (ConsumerTimeoutException e) { + /* + * By default Kafka blocks indefinitely on stream.hasNext() if the + * topic is empty. If the 'consumer.timeout.ms' property is set (see + * http://kafka.apache.org/documentation.html#configuration), + * hasNext() will fail with this exception instead. To this processor it + * simply means there are no messages, so the current task should exit + * without failure, releasing the flow file if it was able to + * accumulate any events. + */ + this.releaseFlowFile(flowFile, session, attributes, start, topic, msgCount); + } catch (final Exception e) { + this.shutdownConsumer(); + getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[]{e}); + if (flowFile != null) { + session.remove(flowFile); + } + } finally { + // Add the iterator back to the queue + if (iterator != null) { + streamIterators.offer(iterator); + } + } + } + + /** + * Will release the flow file. Releasing the flow file in the context of this + * operation implies the following: + * + * If the flow file is empty, it is removed from the session. + * If it has content, it is transferred to REL_SUCCESS. + */ + private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<String, String> attributes, long start, String topic, int msgCount){ + if (flowFile.getSize() == 0L) { + session.remove(flowFile); + } else { + flowFile = session.putAllAttributes(flowFile, attributes); + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, "Received " + msgCount + " Kafka messages", millis); + getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[]{flowFile, msgCount, millis}); + session.transfer(flowFile, REL_SUCCESS); + } + } +} \ No newline at end of file
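The batch/demarcator semantics of consumeFromKafka() above are easiest to see in isolation: the demarcator is written only between messages, never before the first one, so a batch of one message is byte-identical to the raw Kafka payload. The following standalone sketch mirrors that append loop under those assumptions; the class and method names are hypothetical and not part of the bundle.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;

    public class DemarcatorSketch {

        // Mirrors the append loop in GetKafka.consumeFromKafka(): write the
        // demarcator only between messages, never before the first one.
        static byte[] concatenate(List<byte[]> messages, byte[] demarcator) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (int i = 0; i < messages.size(); i++) {
                if (i > 0) {
                    out.write(demarcator);
                }
                out.write(messages.get(i));
            }
            return out.toByteArray();
        }

        public static void main(String[] args) throws IOException {
            // "\\n" in the property value is unescaped to a real newline by the processor
            byte[] demarcator = "\n".getBytes(StandardCharsets.UTF_8);
            List<byte[]> batch = Arrays.asList(
                    "first".getBytes(StandardCharsets.UTF_8),
                    "second".getBytes(StandardCharsets.UTF_8));
            System.out.println(new String(concatenate(batch, demarcator), StandardCharsets.UTF_8));
            // prints "first", then "second" on the next line
        }
    }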
http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java new file mode 100644 index 0000000..5bc0e0e --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka; + +import java.io.Closeable; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.stream.io.util.StreamDemarcator; + +import kafka.producer.Partitioner; + +/** + * Wrapper over {@link KafkaProducer} to assist the {@link PutKafka} processor + * with sending the contents of {@link FlowFile}s to Kafka. + */ +class KafkaPublisher implements Closeable { + + private final Producer<byte[], byte[]> kafkaProducer; + + private long ackWaitTime = 30000; + + private final ComponentLog componentLog; + + private final Partitioner partitioner; + + private final int ackCheckSize; + + KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) { + this(kafkaProperties, 100, componentLog); + } + + /** + * Creates an instance of this class as well as the instance of the + * corresponding Kafka {@link KafkaProducer} using provided Kafka + * configuration properties. + * + * @param kafkaProperties + * instance of {@link Properties} used to bootstrap + * {@link KafkaProducer} + */ + KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) { + kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + this.kafkaProducer = new KafkaProducer<>(kafkaProperties); + this.ackCheckSize = ackCheckSize; + try { + if (kafkaProperties.containsKey("partitioner.class")) { + this.partitioner = (Partitioner) Class.forName(kafkaProperties.getProperty("partitioner.class")).newInstance(); + } else { + this.partitioner = null; + } + } catch (Exception e) { + throw new IllegalStateException("Failed to create partitioner", e); + } + this.componentLog = componentLog; + } + + /** + * Publishes messages to a Kafka topic. It uses {@link StreamDemarcator} to + * determine how many messages will be sent to Kafka from a provided + * {@link InputStream} (see {@link PublishingContext#getContentStream()}). + * It supports two publishing modes: + * <ul> + * <li>Sending all messages constructed from the + * {@link StreamDemarcator#nextToken()} operation.</li> + * <li>Sending only unacknowledged messages constructed from the + * {@link StreamDemarcator#nextToken()} operation.</li> + * </ul> + * The unacknowledged messages are determined from the value of + * {@link PublishingContext#getLastAckedMessageIndex()}. + * <br> + * This method assumes content stream affinity where it is expected that the + * content stream that represents the same Kafka message(s) will remain the + * same across possible retries. This is required specifically for cases + * where a delimiter is used and a single content stream may represent + * multiple Kafka messages. The + * {@link PublishingContext#getLastAckedMessageIndex()} will provide the + * index of the last ACKed message, so upon retry only messages with a + * higher index are sent. + * + * @param publishingContext + * instance of {@link PublishingContext} which holds context + * information about the message(s) to be sent. + * @return a {@link KafkaPublisherResult} holding the number of messages sent + * and the index of the last ACKed message. + */ + KafkaPublisherResult publish(PublishingContext publishingContext) { + StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(), + publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize()); + + int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex(); + List<Future<RecordMetadata>> resultFutures = new ArrayList<>(); + + byte[] messageBytes; + int tokenCounter = 0; + boolean continueSending = true; + KafkaPublisherResult result = null; + for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) { + if (prevLastAckedMessageIndex < tokenCounter) { + Integer partitionId = publishingContext.getPartitionId(); + if (partitionId == null && publishingContext.getKeyBytes() != null) { + partitionId = this.getPartition(publishingContext.getKeyBytes(), publishingContext.getTopic()); + } + // use the resolved partitionId so a custom partitioner is honored + ProducerRecord<byte[], byte[]> message = + new ProducerRecord<>(publishingContext.getTopic(), partitionId, publishingContext.getKeyBytes(), messageBytes); + resultFutures.add(this.kafkaProducer.send(message)); + + if (tokenCounter % this.ackCheckSize == 0) { + int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex); + resultFutures.clear(); + if (lastAckedMessageIndex % this.ackCheckSize != 0) { + continueSending = false; + result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex); + } + prevLastAckedMessageIndex = lastAckedMessageIndex; + } + } + } + + if (result == null) { + int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex); + resultFutures.clear(); + result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex); + } + return result; + } + + /** + * Sets the time this publisher will wait for the {@link Future#get()} + * operation (the Future returned by + * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing + * out. + * + * This value will also be used as a timeout when closing the underlying + * {@link KafkaProducer}. See {@link #close()}. + */ + void setAckWaitTime(long ackWaitTime) { + this.ackWaitTime = ackWaitTime; + } + + /** + * This operation will process ACKs from Kafka in the order in which + * {@link KafkaProducer#send(ProducerRecord)} invocations were made, returning + * the index of the last ACKed message. Within this operation, processing an ACK + * simply means successful invocation of the 'get()' operation on the + * {@link Future} returned by the {@link KafkaProducer#send(ProducerRecord)} + * operation. Upon encountering any type of error while interrogating such a + * {@link Future}, the ACK loop will end. Messages that were not ACKed are + * considered non-delivered and therefore may be resent at a later + * time. + * + * @param sendFutures + * list of {@link Future}s representing results of publishing to + * Kafka + * + * @param lastAckMessageIndex + * the index of the last ACKed message. It is important to + * provide the last ACKed message, especially while retrying, so + * the proper index is maintained. + */ + private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) { + boolean exceptionThrown = false; + for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) { + Future<RecordMetadata> future = sendFutures.get(segmentCounter); + try { + future.get(this.ackWaitTime, TimeUnit.MILLISECONDS); + lastAckMessageIndex++; + } catch (InterruptedException e) { + exceptionThrown = true; + Thread.currentThread().interrupt(); + this.warnOrError("Interrupted while waiting for acks from Kafka", null); + } catch (ExecutionException e) { + exceptionThrown = true; + this.warnOrError("Failed while waiting for acks from Kafka", e); + } catch (TimeoutException e) { + exceptionThrown = true; + this.warnOrError("Timed out while waiting for acks from Kafka", null); + } + } + + return lastAckMessageIndex; + } + + /** + * Will close the underlying {@link KafkaProducer}. + */ + @Override + public void close() { + this.kafkaProducer.close(); + } + + /** + * Logs the message as a warning when no exception is provided, or as an + * error (including the exception) otherwise. + */ + private void warnOrError(String message, Exception e) { + if (e == null) { + this.componentLog.warn(message); + } else { + this.componentLog.error(message, e); + } + } + + static class KafkaPublisherResult { + private final int messagesSent; + private final int lastMessageAcked; + KafkaPublisherResult(int messagesSent, int lastMessageAcked) { + this.messagesSent = messagesSent; + this.lastMessageAcked = lastMessageAcked; + } + + public int getMessagesSent() { + return this.messagesSent; + } + + public int getLastMessageAcked() { + return this.lastMessageAcked; + } + + public boolean isAllAcked() { + return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked; + } + + @Override + public String toString() { + return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked; + } + } + + /** + * Determines the target partition for the given key via the configured + * custom {@link Partitioner}, defaulting to partition 0 when none is set. + */ + private int getPartition(Object key, String topicName) { + if (this.partitioner != null) { + int partSize = this.kafkaProducer.partitionsFor(topicName).size(); + return this.partitioner.partition(key, partSize); + } + return 0; + } +}
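The publish/ACK bookkeeping above is what drives PutKafka's partial-failure handling: KafkaPublisherResult reports how many tokens were sent and the index of the last ACKed one, and a subsequent publish() call skips every token at or below that index. A standalone sketch of that contract follows; the class is hypothetical and not part of the bundle, but the predicate matches KafkaPublisherResult.isAllAcked() and the 'prevLastAckedMessageIndex < tokenCounter' guard in publish().

    public class RetryWindowSketch {

        // Same predicate as KafkaPublisherResult.isAllAcked()
        static boolean isAllAcked(int messagesSent, int lastMessageAcked) {
            return lastMessageAcked > -1 && messagesSent - 1 == lastMessageAcked;
        }

        public static void main(String[] args) {
            int messagesSent = 10;
            int lastMessageAcked = 6; // tokens 0..6 were ACKed, 7..9 were not

            System.out.println("all acked: " + isAllAcked(messagesSent, lastMessageAcked)); // false

            // On retry, only the unacknowledged tail of the stream is resent.
            for (int tokenCounter = 0; tokenCounter < messagesSent; tokenCounter++) {
                if (lastMessageAcked < tokenCounter) {
                    System.out.println("resend token " + tokenCounter); // 7, 8, 9
                }
            }
        }
    }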
http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java new file mode 100644 index 0000000..8ddea61 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka; + +import java.util.Collections; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.exception.ZkMarshallingError; +import org.I0Itec.zkclient.serialize.ZkSerializer; + +import kafka.admin.AdminUtils; +import kafka.api.TopicMetadata; +import kafka.utils.ZKStringSerializer; +import scala.collection.JavaConversions; + +/** + * Utility class to support interaction with Kafka internals. + */ +class KafkaUtils { + + /** + * Will retrieve the number of partitions for a given Kafka topic. + */ + static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) { + ZkClient zkClient = null; + + try { + zkClient = new ZkClient(zookeeperConnectionString); + zkClient.setZkSerializer(new ZkSerializer() { + @Override + public byte[] serialize(Object o) throws ZkMarshallingError { + return ZKStringSerializer.serialize(o); + } + + @Override + public Object deserialize(byte[] bytes) throws ZkMarshallingError { + return ZKStringSerializer.deserialize(bytes); + } + }); + scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils + .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient); + if (topicMetadatas != null && topicMetadatas.size() > 0) { + return JavaConversions.asJavaSet(topicMetadatas).iterator().next().partitionsMetadata().size(); + } else { + throw new IllegalStateException("Failed to get metadata for topic " + topicName); + } + } catch (Exception e) { + throw new IllegalStateException("Failed to retrieve partitions for topic " + topicName, e); + } finally { + if (zkClient != null) { + try { + zkClient.close(); + } catch (Exception e2) { + // ignore + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java new file mode 100644 index 0000000..32d3606 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka; + +import java.util.Random; + +import kafka.producer.Partitioner; + +/** + * Collection of implementations of common Kafka {@link Partitioner}s. + */ +public final class Partitioners { + + private Partitioners() { + } + /** + * {@link Partitioner} that implements a 'round-robin' mechanism which evenly + * distributes the load between all available partitions. + */ + public static class RoundRobinPartitioner implements Partitioner { + private volatile int index; + + @Override + public int partition(Object key, int numberOfPartitions) { + return this.next(numberOfPartitions); + } + + private synchronized int next(int numberOfPartitions) { + if (this.index >= numberOfPartitions) { + this.index = 0; + } + return index++; + } + } + + /** + * {@link Partitioner} that implements a 'random' mechanism which randomly + * distributes the load between all available partitions. + */ + public static class RandomPartitioner implements Partitioner { + private final Random random; + + public RandomPartitioner() { + this.random = new Random(); + } + + @Override + public int partition(Object key, int numberOfPartitions) { + return this.random.nextInt(numberOfPartitions); + } + } + + /** + * {@link Partitioner} that implements a 'key hash' mechanism which + * distributes the load between all available partitions based on hashing + * the value of the key. + */ + public static class HashPartitioner implements Partitioner { + + @Override + public int partition(Object key, int numberOfPartitions) { + if (key != null) { + return (key.hashCode() & Integer.MAX_VALUE) % numberOfPartitions; + } + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java new file mode 100644 index 0000000..914ac1a --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +/** + * Holder of context information used by {@link KafkaPublisher} required to + * publish messages to Kafka. + */ +class PublishingContext { + + private final InputStream contentStream; + + private final String topic; + + private final int lastAckedMessageIndex; + + private volatile Integer partitionId; + + /* + * We're using the default value from Kafka. We use it to control the + * message size before it goes to Kafka, thus limiting the possibility of + * late failures in the Kafka client. + */ + private volatile int maxRequestSize = 1048576; // kafka default + + private volatile boolean maxRequestSizeSet; + + private volatile byte[] keyBytes; + + private volatile byte[] delimiterBytes; + + + + PublishingContext(InputStream contentStream, String topic) { + this(contentStream, topic, -1); + } + + PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex) { + this.validateInput(contentStream, topic, lastAckedMessageIndex); + this.contentStream = contentStream; + this.topic = topic; + this.lastAckedMessageIndex = lastAckedMessageIndex; + } + + @Override + public String toString() { + return "topic: '" + this.topic + "'; delimiter: '" + + (this.delimiterBytes == null ? "" : new String(this.delimiterBytes, StandardCharsets.UTF_8)) + "'"; + } + + int getLastAckedMessageIndex() { + return this.lastAckedMessageIndex; + } + + int getMaxRequestSize() { + return this.maxRequestSize; + } + + byte[] getKeyBytes() { + return this.keyBytes; + } + + Integer getPartitionId() { + return partitionId; + } + + public void setPartitionId(Integer partitionId) { + this.partitionId = partitionId; + } + + byte[] getDelimiterBytes() { + return this.delimiterBytes; + } + + InputStream getContentStream() { + return this.contentStream; + } + + String getTopic() { + return this.topic; + } + + void setKeyBytes(byte[] keyBytes) { + if (this.keyBytes == null) { + if (keyBytes != null) { + this.assertBytesValid(keyBytes); + this.keyBytes = keyBytes; + } + } else { + throw new IllegalArgumentException("'keyBytes' can only be set once per instance"); + } + } + + void setDelimiterBytes(byte[] delimiterBytes) { + if (this.delimiterBytes == null) { + if (delimiterBytes != null) { + this.assertBytesValid(delimiterBytes); + this.delimiterBytes = delimiterBytes; + } + } else { + throw new IllegalArgumentException("'delimiterBytes' can only be set once per instance"); + } + } + + void setMaxRequestSize(int maxRequestSize) { + if (!this.maxRequestSizeSet) { + if (maxRequestSize > 0) { + this.maxRequestSize = maxRequestSize; + this.maxRequestSizeSet = true; + } else { + throw new IllegalArgumentException("'maxRequestSize' must be > 0"); + } + } else { + throw new IllegalArgumentException("'maxRequestSize' can only be set once per instance"); + } + } + + private void assertBytesValid(byte[] bytes) { + if (bytes != null) { + if (bytes.length == 0) { + throw new IllegalArgumentException("'bytes' must not be empty"); + } + } + } + + private void validateInput(InputStream contentStream, String topic, int lastAckedMessageIndex) { + if (contentStream == null) { + throw new IllegalArgumentException("'contentStream' must not be null"); + } else if (topic == null || topic.trim().length() == 0) { + throw new IllegalArgumentException("'topic' must not be null or empty"); + } else if (lastAckedMessageIndex < -1) { + throw new IllegalArgumentException("'lastAckedMessageIndex' must be >= -1"); + } + } +}
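PublishingContext enforces a set-once contract on its mutable fields, which keeps retry metadata stable for the content-stream affinity described in KafkaPublisher.publish(). A short sketch of that behavior; since the class is package-private, this hypothetical caller is assumed to live in the same package.

    package org.apache.nifi.processors.kafka;

    import java.io.ByteArrayInputStream;
    import java.nio.charset.StandardCharsets;

    public class PublishingContextSketch {

        public static void main(String[] args) {
            PublishingContext ctx = new PublishingContext(
                    new ByteArrayInputStream("a\nb".getBytes(StandardCharsets.UTF_8)), "my-topic");

            // First assignment is accepted...
            ctx.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));

            // ...any further assignment is rejected.
            try {
                ctx.setDelimiterBytes("|".getBytes(StandardCharsets.UTF_8));
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage()); // 'delimiterBytes' can only be set once per instance
            }
        }
    }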
http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java new file mode 100644 index 0000000..4dc8d18 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. A FlowFile may be sent as a single message, or its content may be " + + "split into multiple messages using a user-specified delimiter, such as a new-line.") +@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", + description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+ + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be" + + " overriden with warning message describing the override." + + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.") +public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { + + private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}"; + + private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; + + public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", + "FlowFile will be routed to" + + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration"); + public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", + "FlowFile will be routed" + + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than" + + " <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes"); + public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", + "FlowFile will be routed to success after" + + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result" + + " in data loss."); + + /** + * AllowableValue for sending messages to Kafka without compression + */ + public static final AllowableValue COMPRESSION_CODEC_NONE = new AllowableValue("none", "None", + "Compression will not be used for any topic."); + + /** + * AllowableValue for sending messages to Kafka with GZIP compression + */ + public static final AllowableValue COMPRESSION_CODEC_GZIP = new AllowableValue("gzip", "GZIP", + "Compress messages using GZIP"); + + /** + * AllowableValue for sending messages to Kafka with Snappy compression + */ + public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", + "Compress messages using Snappy"); + + static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("Round Robin", "Round Robin", + "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, " + + "the next Partition to Partition 2, and so on, wrapping as necessary."); + static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("Random Robin", "Random", + "Messages will be assigned to random partitions."); + static final AllowableValue USER_DEFINED_PARTITIONING = new AllowableValue("User-Defined", "User-Defined", + "The <Partition> property will be used to determine the partition. 
All messages within the same FlowFile will be " + + "assigned to the same partition."); + + public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder() + .name("Known Brokers") + .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") + .required(true) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) + .expressionLanguageSupported(false) + .build(); + public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() + .name("Topic Name") + .description("The Kafka Topic of interest") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder() + .name("Partition Strategy") + .description("Specifies how messages should be partitioned when sent to Kafka") + .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING) + .defaultValue(ROUND_ROBIN_PARTITIONING.getValue()) + .required(true) + .build(); + public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder() + .name("Partition") + .description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages " + + "in the same FlowFile will be sent to the same partition. If a partition is specified but is not valid, " + + "then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .build(); + public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() + .name("Kafka Key") + .description("The Key to use for the Message") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() + .name("Delivery Guarantee") + .description("Specifies the requirement for guaranteeing that a message is sent to Kafka").required(true) + .expressionLanguageSupported(false) + .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) + .defaultValue(DELIVERY_BEST_EFFORT.getValue()) + .build(); + public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() + .name("Message Delimiter") + .description("Specifies the delimiter (interpreted in its UTF-8 byte representation) to use for splitting apart multiple messages within a single FlowFile. " + + "If not specified, the entire content of the FlowFile will be used as a single message. If specified, " + + "the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka " + + "message. 
Note that if messages are delimited and some messages for a given FlowFile are transferred " + "successfully while others are not, the messages will be split into individual FlowFiles, such that those " + "messages that were successfully sent are routed to the 'success' relationship while other messages are " + "sent to the 'failure' relationship.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Max Buffer Size") + .description("The maximum amount of data to buffer in memory before sending to Kafka") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("5 MB") + .build(); + static final PropertyDescriptor MAX_RECORD_SIZE = new PropertyDescriptor.Builder() + .name("Max Record Size") + .description("The maximum size that any individual record can be.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .required(true) + .defaultValue("1 MB") + .build(); + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .name("Communications Timeout") + .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("30 secs").build(); + public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() + .name("Client Name") + .description("Client Name to use when communicating with Kafka") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() + .name("Async Batch Size") + .displayName("Batch Size") + .description("This configuration controls the default batch size in bytes. The producer will attempt to batch records together into " + + "fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client " + + "and the server.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("16384") // Kafka default + .build(); + public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder() + .name("Queue Buffering Max Time") + .description("Maximum time to buffer data before sending to Kafka. For example a setting of 100 ms" + + " will try to batch together 100 milliseconds' worth of messages to send at once.
This will improve" + + " throughput but adds message delivery latency due to the buffering.") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() + .name("Compression Codec") + .description("This parameter allows you to specify the compression codec for all" + + " data generated by this producer.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY) + .defaultValue(COMPRESSION_CODEC_NONE.getValue()) + .build(); + + // Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") + .build(); + + protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id"; + + protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx"; + + protected static final String FAILED_TOPIC_ATTR = "failed.topic"; + + protected static final String FAILED_KEY_ATTR = "failed.key"; + + protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter"; + + private static final List<PropertyDescriptor> propertyDescriptors; + + private static final Set<Relationship> relationships; + + static { + List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(SEED_BROKERS); + _propertyDescriptors.add(TOPIC); + _propertyDescriptors.add(PARTITION_STRATEGY); + _propertyDescriptors.add(PARTITION); + _propertyDescriptors.add(KEY); + _propertyDescriptors.add(DELIVERY_GUARANTEE); + _propertyDescriptors.add(MESSAGE_DELIMITER); + _propertyDescriptors.add(MAX_BUFFER_SIZE); + _propertyDescriptors.add(MAX_RECORD_SIZE); + _propertyDescriptors.add(TIMEOUT); + _propertyDescriptors.add(BATCH_NUM_MESSAGES); + _propertyDescriptors.add(QUEUE_BUFFERING_MAX); + _propertyDescriptors.add(COMPRESSION_CODEC); + _propertyDescriptors.add(CLIENT_NAME); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_relationships); + } + + + /** + * Will rendezvous with Kafka if {@link ProcessSession} contains {@link FlowFile} + * producing a result {@link FlowFile}. 
+
+    /**
+     * Will rendezvous with Kafka if the {@link ProcessSession} contains a {@link FlowFile},
+     * producing a result {@link FlowFile}.
+     * <br>
+     * A result {@link FlowFile} that is successful is then transferred to {@link #REL_SUCCESS}.
+     * <br>
+     * A result {@link FlowFile} that is failed is then transferred to {@link #REL_FAILURE}.
+     */
+    @Override
+    protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException {
+        boolean processed = false;
+        FlowFile flowFile = session.get();
+        if (flowFile != null) {
+            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
+            if (!this.isFailedFlowFile(flowFile)) {
+                session.getProvenanceReporter().send(flowFile,
+                        context.getProperty(SEED_BROKERS).getValue() + "/"
+                                + context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue());
+                session.transfer(flowFile, REL_SUCCESS);
+            } else {
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
+            }
+            processed = true;
+        }
+        return processed;
+    }
+
+    /**
+     * Will rendezvous with {@link KafkaPublisher} after building a
+     * {@link PublishingContext}, producing the resulting {@link FlowFile}.
+     * The resulting FlowFile contains all the information required to determine
+     * whether the message publishing that originated from the provided FlowFile
+     * succeeded fully, succeeded partially, or failed completely (see
+     * {@link #isFailedFlowFile(FlowFile)}).
+     */
+    private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) {
+        final AtomicReference<KafkaPublisherResult> publishResultRef = new AtomicReference<>();
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(InputStream contentStream) throws IOException {
+                PublishingContext publishingContext = PutKafka.this.buildPublishingContext(flowFile, context, contentStream);
+                KafkaPublisherResult result = PutKafka.this.kafkaResource.publish(publishingContext);
+                publishResultRef.set(result);
+            }
+        });
+
+        FlowFile resultFile = publishResultRef.get().isAllAcked()
+                ? this.cleanUpFlowFileIfNecessary(flowFile, session)
+                : session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(), flowFile, context));
+
+        return resultFile;
+    }
+
+    /**
+     * Builds the {@link PublishingContext} for the message(s) to be sent to Kafka.
+     * The {@link PublishingContext} contains all the contextual information required by
+     * {@link KafkaPublisher} to publish to Kafka: the topic name, the content stream,
+     * the delimiter, the key and the last ACKed message index for cases where the
+     * provided FlowFile is being retried (failed in the past). <br>
+     * For a clean FlowFile (one that is being sent for the first time), the
+     * PublishingContext will be built from the {@link ProcessContext} associated
+     * with this invocation. <br>
+     * For a failed FlowFile, the {@link PublishingContext} will be built from the
+     * attributes of that FlowFile, which by then will already contain the required
+     * information (e.g., topic, key, delimiter). This is required to
+     * ensure the affinity of the retry in the event that the processor
+     * configuration has changed. However, keep in mind that a FlowFile is
+     * only considered a failed FlowFile if it is being re-processed by the same
+     * processor (determined via {@link #FAILED_PROC_ID_ATTR}; see
+     * {@link #isFailedFlowFile(FlowFile)}). If a failed FlowFile is sent to
+     * another PutKafka processor, it is treated as a fresh FlowFile
+     * regardless of whether it has the FAILED_* attributes set.
+     */
+    private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context,
+                                                     InputStream contentStream) {
+        String topicName;
+        byte[] keyBytes;
+        byte[] delimiterBytes = null;
+        int lastAckedMessageIndex = -1;
+        if (this.isFailedFlowFile(flowFile)) {
+            lastAckedMessageIndex = Integer.parseInt(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
+            topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
+            keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null
+                    ? flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null;
+            delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null
+                    ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null;
+        } else {
+            topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+            String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+            keyBytes = _key == null ? null : _key.getBytes(StandardCharsets.UTF_8);
+            delimiterBytes = context.getProperty(MESSAGE_DELIMITER).isSet() ? context.getProperty(MESSAGE_DELIMITER)
+                    .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
+        }
+
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setKeyBytes(keyBytes);
+        publishingContext.setDelimiterBytes(delimiterBytes);
+        publishingContext.setPartitionId(this.determinePartition(context, flowFile));
+        return publishingContext;
+    }
+
+    /**
+     * Returns 'true' if the provided FlowFile is a failed FlowFile. A failed
+     * FlowFile carries a {@link #FAILED_PROC_ID_ATTR} matching this processor's identifier.
+     */
+    private boolean isFailedFlowFile(FlowFile flowFile) {
+        return this.getIdentifier().equals(flowFile.getAttribute(FAILED_PROC_ID_ATTR));
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
+            throws ProcessException {
+        return new KafkaPublisher(this.buildKafkaConfigProperties(context), this.getLogger());
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for the '" + propertyDescriptorName + "' Kafka configuration property.")
+                .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
+                .build();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue();
+        if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())
+                && !validationContext.getProperty(PARTITION).isSet()) {
+            results.add(new ValidationResult.Builder().subject("Partition").valid(false)
+                    .explanation("The <Partition> property must be set when configured to use the User-Defined Partitioning Strategy")
+                    .build());
+        }
+        return results;
+    }
+
+    /**
+     * Removes the FAILED_* bookkeeping attributes from a previously failed
+     * FlowFile once it has been fully acknowledged.
+     */
+    private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, ProcessSession session) {
+        if (this.isFailedFlowFile(flowFile)) {
+            Set<String> keysToRemove = new HashSet<>();
+            keysToRemove.add(FAILED_DELIMITER_ATTR);
+            keysToRemove.add(FAILED_KEY_ATTR);
+            keysToRemove.add(FAILED_TOPIC_ATTR);
+            keysToRemove.add(FAILED_PROC_ID_ATTR);
+            keysToRemove.add(FAILED_LAST_ACK_IDX);
+            flowFile = session.removeAllAttributes(flowFile, keysToRemove);
+        }
+        return flowFile;
+    }
+
+    /**
+     * Determines the partition to use when the User-Defined Partitioning
+     * strategy is configured; returns null otherwise, or when no partition
+     * value is provided.
+     */
+    private Integer determinePartition(ProcessContext context, FlowFile flowFile) {
+        String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
+        Integer partitionValue = null;
+        if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) {
+            String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+            if (pv != null) {
+                partitionValue = Integer.parseInt(pv);
+            }
+        }
+        return partitionValue;
+    }
+
+    /**
+     * Builds a {@link Map} of the FAILED_* attributes.
+     *
+     * @see #FAILED_PROC_ID_ATTR
+     * @see #FAILED_LAST_ACK_IDX
+     * @see #FAILED_TOPIC_ATTR
+     * @see #FAILED_KEY_ATTR
+     * @see #FAILED_DELIMITER_ATTR
+     */
+    private Map<String, String> buildFailedFlowFileAttributes(int lastAckedMessageIndex, FlowFile sourceFlowFile,
+                                                              ProcessContext context) {
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(FAILED_PROC_ID_ATTR, this.getIdentifier());
+        attributes.put(FAILED_LAST_ACK_IDX, String.valueOf(lastAckedMessageIndex));
+        attributes.put(FAILED_TOPIC_ATTR, context.getProperty(TOPIC).evaluateAttributeExpressions(sourceFlowFile).getValue());
+        attributes.put(FAILED_KEY_ATTR, context.getProperty(KEY).evaluateAttributeExpressions(sourceFlowFile).getValue());
+        attributes.put(FAILED_DELIMITER_ATTR, context.getProperty(MESSAGE_DELIMITER).isSet()
+                ? context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(sourceFlowFile).getValue()
+                : null);
+        return attributes;
+    }
+
+    /**
+     * Maps the processor's configuration onto Kafka producer properties.
+     */
+    private Properties buildKafkaConfigProperties(final ProcessContext context) {
+        Properties properties = new Properties();
+        String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
+        properties.setProperty("bootstrap.servers", context.getProperty(SEED_BROKERS).getValue());
+        properties.setProperty("acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
+        properties.setProperty("buffer.memory", String.valueOf(context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue()));
+        properties.setProperty("compression.type", context.getProperty(COMPRESSION_CODEC).getValue());
+        properties.setProperty("batch.size", context.getProperty(BATCH_NUM_MESSAGES).getValue());
+
+        properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
+        Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
+        if (queueBufferingMillis != null) {
+            properties.setProperty("linger.ms", String.valueOf(queueBufferingMillis));
+        }
+        properties.setProperty("max.request.size", String.valueOf(context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).longValue()));
+        properties.setProperty("timeout.ms", timeout);
+        properties.setProperty("metadata.fetch.timeout.ms", timeout);
+
+        String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
+        String partitionerClass = null;
+        if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) {
+            partitionerClass = Partitioners.RoundRobinPartitioner.class.getName();
+        } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) {
+            partitionerClass = Partitioners.RandomPartitioner.class.getName();
+        }
+        // Only set partitioner.class when one of the pluggable strategies is selected.
+        // Properties.setProperty(key, null) throws a NullPointerException, which would
+        // occur for the User-Defined strategy since it supplies an explicit partition
+        // id via determinePartition() instead of a partitioner class.
+        if (partitionerClass != null) {
+            properties.setProperty("partitioner.class", partitionerClass);
+        }
+
+        // Set Dynamic Properties
+        for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.isDynamic()) {
+                if (properties.containsKey(descriptor.getName())) {
+                    this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had a value of '"
+                            + properties.getProperty(descriptor.getName()) + "' with dynamically set value '"
+                            + entry.getValue() + "'.");
+                }
+                properties.setProperty(descriptor.getName(), entry.getValue());
+            }
+        }
+        return properties;
+    }
+}
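[Editorial note: Partitioners.RoundRobinPartitioner and Partitioners.RandomPartitioner are not part of this diff, and their exact interface depends on the Kafka client version. As a strategy illustration only, round-robin selection reduces to an incrementing counter modulo the partition count:]

import java.util.concurrent.atomic.AtomicLong;

// Strategy sketch only -- not the actual Partitioners.RoundRobinPartitioner.
// Successive messages cycle through partitions 0..numPartitions-1.
public class RoundRobinSketch {
    private final AtomicLong counter = new AtomicLong();

    public int nextPartition(int numPartitions) {
        // floorMod keeps the result non-negative even if the counter ever wraps.
        return (int) Math.floorMod(counter.getAndIncrement(), (long) numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        for (int i = 0; i < 6; i++) {
            System.out.print(rr.nextPartition(3) + " "); // 0 1 2 0 1 2
        }
    }
}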
http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..b478d9f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.kafka.GetKafka
+org.apache.nifi.processors.kafka.PutKafka
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html
new file mode 100644
index 0000000..10c7082
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html
@@ -0,0 +1,45 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements. See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License. You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>GetKafka</title>
+        <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <!-- Processor Documentation ================================================== -->
+        <h2>Description:</h2>
+        <p>
+            This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
+            for data. When a message is received from Kafka, this Processor emits a FlowFile
+            whose content is the value of the Kafka message. If the message has a key
+            associated with it, an attribute named <code>kafka.key</code> will be added
+            to the FlowFile, with its value being the UTF-8 encoded value of the
+            message's key.
+        </p>
+        <p>
+            Kafka supports the notion of a Consumer Group when pulling messages in order to
+            provide scalability while still offering a publish-subscribe interface. Each
+            Consumer Group must have a unique identifier; the identifier used by NiFi is
+            the UUID of the Processor. This means that all of the nodes within a cluster
+            use the same Consumer Group identifier and therefore do not receive duplicate
+            data. Multiple GetKafka Processors can still be used to pull from multiple
+            Topics, as each Processor receives a different Processor UUID and therefore
+            a different Consumer Group identifier.
+        </p>
+    </body>
+</html>
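[Editorial note: to make the Consumer Group behavior described above concrete, here is a minimal sketch using the Kafka 0.8 high-level consumer API imported elsewhere in this bundle; the ZooKeeper hosts and the UUID are placeholders:]

import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

// Sketch: all consumers sharing one group.id divide the topic's partitions
// among themselves rather than each receiving every message. GetKafka supplies
// the processor's UUID as the group id.
public class ConsumerGroupSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "host1:2181,host2:2181");              // placeholder hosts
        props.setProperty("group.id", "2b0a0e60-0000-1000-8000-000000000000");        // placeholder processor UUID
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // ... createMessageStreams(...) for the topic, iterate the streams, then shut down ...
        connector.shutdown();
    }
}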
http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
new file mode 100644
index 0000000..d51ce95
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
@@ -0,0 +1,45 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements. See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License. You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>PutKafka</title>
+        <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <!-- Processor Documentation ================================================== -->
+        <h2>Description:</h2>
+        <p>
+            This Processor puts the contents of a FlowFile to a Topic in
+            <a href="http://kafka.apache.org/">Apache Kafka</a>. The full contents of
+            a FlowFile become the contents of a single message in Kafka.
+            This message is optionally assigned a key by using the
+            &lt;Kafka Key&gt; Property.
+        </p>
+
+        <p>
+            The Processor allows the user to configure an optional Message Delimiter that
+            can be used to send many messages per FlowFile. For example, a \n could be used
+            to indicate that the contents of the FlowFile should be sent as one message
+            per line of text. If the property is not set, the entire contents of the FlowFile
+            will be sent as a single message. When using the delimiter, if some messages are
+            successfully sent but other messages fail to send, the FlowFile will be forked into
+            two child FlowFiles, with the successfully sent messages being routed to 'success'
+            and the messages that could not be sent going to 'failure'.
+        </p>
+    </body>
+</html>
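[Editorial note: a worked example of the delimiter and fork behavior described in the PutKafka notes above. The payload, delimiter, and acked index are hypothetical; the FlowFile is approximated by its raw content:]

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// With a Message Delimiter of "\n", one FlowFile payload becomes one Kafka
// message per line. lastAcked models a partial failure: messages 0..1 were
// acknowledged, so on fork they route to 'success' and the rest to 'failure'.
public class DelimiterForkExample {
    public static void main(String[] args) {
        byte[] payload = "alpha\nbravo\ncharlie\ndelta".getBytes(StandardCharsets.UTF_8);
        List<String> messages = Arrays.asList(new String(payload, StandardCharsets.UTF_8).split("\n"));

        int lastAcked = 1; // hypothetical: broker acknowledged the first two messages
        List<String> success = messages.subList(0, lastAcked + 1);
        List<String> failure = messages.subList(lastAcked + 1, messages.size());

        System.out.println("to 'success': " + success); // [alpha, bravo]
        System.out.println("to 'failure': " + failure); // [charlie, delta]
    }
}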