http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java deleted file mode 100644 index 7660305..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java +++ /dev/null @@ -1,484 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.nifi.annotation.behavior.DynamicProperty; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.Validator; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; - -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import 
kafka.consumer.ConsumerIterator; -import kafka.consumer.ConsumerTimeoutException; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; - -@SupportsBatching -@InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Fetches messages from Apache Kafka") -@Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"}) -@WritesAttributes({ - @WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"), - @WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If" - + " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"), - @WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"), - @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1")}) -@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", - description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." - + " In the event a dynamic property represents a property that was already set as part of the static properties, its value will be" - + " overridden with a warning message describing the override." - + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.") -public class GetKafka extends AbstractProcessor { - - public static final String SMALLEST = "smallest"; - public static final String LARGEST = "largest"; - - public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder() - .name("ZooKeeper Connection String") - .description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port>" - + " combinations. For example, host1:2181,host2:2181,host3:2188") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); - public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() - .name("Topic Name") - .description("The Kafka Topic to pull messages from") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); - public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder() - .name("Zookeeper Commit Frequency") - .description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. 
A longer time period will" - + " result in better overall performance but can result in more data duplication if a NiFi node is lost") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("60 secs") - .build(); - public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new PropertyDescriptor.Builder() - .name("ZooKeeper Communications Timeout") - .description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("30 secs") - .build(); - public static final PropertyDescriptor KAFKA_TIMEOUT = new PropertyDescriptor.Builder() - .name("Kafka Communications Timeout") - .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("30 secs") - .build(); - public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Batch Size") - .description("Specifies the maximum number of messages to combine into a single FlowFile. These messages will be " - + "concatenated together with the <Message Demarcator> string placed between the content of each message. " - + "If the messages from Kafka should not be concatenated together, leave this value at 1.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("1") - .build(); - public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() - .name("Message Demarcator") - .description("Specifies the characters to use in order to demarcate multiple messages from Kafka. If the <Batch Size> " - + "property is set to 1, this value is ignored. 
Otherwise, this value will be placed " - + "between each pair of consecutive messages in the batch.") - .required(true) - .addValidator(Validator.VALID) // accept anything as a demarcator, including empty string - .expressionLanguageSupported(false) - .defaultValue("\\n") - .build(); - - public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() - .name("Client Name") - .description("Client Name to use when communicating with Kafka") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); - public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder() - .name("Group ID") - .description("A Group ID is used to identify consumers that are within the same consumer group") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); - - public static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() - .name("Auto Offset Reset") - .description("Automatically reset the offset to the smallest or largest offset available on the broker") - .required(true) - .allowableValues(SMALLEST, LARGEST) - .defaultValue(LARGEST) - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All FlowFiles that are created are routed to this relationship") - .build(); - - private final BlockingQueue<ConsumerIterator<byte[], byte[]>> streamIterators = new LinkedBlockingQueue<>(); - private volatile ConsumerConnector consumer; - - private final AtomicBoolean consumerStreamsReady = new AtomicBoolean(); - - private volatile long deadlockTimeout; - - private volatile ExecutorService executor; - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(CLIENT_NAME) - .defaultValue("NiFi-" + getIdentifier()) - .build(); - final PropertyDescriptor groupIdWithDefault = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(GROUP_ID) - .defaultValue(getIdentifier()) - .build(); - - final List<PropertyDescriptor> props = new ArrayList<>(); - props.add(ZOOKEEPER_CONNECTION_STRING); - props.add(TOPIC); - props.add(ZOOKEEPER_COMMIT_DELAY); - props.add(BATCH_SIZE); - props.add(MESSAGE_DEMARCATOR); - props.add(clientNameWithDefault); - props.add(groupIdWithDefault); - props.add(KAFKA_TIMEOUT); - props.add(ZOOKEEPER_TIMEOUT); - props.add(AUTO_OFFSET_RESET); - return props; - } - - @Override - public Set<Relationship> getRelationships() { - final Set<Relationship> relationships = new HashSet<>(1); - relationships.add(REL_SUCCESS); - return relationships; - } - - public void createConsumers(final ProcessContext context) { - final String topic = context.getProperty(TOPIC).getValue(); - - final Properties props = new Properties(); - props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue()); - props.setProperty("group.id", context.getProperty(GROUP_ID).getValue()); - props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue()); - props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS))); - props.setProperty("auto.offset.reset", context.getProperty(AUTO_OFFSET_RESET).getValue()); - props.setProperty("zookeeper.connection.timeout.ms", 
context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString()); - props.setProperty("socket.timeout.ms", context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString()); - - for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { - PropertyDescriptor descriptor = entry.getKey(); - if (descriptor.isDynamic()) { - if (props.containsKey(descriptor.getName())) { - this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '" - + props.getProperty(descriptor.getName()) + "' with dynamically set value '" + entry.getValue() + "'."); - } - props.setProperty(descriptor.getName(), entry.getValue()); - } - } - - /* - * Unless the user sets it to some explicit value, we set it to the - * lowest possible value of 1 millisecond to ensure that - * consumerStream.hasNext() doesn't block. See - * http://kafka.apache.org/documentation.html#configuration as well as the - * comment in 'catch ConsumerTimeoutException' in onTrigger() for more - * explanation of the reasoning behind it. - */ - if (!props.containsKey("consumer.timeout.ms")) { - this.getLogger().info("Setting 'consumer.timeout.ms' to 1 millisecond to avoid the consumer" - + " blocking when no events are present in the Kafka topic. If you wish to change this value, " - + " set it as a dynamic property. If you wish to explicitly enable consumer blocking (at your own risk)" - + " set its value to -1."); - props.setProperty("consumer.timeout.ms", "1"); - } - - int partitionCount = KafkaUtils.retrievePartitionCountForTopic( - context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue()); - - final ConsumerConfig consumerConfig = new ConsumerConfig(props); - consumer = Consumer.createJavaConsumerConnector(consumerConfig); - - final Map<String, Integer> topicCountMap = new HashMap<>(1); - - int concurrentTaskToUse = context.getMaxConcurrentTasks(); - if (context.getMaxConcurrentTasks() < partitionCount){ - this.getLogger().warn("The number of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for " - + "this processor is less than the number of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. " - + "Consider making it equal to the partition count for the most efficient event consumption."); - } else if (context.getMaxConcurrentTasks() > partitionCount){ - concurrentTaskToUse = partitionCount; - this.getLogger().warn("The number of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for " - + "this processor is greater than the number of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. " - + "Therefore those tasks would never see a message. 
To avoid that, the partition count ('" + partitionCount + "') will be used to consume events."); - } - - topicCountMap.put(topic, concurrentTaskToUse); - - final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); - final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); - - this.streamIterators.clear(); - - for (final KafkaStream<byte[], byte[]> stream : streams) { - streamIterators.add(stream.iterator()); - } - this.consumerStreamsReady.set(true); - } - - @OnStopped - public void shutdownConsumer() { - this.consumerStreamsReady.set(false); - if (consumer != null) { - try { - consumer.commitOffsets(); - } finally { - consumer.shutdown(); - } - } - if (this.executor != null) { - this.executor.shutdown(); - try { - if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) { - this.executor.shutdownNow(); - getLogger().warn("Executor did not stop in 30 sec. Terminated."); - } - this.executor = null; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") - .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true) - .build(); - } - - @OnScheduled - public void schedule(ProcessContext context) { - this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - /* - * Will ensure that consumer streams are ready upon the first invocation - * of onTrigger. 
Will be reset to 'false' in the event of an exception - */ - synchronized (this.consumerStreamsReady) { - if (this.executor == null || this.executor.isShutdown()) { - this.executor = Executors.newCachedThreadPool(); - } - if (!this.consumerStreamsReady.get()) { - Future<Void> f = this.executor.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - createConsumers(context); - return null; - } - }); - try { - f.get(this.deadlockTimeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - shutdownConsumer(); - f.cancel(true); - Thread.currentThread().interrupt(); - getLogger().warn("Interrupted while waiting to get connection", e); - } catch (ExecutionException e) { - throw new IllegalStateException(e); - } catch (TimeoutException e) { - shutdownConsumer(); - f.cancel(true); - getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while waiting to get connection", e); - } - } - } - //=== - if (this.consumerStreamsReady.get()) { - Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - ConsumerIterator<byte[], byte[]> iterator = getStreamIterator(); - if (iterator != null) { - consumeFromKafka(context, session, iterator); - } - return null; - } - }); - try { - consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - shutdownConsumer(); - consumptionFuture.cancel(true); - Thread.currentThread().interrupt(); - getLogger().warn("Interrupted while consuming messages", e); - } catch (ExecutionException e) { - throw new IllegalStateException(e); - } catch (TimeoutException e) { - shutdownConsumer(); - consumptionFuture.cancel(true); - getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while consuming messages", e); - } - } - } - - protected ConsumerIterator<byte[], byte[]> getStreamIterator() { - return this.streamIterators.poll(); - } - - - private void consumeFromKafka(final ProcessContext context, final ProcessSession session, - ConsumerIterator<byte[], byte[]> iterator) throws ProcessException { - - final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); - final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); - final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8); - final String topic = context.getProperty(TOPIC).getValue(); - - FlowFile flowFile = session.create(); - - final Map<String, String> attributes = new HashMap<>(); - attributes.put("kafka.topic", topic); - final long start = System.nanoTime(); - int msgCount = 0; - - try { - for (; msgCount < batchSize && iterator.hasNext(); msgCount++) { - final MessageAndMetadata<byte[], byte[]> mam = iterator.next(); - - if (batchSize == 1) { - final byte[] key = mam.key(); - // the kafka.key, kafka.offset, and kafka.partition attributes are added only - // for a batch size of 1. 
- if (key != null) { - attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8)); - } - - attributes.put("kafka.offset", String.valueOf(mam.offset())); - attributes.put("kafka.partition", String.valueOf(mam.partition())); - } - - // add the message to the FlowFile's contents - final boolean firstMessage = msgCount == 0; - flowFile = session.append(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - if (!firstMessage) { - out.write(demarcatorBytes); - } - out.write(mam.message()); - } - }); - } - this.releaseFlowFile(flowFile, session, attributes, start, topic, msgCount); - } catch (ConsumerTimeoutException e) { - /* - * By default Kafka blocks indefinitely via stream.hasNext() if the - * topic is empty. If the 'consumer.timeout.ms' property is set (see - * http://kafka.apache.org/documentation.html#configuration) the - * hasNext() will fail with this exception. To this processor it - * simply means there are no messages and the current task should exit - * without failure, releasing the flow file if it was able to - * accumulate any events. - */ - this.releaseFlowFile(flowFile, session, attributes, start, topic, msgCount); - } catch (final Exception e) { - this.shutdownConsumer(); - getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[]{e}); - if (flowFile != null) { - session.remove(flowFile); - } - } finally { - // Add the iterator back to the queue - if (iterator != null) { - streamIterators.offer(iterator); - } - } - } - - /** - * Will release the flow file. Releasing the flow file in the context of this - * operation implies the following: - * - * If empty then remove it from the session and return - * If it has content then transfer it to REL_SUCCESS - */ - private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<String, String> attributes, long start, String topic, int msgCount){ - if (flowFile.getSize() == 0L) { - session.remove(flowFile); - } else { - flowFile = session.putAllAttributes(flowFile, attributes); - final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, "Received " + msgCount + " Kafka messages", millis); - getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[]{flowFile, msgCount, millis}); - session.transfer(flowFile, REL_SUCCESS); - } - } -} \ No newline at end of file
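For readers tracing the GetKafka logic above: consumeFromKafka() unescapes the literal sequences \n, \r, and \t in the Message Demarcator property and writes the resulting bytes only between consecutive messages of a batch. Below is a minimal standalone sketch of that behavior; the class and variable names are illustrative, not part of NiFi.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;

    public class DemarcatorBatchSketch {
        public static void main(String[] args) throws IOException {
            // The property value arrives as the two characters '\' and 'n';
            // consumeFromKafka() turns it into a real newline before use.
            String demarcator = "\\n".replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
            byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8);

            List<byte[]> messages = Arrays.asList(
                    "first".getBytes(StandardCharsets.UTF_8),
                    "second".getBytes(StandardCharsets.UTF_8),
                    "third".getBytes(StandardCharsets.UTF_8));

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (int msgCount = 0; msgCount < messages.size(); msgCount++) {
                if (msgCount != 0) {
                    // The demarcator goes only *between* messages, never before the first one.
                    out.write(demarcatorBytes);
                }
                out.write(messages.get(msgCount));
            }
            // Prints "first", "second", "third" on separate lines.
            System.out.println(new String(out.toByteArray(), StandardCharsets.UTF_8));
        }
    }

With a Batch Size of 1 the loop runs once and the demarcator is never written, which is why that property documents itself as ignored in that case.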
http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java deleted file mode 100644 index 5bc0e0e..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka; - -import java.io.Closeable; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.stream.io.util.StreamDemarcator; - -import kafka.producer.Partitioner; - -/** - * Wrapper over {@link KafkaProducer} to assist the {@link PublishKafka} processor - * with sending the contents of {@link FlowFile}s to Kafka. - */ -class KafkaPublisher implements Closeable { - - private final Producer<byte[], byte[]> kafkaProducer; - - private long ackWaitTime = 30000; - - private final ComponentLog componentLog; - - private final Partitioner partitioner; - - private final int ackCheckSize; - - KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) { - this(kafkaProperties, 100, componentLog); - } - - /** - * Creates an instance of this class as well as an instance of the - * corresponding Kafka {@link KafkaProducer}, using the provided Kafka - * configuration properties. 
- * - * @param kafkaProperties - * instance of {@link Properties} used to bootstrap - * {@link KafkaProducer} - */ - KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) { - kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - this.kafkaProducer = new KafkaProducer<>(kafkaProperties); - this.ackCheckSize = ackCheckSize; - try { - if (kafkaProperties.containsKey("partitioner.class")) { - this.partitioner = (Partitioner) Class.forName(kafkaProperties.getProperty("partitioner.class")).newInstance(); - } else { - this.partitioner = null; - } - } catch (Exception e) { - throw new IllegalStateException("Failed to create partitioner", e); - } - this.componentLog = componentLog; - } - - /** - * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to - * determine how many messages to Kafka will be sent from a provided - * {@link InputStream} (see {@link PublishingContext#getContentStream()}). - * It supports two publishing modes: - * <ul> - * <li>Sending all messages constructed from - * {@link StreamDemarcator#nextToken()} operation.</li> - * <li>Sending only unacknowledged messages constructed from - * {@link StreamDemarcator#nextToken()} operation.</li> - * </ul> - * The unacknowledged messages are determined from the value of - * {@link PublishingContext#getLastAckedMessageIndex()}. - * <br> - * This method assumes content stream affinity where it is expected that the - * content stream that represents the same Kafka message(s) will remain the - * same across possible retries. This is required specifically for cases - * where delimiter is used and a single content stream may represent - * multiple Kafka messages. The - * {@link PublishingContext#getLastAckedMessageIndex()} will provide the - * index of the last ACKed message, so upon retry only messages with the - * higher index are sent. - * - * @param publishingContext - * instance of {@link PublishingContext} which hold context - * information about the message(s) to be sent. - * @return The index of the last successful offset. 
*/ - KafkaPublisherResult publish(PublishingContext publishingContext) { - StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(), - publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize()); - - int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex(); - List<Future<RecordMetadata>> resultFutures = new ArrayList<>(); - - byte[] messageBytes; - int tokenCounter = 0; - boolean continueSending = true; - KafkaPublisherResult result = null; - for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) { - if (prevLastAckedMessageIndex < tokenCounter) { - Integer partitionId = publishingContext.getPartitionId(); - if (partitionId == null && publishingContext.getKeyBytes() != null) { - partitionId = this.getPartition(publishingContext.getKeyBytes(), publishingContext.getTopic()); - } - ProducerRecord<byte[], byte[]> message = - new ProducerRecord<>(publishingContext.getTopic(), partitionId, publishingContext.getKeyBytes(), messageBytes); - resultFutures.add(this.kafkaProducer.send(message)); - - if (tokenCounter % this.ackCheckSize == 0) { - int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex); - resultFutures.clear(); - if (lastAckedMessageIndex % this.ackCheckSize != 0) { - continueSending = false; - result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex); - } - prevLastAckedMessageIndex = lastAckedMessageIndex; - } - } - } - - if (result == null) { - int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex); - resultFutures.clear(); - result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex); - } - return result; - } - - /** - * Sets the time this publisher will wait for the {@link Future#get()} - * operation (the Future returned by - * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing - * out. - * - * This value will also be used as a timeout when closing the underlying - * {@link KafkaProducer}. See {@link #close()}. - */ - void setAckWaitTime(long ackWaitTime) { - this.ackWaitTime = ackWaitTime; - } - - /** - * This operation will process ACKs from Kafka in the order in which - * {@link KafkaProducer#send(ProducerRecord)} invocations were made, returning - * the index of the last ACKed message. Within this operation, processing an ACK - * simply means a successful invocation of the 'get()' operation on the - * {@link Future} returned by the {@link KafkaProducer#send(ProducerRecord)} - * operation. Upon encountering any type of error while interrogating such a - * {@link Future}, the ACK loop will end. Messages that were not ACKed would - * be considered non-delivered and therefore could be resent at a later - * time. - * - * @param sendFutures - * list of {@link Future}s representing results of publishing to - * Kafka - * - * @param lastAckMessageIndex - * the index of the last ACKed message. It is important to - * provide the last ACKed message especially while retrying so - * the proper index is maintained. 
*/ - private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) { - boolean exceptionThrown = false; - for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) { - Future<RecordMetadata> future = sendFutures.get(segmentCounter); - try { - future.get(this.ackWaitTime, TimeUnit.MILLISECONDS); - lastAckMessageIndex++; - } catch (InterruptedException e) { - exceptionThrown = true; - Thread.currentThread().interrupt(); - this.warnOrError("Interrupted while waiting for acks from Kafka", null); - } catch (ExecutionException e) { - exceptionThrown = true; - this.warnOrError("Failed while waiting for acks from Kafka", e); - } catch (TimeoutException e) { - exceptionThrown = true; - this.warnOrError("Timed out while waiting for acks from Kafka", null); - } - } - - return lastAckMessageIndex; - } - - /** - * Will close the underlying {@link KafkaProducer} - */ - @Override - public void close() { - this.kafkaProducer.close(); - } - - /** - * - */ - private void warnOrError(String message, Exception e) { - if (e == null) { - this.componentLog.warn(message); - } else { - this.componentLog.error(message, e); - } - } - - static class KafkaPublisherResult { - private final int messagesSent; - private final int lastMessageAcked; - KafkaPublisherResult(int messagesSent, int lastMessageAcked) { - this.messagesSent = messagesSent; - this.lastMessageAcked = lastMessageAcked; - } - - public int getMessagesSent() { - return this.messagesSent; - } - - public int getLastMessageAcked() { - return this.lastMessageAcked; - } - - public boolean isAllAcked() { - return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked; - } - - @Override - public String toString() { - return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked; - } - } - - /** - * - */ - private int getPartition(Object key, String topicName) { - if (this.partitioner != null) { - int partSize = this.kafkaProducer.partitionsFor(topicName).size(); - return this.partitioner.partition(key, partSize); - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java deleted file mode 100644 index 8ddea61..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java +++ /dev/null @@ -1,74 +0,0 @@ -package org.apache.nifi.processors.kafka; - -import java.util.Collections; - -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.exception.ZkMarshallingError; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import org.I0Itec.zkclient.serialize.ZkSerializer; - -import kafka.admin.AdminUtils; -import kafka.api.TopicMetadata; -import kafka.utils.ZKStringSerializer; -import scala.collection.JavaConversions; - -/** - * Utility class to support interaction with Kafka internals. - * - */ -class KafkaUtils { - - - /** - * Will retrieve the number of partitions for a given Kafka topic. - */ - static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) { - ZkClient zkClient = null; - - try { - zkClient = new ZkClient(zookeeperConnectionString); - zkClient.setZkSerializer(new ZkSerializer() { - @Override - public byte[] serialize(Object o) throws ZkMarshallingError { - return ZKStringSerializer.serialize(o); - } - - @Override - public Object deserialize(byte[] bytes) throws ZkMarshallingError { - return ZKStringSerializer.deserialize(bytes); - } - }); - scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils - .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient); - if (topicMetadatas != null && topicMetadatas.size() > 0) { - return JavaConversions.asJavaSet(topicMetadatas).iterator().next().partitionsMetadata().size(); - } else { - throw new IllegalStateException("Failed to get metadata for topic " + topicName); - } - } catch (Exception e) { - throw new IllegalStateException("Failed to retrieve partitions for topic " + topicName, e); - } finally { - try { - if (zkClient != null) { - zkClient.close(); - } - } catch (Exception e2) { - // ignore - } - } - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java deleted file mode 100644 index 32d3606..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/ -package org.apache.nifi.processors.kafka; - -import java.util.Random; - -import kafka.producer.Partitioner; - -/** - * Collection of implementations of common Kafka {@link Partitioner}s. - */ -public final class Partitioners { - - private Partitioners() { - } - /** - * {@link Partitioner} that implements a 'round-robin' mechanism which evenly - * distributes load between all available partitions. - */ - public static class RoundRobinPartitioner implements Partitioner { - private volatile int index; - - @Override - public int partition(Object key, int numberOfPartitions) { - int partitionIndex = this.next(numberOfPartitions); - return partitionIndex; - } - - private synchronized int next(int numberOfPartitions) { - if (this.index >= numberOfPartitions) { - this.index = 0; - } - return index++; - } - } - - /** - * {@link Partitioner} that implements a 'random' mechanism which randomly - * distributes the load between all available partitions. - */ - public static class RandomPartitioner implements Partitioner { - private final Random random; - - public RandomPartitioner() { - this.random = new Random(); - } - - @Override - public int partition(Object key, int numberOfPartitions) { - return this.random.nextInt(numberOfPartitions); - } - } - - /** - * {@link Partitioner} that implements a 'key hash' mechanism which - * distributes the load between all available partitions based on hashing - * the value of the key. - */ - public static class HashPartitioner implements Partitioner { - - @Override - public int partition(Object key, int numberOfPartitions) { - if (key != null) { - return (key.hashCode() & Integer.MAX_VALUE) % numberOfPartitions; - } - return 0; - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java deleted file mode 100644 index 914ac1a..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka; - -import java.io.InputStream; -import java.nio.charset.StandardCharsets; - -/** - * Holder of context information used by {@link KafkaPublisher} required to - * publish messages to Kafka. 
*/ -class PublishingContext { - - private final InputStream contentStream; - - private final String topic; - - private final int lastAckedMessageIndex; - - private volatile Integer partitionId; - - /* - * We're using the default value from Kafka. We use it to control the - * message size before it goes to Kafka, thus limiting the possibility of - * late failures in the Kafka client. - */ - private volatile int maxRequestSize = 1048576; // kafka default - - private volatile boolean maxRequestSizeSet; - - private volatile byte[] keyBytes; - - private volatile byte[] delimiterBytes; - - - - PublishingContext(InputStream contentStream, String topic) { - this(contentStream, topic, -1); - } - - PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex) { - this.validateInput(contentStream, topic, lastAckedMessageIndex); - this.contentStream = contentStream; - this.topic = topic; - this.lastAckedMessageIndex = lastAckedMessageIndex; - } - - @Override - public String toString() { - return "topic: '" + this.topic + "'; delimiter: '" + new String(this.delimiterBytes, StandardCharsets.UTF_8) + "'"; - } - - int getLastAckedMessageIndex() { - return this.lastAckedMessageIndex; - } - - int getMaxRequestSize() { - return this.maxRequestSize; - } - - byte[] getKeyBytes() { - return this.keyBytes; - } - - Integer getPartitionId() { - return partitionId; - } - - public void setPartitionId(Integer partitionId) { - this.partitionId = partitionId; - } - - byte[] getDelimiterBytes() { - return this.delimiterBytes; - } - - InputStream getContentStream() { - return this.contentStream; - } - - String getTopic() { - return this.topic; - } - - void setKeyBytes(byte[] keyBytes) { - if (this.keyBytes == null) { - if (keyBytes != null) { - this.assertBytesValid(keyBytes); - this.keyBytes = keyBytes; - } - } else { - throw new IllegalArgumentException("'keyBytes' can only be set once per instance"); - } - } - - void setDelimiterBytes(byte[] delimiterBytes) { - if (this.delimiterBytes == null) { - if (delimiterBytes != null) { - this.assertBytesValid(delimiterBytes); - this.delimiterBytes = delimiterBytes; - } - } else { - throw new IllegalArgumentException("'delimiterBytes' can only be set once per instance"); - } - } - - void setMaxRequestSize(int maxRequestSize) { - if (!this.maxRequestSizeSet) { - if (maxRequestSize > 0) { - this.maxRequestSize = maxRequestSize; - this.maxRequestSizeSet = true; - } else { - throw new IllegalArgumentException("'maxRequestSize' must be > 0"); - } - } else { - throw new IllegalArgumentException("'maxRequestSize' can only be set once per instance"); - } - } - - private void assertBytesValid(byte[] bytes) { - if (bytes != null) { - if (bytes.length == 0) { - throw new IllegalArgumentException("'bytes' must not be empty"); - } - } - } - - private void validateInput(InputStream contentStream, String topic, int lastAckedMessageIndex) { - if (contentStream == null) { - throw new IllegalArgumentException("'contentStream' must not be null"); - } else if (topic == null || topic.trim().length() == 0) { - throw new IllegalArgumentException("'topic' must not be null or empty"); - } else if (lastAckedMessageIndex < -1) { - throw new IllegalArgumentException("'lastAckedMessageIndex' must be >= -1"); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java deleted file mode 100644 index 4dc8d18..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ /dev/null @@ -1,519 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Pattern; - -import org.apache.nifi.annotation.behavior.DynamicProperty; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult; - -@InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) -@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a " - + "user-specified delimiter, such as a new-line.") -@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", - description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." 
- + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be" - + " overriden with warning message describing the override." - + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.") -public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { - - private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}"; - - private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; - - public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", - "FlowFile will be routed to" - + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration"); - public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", - "FlowFile will be routed" - + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than" - + " <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes"); - public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", - "FlowFile will be routed to success after" - + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result" - + " in data loss."); - - /** - * AllowableValue for sending messages to Kafka without compression - */ - public static final AllowableValue COMPRESSION_CODEC_NONE = new AllowableValue("none", "None", - "Compression will not be used for any topic."); - - /** - * AllowableValue for sending messages to Kafka with GZIP compression - */ - public static final AllowableValue COMPRESSION_CODEC_GZIP = new AllowableValue("gzip", "GZIP", - "Compress messages using GZIP"); - - /** - * AllowableValue for sending messages to Kafka with Snappy compression - */ - public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", - "Compress messages using Snappy"); - - static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("Round Robin", "Round Robin", - "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, " - + "the next Partition to Partition 2, and so on, wrapping as necessary."); - static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("Random Robin", "Random", - "Messages will be assigned to random partitions."); - static final AllowableValue USER_DEFINED_PARTITIONING = new AllowableValue("User-Defined", "User-Defined", - "The <Partition> property will be used to determine the partition. 
All messages within the same FlowFile will be " - + "assigned to the same partition."); - - public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder() - .name("Known Brokers") - .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") - .required(true) - .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) - .expressionLanguageSupported(false) - .build(); - public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() - .name("Topic Name") - .description("The Kafka Topic of interest") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder() - .name("Partition Strategy") - .description("Specifies how messages should be partitioned when sent to Kafka") - .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING) - .defaultValue(ROUND_ROBIN_PARTITIONING.getValue()) - .required(true) - .build(); - public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder() - .name("Partition") - .description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages " - + "in the same FlowFile will be sent to the same partition. If a partition is specified but is not valid, " - + "then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.") - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(true) - .required(false) - .build(); - public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() - .name("Kafka Key") - .description("The Key to use for the Message") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() - .name("Delivery Guarantee") - .description("Specifies the requirement for guaranteeing that a message is sent to Kafka").required(true) - .expressionLanguageSupported(false) - .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) - .defaultValue(DELIVERY_BEST_EFFORT.getValue()) - .build(); - public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() - .name("Message Delimiter") - .description("Specifies the delimiter (interpreted in its UTF-8 byte representation) to use for splitting apart multiple messages within a single FlowFile. " - + "If not specified, the entire content of the FlowFile will be used as a single message. If specified, " - + "the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka " - + "message. 
Note that if messages are delimited " - + "and some messages for a given FlowFile are transferred " - + "successfully while others are not, the messages will be split into individual FlowFiles, such that those " - + "messages that were successfully sent are routed to the 'success' relationship while other messages are " - + "sent to the 'failure' relationship.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Max Buffer Size") - .description("The maximum amount of data to buffer in memory before sending to Kafka") - .required(true) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("5 MB") - .build(); - static final PropertyDescriptor MAX_RECORD_SIZE = new PropertyDescriptor.Builder() - .name("Max Record Size") - .description("The maximum size that any individual record can be.") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .required(true) - .defaultValue("1 MB") - .build(); - public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() - .name("Communications Timeout") - .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("30 secs").build(); - public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() - .name("Client Name") - .description("Client Name to use when communicating with Kafka") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); - public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() - .name("Async Batch Size") - .displayName("Batch Size") - .description("This configuration controls the default batch size in bytes. The producer will attempt to batch records together into " - + "fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client " - + "and the server.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("16384") // Kafka default - .build(); - public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder() - .name("Queue Buffering Max Time") - .description("Maximum time to buffer data before sending to Kafka. For example, a setting of 100 ms" - + " will try to batch together 100 milliseconds' worth of messages to send at once. 
This will improve" - + " throughput but adds message delivery latency due to the buffering.") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() - .name("Compression Codec") - .description("This parameter allows you to specify the compression codec for all" - + " data generated by this producer.") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY) - .defaultValue(COMPRESSION_CODEC_NONE.getValue()) - .build(); - - // Relationships - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") - .build(); - - protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id"; - - protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx"; - - protected static final String FAILED_TOPIC_ATTR = "failed.topic"; - - protected static final String FAILED_KEY_ATTR = "failed.key"; - - protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter"; - - private static final List<PropertyDescriptor> propertyDescriptors; - - private static final Set<Relationship> relationships; - - static { - List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); - _propertyDescriptors.add(SEED_BROKERS); - _propertyDescriptors.add(TOPIC); - _propertyDescriptors.add(PARTITION_STRATEGY); - _propertyDescriptors.add(PARTITION); - _propertyDescriptors.add(KEY); - _propertyDescriptors.add(DELIVERY_GUARANTEE); - _propertyDescriptors.add(MESSAGE_DELIMITER); - _propertyDescriptors.add(MAX_BUFFER_SIZE); - _propertyDescriptors.add(MAX_RECORD_SIZE); - _propertyDescriptors.add(TIMEOUT); - _propertyDescriptors.add(BATCH_NUM_MESSAGES); - _propertyDescriptors.add(QUEUE_BUFFERING_MAX); - _propertyDescriptors.add(COMPRESSION_CODEC); - _propertyDescriptors.add(CLIENT_NAME); - propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); - - Set<Relationship> _relationships = new HashSet<>(); - _relationships.add(REL_SUCCESS); - _relationships.add(REL_FAILURE); - relationships = Collections.unmodifiableSet(_relationships); - } - - - /** - * Will rendezvous with Kafka if {@link ProcessSession} contains {@link FlowFile} - * producing a result {@link FlowFile}. 
- * <br> - * The result {@link FlowFile} that is successful is then transferred to {@link #REL_SUCCESS} - * <br> - * The result {@link FlowFile} that is failed is then transferred to {@link #REL_FAILURE} - * - */ - @Override - protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException { - boolean processed = false; - FlowFile flowFile = session.get(); - if (flowFile != null) { - flowFile = this.doRendezvousWithKafka(flowFile, context, session); - if (!this.isFailedFlowFile(flowFile)) { - session.getProvenanceReporter().send(flowFile, - context.getProperty(SEED_BROKERS).getValue() + "/" - + context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue()); - session.transfer(flowFile, REL_SUCCESS); - } else { - session.transfer(session.penalize(flowFile), REL_FAILURE); - } - processed = true; - } - return processed; - } - - /** - * Will rendezvous with {@link KafkaPublisher} after building - * {@link PublishingContext} and will produce the resulting {@link FlowFile}. - * The resulting FlowFile contains all required information to determine - * whether message publishing that originated from the provided FlowFile has - * fully succeeded, partially succeeded, or failed completely (see - * {@link #isFailedFlowFile(FlowFile)}). - */ - private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) { - final AtomicReference<KafkaPublisherResult> publishResultRef = new AtomicReference<>(); - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream contentStream) throws IOException { - PublishingContext publishingContext = PutKafka.this.buildPublishingContext(flowFile, context, contentStream); - KafkaPublisherResult result = PutKafka.this.kafkaResource.publish(publishingContext); - publishResultRef.set(result); - } - }); - - FlowFile resultFile = publishResultRef.get().isAllAcked() - ? this.cleanUpFlowFileIfNecessary(flowFile, session) - : session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(), flowFile, context)); - - return resultFile; - } - - /** - * Builds {@link PublishingContext} for message(s) to be sent to Kafka. - * {@link PublishingContext} contains all contextual information required by - * {@link KafkaPublisher} to publish to Kafka. Such information includes the - * topic name, content stream, delimiter, key and the last ACKed - * message for cases where the provided FlowFile is being retried (i.e., it - * failed in the past). <br> - * For a clean FlowFile (one that is being sent for the first time), - * the PublishingContext will be built from the {@link ProcessContext} associated - * with this invocation. <br> - * For a failed FlowFile, the {@link PublishingContext} will be built from - * attributes of that FlowFile, which by then will already contain the required - * information (e.g., topic, key, delimiter etc.). This is required to - * ensure the affinity of the retry in the event where the processor - * configuration has changed. However, keep in mind that a failed FlowFile is - * only considered a failed FlowFile if it is being re-processed by the same - * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see - * {@link #isFailedFlowFile(FlowFile)}). If a failed FlowFile is being sent to - * another PutKafka processor it is treated as a fresh FlowFile - * regardless of whether it has #FAILED* attributes set. 
- */ - private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, - InputStream contentStream) { - String topicName; - byte[] keyBytes; - byte[] delimiterBytes = null; - int lastAckedMessageIndex = -1; - if (this.isFailedFlowFile(flowFile)) { - lastAckedMessageIndex = Integer.parseInt(flowFile.getAttribute(FAILED_LAST_ACK_IDX)); - topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR); - keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null - ? flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null; - delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null - ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null; - - } else { - topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); - String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); - keyBytes = _key == null ? null : _key.getBytes(StandardCharsets.UTF_8); - delimiterBytes = context.getProperty(MESSAGE_DELIMITER).isSet() ? context.getProperty(MESSAGE_DELIMITER) - .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null; - } - - PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); - publishingContext.setKeyBytes(keyBytes); - publishingContext.setDelimiterBytes(delimiterBytes); - publishingContext.setPartitionId(this.determinePartition(context, flowFile)); - return publishingContext; - } - - /** - * Returns 'true' if the provided FlowFile is a failed FlowFile. A failed - * FlowFile contains {@link #FAILED_PROC_ID_ATTR}. - */ - private boolean isFailedFlowFile(FlowFile flowFile) { - return this.getIdentifier().equals(flowFile.getAttribute(FAILED_PROC_ID_ATTR)); - } - - @Override - public Set<Relationship> getRelationships() { - return relationships; - } - - @Override - protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) - throws ProcessException { - KafkaPublisher kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context), this.getLogger()); - return kafkaPublisher; - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return propertyDescriptors; - } - - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") - .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true) - .build(); - } - - @Override - protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { - final List<ValidationResult> results = new ArrayList<>(); - - final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue(); - if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue()) - && !validationContext.getProperty(PARTITION).isSet()) { - results.add(new ValidationResult.Builder().subject("Partition").valid(false) - .explanation("The <Partition> property must be set when configured to use the User-Defined Partitioning Strategy") - .build()); - } - return results; - } - - /** - * Removes all FAILED_* attributes from a previously failed FlowFile so that, - * once it has been sent successfully, it is routed to 'success' as a clean FlowFile. - */ - private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, ProcessSession session) { - if (this.isFailedFlowFile(flowFile)) { - Set<String> keysToRemove = new HashSet<>(); - keysToRemove.add(FAILED_DELIMITER_ATTR); - 
keysToRemove.add(FAILED_KEY_ATTR); - keysToRemove.add(FAILED_TOPIC_ATTR); - keysToRemove.add(FAILED_PROC_ID_ATTR); - keysToRemove.add(FAILED_LAST_ACK_IDX); - flowFile = session.removeAllAttributes(flowFile, keysToRemove); - } - return flowFile; - } - - /** - * Determines the partition a message should be published to when the - * User-Defined Partitioning strategy is used; returns 'null' for all other strategies. - */ - private Integer determinePartition(ProcessContext context, FlowFile flowFile) { - String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue(); - Integer partitionValue = null; - if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) { - String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue(); - if (pv != null) { - partitionValue = Integer.parseInt(pv); - } - } - return partitionValue; - } - - /** - * Builds a {@link Map} of FAILED_* attributes - * - * @see #FAILED_PROC_ID_ATTR - * @see #FAILED_LAST_ACK_IDX - * @see #FAILED_TOPIC_ATTR - * @see #FAILED_KEY_ATTR - * @see #FAILED_DELIMITER_ATTR - */ - private Map<String, String> buildFailedFlowFileAttributes(int lastAckedMessageIndex, FlowFile sourceFlowFile, - ProcessContext context) { - Map<String, String> attributes = new HashMap<>(); - attributes.put(FAILED_PROC_ID_ATTR, this.getIdentifier()); - attributes.put(FAILED_LAST_ACK_IDX, String.valueOf(lastAckedMessageIndex)); - attributes.put(FAILED_TOPIC_ATTR, context.getProperty(TOPIC).evaluateAttributeExpressions(sourceFlowFile).getValue()); - attributes.put(FAILED_KEY_ATTR, context.getProperty(KEY).evaluateAttributeExpressions(sourceFlowFile).getValue()); - attributes.put(FAILED_DELIMITER_ATTR, context.getProperty(MESSAGE_DELIMITER).isSet() - ? context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(sourceFlowFile).getValue() - : null); - return attributes; - } - - /** - * Translates this processor's configuration into Kafka producer - * {@link Properties}, including any dynamically set properties. - */ - private Properties buildKafkaConfigProperties(final ProcessContext context) { - Properties properties = new Properties(); - String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); - properties.setProperty("bootstrap.servers", context.getProperty(SEED_BROKERS).getValue()); - properties.setProperty("acks", context.getProperty(DELIVERY_GUARANTEE).getValue()); - properties.setProperty("buffer.memory", String.valueOf(context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue())); - properties.setProperty("compression.type", context.getProperty(COMPRESSION_CODEC).getValue()); - properties.setProperty("batch.size", context.getProperty(BATCH_NUM_MESSAGES).getValue()); - - properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue()); - Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS); - if (queueBufferingMillis != null) { - properties.setProperty("linger.ms", String.valueOf(queueBufferingMillis)); - } - properties.setProperty("max.request.size", String.valueOf(context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).longValue())); - properties.setProperty("timeout.ms", timeout); - properties.setProperty("metadata.fetch.timeout.ms", timeout); - - String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue(); - String partitionerClass = null; - if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) { - partitionerClass = Partitioners.RoundRobinPartitioner.class.getName(); - } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) { - partitionerClass = Partitioners.RandomPartitioner.class.getName(); 
- } - // partitionerClass remains null for user-defined partitioning (the partition id - // is set per message instead); only set the property when a class was chosen, - // since Properties.setProperty(key, null) would throw a NullPointerException. - if (partitionerClass != null) { - properties.setProperty("partitioner.class", partitionerClass); - } - - // Set Dynamic Properties - for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { - PropertyDescriptor descriptor = entry.getKey(); - if (descriptor.isDynamic()) { - if (properties.containsKey(descriptor.getName())) { - this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had a value of '" - + properties.getProperty(descriptor.getName()) + "' with dynamically set value '" - + entry.getValue() + "'."); - } - properties.setProperty(descriptor.getName(), entry.getValue()); - } - } - return properties; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor deleted file mode 100644 index b478d9f..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -org.apache.nifi.processors.kafka.GetKafka -org.apache.nifi.processors.kafka.PutKafka \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html deleted file mode 100644 index 10c7082..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html +++ /dev/null @@ -1,45 +0,0 @@ -<!DOCTYPE html> -<html lang="en"> - <!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - <head> - <meta charset="utf-8" /> - <title>GetKafka</title> - <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> - </head> - - <body> - <!-- Processor Documentation ================================================== --> - <h2>Description:</h2> - <p> - This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a> - for data. When a message is received from Kafka, this Processor emits a FlowFile - whose content is the value of the Kafka message. If the - message has a key associated with it, an attribute named <code>kafka.key</code> - will be added to the FlowFile, with the value being the UTF-8 encoded value - of the message's key. - </p> - <p> - Kafka supports the notion of a Consumer Group when pulling messages in order to - provide scalability while still offering a publish-subscribe interface. Each - Consumer Group must have a unique identifier. The Consumer Group identifier that - is used by NiFi is the UUID of the Processor. This means that all of the nodes - within a cluster will use the same Consumer Group identifier, so they do - not receive duplicate data. Multiple GetKafka Processors can still be used to pull - from multiple Topics, as each Processor will receive a different Processor UUID - and therefore a different Consumer Group identifier. - </p> - </body> -</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html deleted file mode 100644 index d51ce95..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html +++ /dev/null @@ -1,45 +0,0 @@ -<!DOCTYPE html> -<html lang="en"> - <!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
- --> - <head> - <meta charset="utf-8" /> - <title>PutKafka</title> - <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> - </head> - - <body> - <!-- Processor Documentation ================================================== --> - <h2>Description:</h2> - <p> - This Processor puts the contents of a FlowFile to a Topic in - <a href="http://kafka.apache.org/">Apache Kafka</a>. The full contents of - a FlowFile become the contents of a single message in Kafka. - This message is optionally assigned a key by using the - &lt;Kafka Key&gt; Property. - </p> - - <p> - The Processor allows the user to configure an optional Message Delimiter that - can be used to send many messages per FlowFile. For example, a \n could be used - to indicate that the contents of the FlowFile should be used to send one message - per line of text. If the property is not set, the entire contents of the FlowFile - will be sent as a single message. When using the delimiter, if some messages are - successfully sent but other messages fail to send, the FlowFile will be FORKed into - two child FlowFiles, with the successfully sent messages being routed to 'success' - and the messages that could not be sent going to 'failure'. - </p> - </body> -</html>
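
Editor's note: the PutKafka documentation above describes delimiter-based splitting with partial-failure handling, and the deleted PutKafka.java tracks the retry point via the FAILED_LAST_ACK_IDX attribute. The following is a minimal, self-contained sketch of that idea only — it is not NiFi or Kafka code, and every class and method name in it is hypothetical — showing how delimited content maps to individual messages and how a recorded last-acked index lets a retry resume mid-FlowFile.

// Hypothetical illustration; assumes delimiter splitting and a last-acked
// index as described in the PutKafka docs above. Plain JDK, no NiFi APIs.
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DelimiterSplitSketch {

    /** Splits the raw FlowFile content on the configured delimiter bytes. */
    static List<byte[]> splitMessages(byte[] content, byte[] delimiter) {
        List<byte[]> messages = new ArrayList<>();
        int start = 0;
        for (int i = 0; i <= content.length - delimiter.length; i++) {
            if (regionMatches(content, i, delimiter)) {
                messages.add(Arrays.copyOfRange(content, start, i));
                start = i + delimiter.length;
                i = start - 1; // continue scanning after the delimiter
            }
        }
        if (start < content.length) {
            messages.add(Arrays.copyOfRange(content, start, content.length));
        }
        return messages;
    }

    static boolean regionMatches(byte[] content, int offset, byte[] delimiter) {
        for (int i = 0; i < delimiter.length; i++) {
            if (content[offset + i] != delimiter[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] content = "one\ntwo\nthree".getBytes(StandardCharsets.UTF_8);
        List<byte[]> messages = splitMessages(content, "\n".getBytes(StandardCharsets.UTF_8));

        // Suppose only the first message was acknowledged by the broker: the
        // processor would record index 0 (cf. FAILED_LAST_ACK_IDX above), and
        // a retry of the failed FlowFile resumes from message index 1.
        int lastAckedMessageIndex = 0;
        for (int i = lastAckedMessageIndex + 1; i < messages.size(); i++) {
            System.out.println("retry message " + i + ": "
                    + new String(messages.get(i), StandardCharsets.UTF_8));
        }
    }
}

Run as-is, this prints "two" and "three" as the messages a retry would resend, which is exactly the split that the 'success'/'failure' fork in the documentation above describes.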
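Similarly, the dynamic-property handling at the end of buildKafkaConfigProperties() boils down to a merge order: statically mapped producer properties are built first, then user-supplied dynamic properties are layered on top and win, with a warning on override. Here is a minimal sketch under the same caveats — hypothetical names, plain JDK only, System.err standing in for the processor's logger, and example values in place of the NiFi property lookups.

// Hypothetical illustration of the "static first, dynamic last with a
// warning" merge behaviour described in the @DynamicProperty handling above.
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class ConfigMergeSketch {

    static Properties buildProducerConfig(Map<String, String> dynamicProps) {
        Properties properties = new Properties();
        // Statically mapped properties (values here are just examples).
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("acks", "1");
        properties.setProperty("batch.size", "16384"); // Kafka default, as above

        // Dynamic properties are applied last and override, with a warning.
        for (Map.Entry<String, String> entry : dynamicProps.entrySet()) {
            if (properties.containsKey(entry.getKey())) {
                System.err.println("WARN Overriding existing property '" + entry.getKey()
                        + "' which had a value of '" + properties.getProperty(entry.getKey())
                        + "' with dynamically set value '" + entry.getValue() + "'.");
            }
            properties.setProperty(entry.getKey(), entry.getValue());
        }
        return properties;
    }

    public static void main(String[] args) {
        Map<String, String> dynamic = new LinkedHashMap<>();
        dynamic.put("batch.size", "32768"); // overrides the static default, warns
        dynamic.put("linger.ms", "100");    // purely additive
        System.out.println(buildProducerConfig(dynamic));
    }
}

Applying dynamic properties last keeps user intent authoritative while the warning preserves visibility into what was overridden — the same trade-off the deleted processor makes.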