http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java new file mode 100644 index 0000000..e5255f5 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import javax.xml.bind.DatatypeConverter; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURITY_PROTOCOL; + 
+@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.9 Consumer API. " + + " Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring" + + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can. In the mean time" + + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on.") +@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9.x"}) +@WritesAttributes({ + @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"), + @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY_HEX, description = "The hex encoded key of message if present and if single message"), + @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."), + @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"), + @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", + description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged." + + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ") +public class ConsumeKafka extends AbstractProcessor { + + private static final long TWO_MB = 2L * 1024L * 1024L; + + static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset"); + + static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset"); + + static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group"); + + static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder() + .name("topic") + .displayName("Topic Name(s)") + .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma seperated.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder() + .name(ConsumerConfig.GROUP_ID_CONFIG) + .displayName("Group ID") + .description("A Group ID is used to identify consumers that are within the same consumer group. 
Corresponds to Kafka's 'group.id' property.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() + .name(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) + .displayName("Offset Reset") + .description("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any " + "more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.") + .required(true) + .allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE) + .defaultValue(OFFSET_LATEST.getValue()) + .build(); + + static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() + .name("message-demarcator") + .displayName("Message Demarcator") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .description("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contain " + "all Kafka messages in a single batch for a given topic and partition, and this property allows you to provide a string (interpreted as UTF-8) to use " + "for demarcating multiple Kafka messages. This is an optional property and if not provided each Kafka message received " + "will result in a single FlowFile each " + "time it is triggered. To enter a special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS") + .build(); + static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder() + .name("max.poll.records") + .displayName("Max Poll Records") + .description("Specifies the maximum number of records Kafka should return in a single poll.") + .required(false) + .defaultValue("10000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles received from Kafka. Depending on the demarcation strategy it is either a FlowFile per message or a bundle of messages grouped by topic and partition.") + .build(); + + static final List<PropertyDescriptor> DESCRIPTORS; + static final Set<Relationship> RELATIONSHIPS; + + private volatile byte[] demarcatorBytes = null; + private volatile ConsumerPool consumerPool = null; + + static { + List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors()); + descriptors.add(TOPICS); + descriptors.add(GROUP_ID); + descriptors.add(AUTO_OFFSET_RESET); + descriptors.add(MESSAGE_DEMARCATOR); + descriptors.add(MAX_POLL_RECORDS); + DESCRIPTORS = Collections.unmodifiableList(descriptors); + RELATIONSHIPS = Collections.singleton(REL_SUCCESS); + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @OnScheduled + public void prepareProcessing(final ProcessContext context) { + this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() + ? 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8) + : null; + } + + @OnStopped + public void close() { + demarcatorBytes = null; + final ConsumerPool pool = consumerPool; + consumerPool = null; + if (pool != null) { + pool.close(); + } + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") + .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)).dynamic(true) + .build(); + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + return KafkaProcessorUtils.validateCommonProperties(validationContext); + } + + private synchronized ConsumerPool getConsumerPool(final ProcessContext context) { + ConsumerPool pool = consumerPool; + if (pool != null) { + return pool; + } + + final Map<String, String> props = new HashMap<>(); + KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props); + final String topicListing = context.getProperty(TOPICS).evaluateAttributeExpressions().getValue(); + final List<String> topics = new ArrayList<>(); + for (final String topic : topicListing.split(",", 100)) { + final String trimmedName = topic.trim(); + if (!trimmedName.isEmpty()) { + topics.add(trimmedName); + } + } + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + return consumerPool = createConsumerPool(context.getMaxConcurrentTasks(), topics, props, getLogger()); + } + + protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) { + return new ConsumerPool(maxLeases, topics, props, log); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final long startTimeNanos = System.nanoTime(); + final ConsumerPool pool = getConsumerPool(context); + if (pool == null) { + context.yield(); + return; + } + final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap = new HashMap<>(); + + try (final ConsumerLease lease = pool.obtainConsumer()) { + try { + if (lease == null) { + context.yield(); + return; + } + + final boolean foundData = gatherDataFromKafka(lease, partitionRecordMap, context); + if (!foundData) { + session.rollback(); + return; + } + + writeSessionData(context, session, partitionRecordMap, startTimeNanos); + //At-least once commit handling (if order is reversed it is at-most once) + session.commit(); + commitOffsets(lease, partitionRecordMap); + } catch (final KafkaException ke) { + lease.poison(); + getLogger().error("Problem while accessing kafka consumer " + ke, ke); + context.yield(); + session.rollback(); + } + } + } + + private void commitOffsets(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap) { + final Map<TopicPartition, OffsetAndMetadata> partOffsetMap = new HashMap<>(); + partitionRecordMap.entrySet().stream() + .filter(entry -> !entry.getValue().isEmpty()) + .forEach((entry) -> { + long maxOffset = entry.getValue().stream() + 
.mapToLong(record -> record.offset()) + .max() + .getAsLong(); + partOffsetMap.put(entry.getKey(), new OffsetAndMetadata(maxOffset + 1L)); + }); + lease.commitOffsets(partOffsetMap); + } + + private void writeSessionData( + final ProcessContext context, final ProcessSession session, + final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, + final long startTimeNanos) { + if (demarcatorBytes != null) { + partitionRecordMap.entrySet().stream() + .filter(entry -> !entry.getValue().isEmpty()) + .forEach(entry -> { + writeData(context, session, entry.getValue(), startTimeNanos); + }); + } else { + partitionRecordMap.entrySet().stream() + .filter(entry -> !entry.getValue().isEmpty()) + .flatMap(entry -> entry.getValue().stream()) + .forEach(record -> { + writeData(context, session, Collections.singletonList(record), startTimeNanos); + }); + } + } + + private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) { + final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0); + final String offset = String.valueOf(firstRecord.offset()); + final String keyHex = (firstRecord.key() != null) ? DatatypeConverter.printHexBinary(firstRecord.key()) : null; + final String topic = firstRecord.topic(); + final String partition = String.valueOf(firstRecord.partition()); + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, out -> { + boolean useDemarcator = false; + for (final ConsumerRecord<byte[], byte[]> record : records) { + if (useDemarcator) { + out.write(demarcatorBytes); + } + out.write(record.value()); + useDemarcator = true; + } + }); + final Map<String, String> kafkaAttrs = new HashMap<>(); + kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset); + if (keyHex != null && records.size() == 1) { + kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY_HEX, keyHex); + } + kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition); + kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic); + if (records.size() > 1) { + kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(records.size())); + } + flowFile = session.putAllAttributes(flowFile, kafkaAttrs); + final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos); + final String transitUri = KafkaProcessorUtils.buildTransitURI( + context.getProperty(SECURITY_PROTOCOL).getValue(), + context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(), + topic); + session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis); + this.getLogger().debug("Created {} containing {} messages from Kafka topic {}, partition {}, starting offset {} in {} millis", + new Object[]{flowFile, records.size(), topic, partition, offset, executionDurationMillis}); + session.transfer(flowFile, REL_SUCCESS); + } + + /** + * Populates the given partitionRecordMap with new records until we poll + * that returns no records or until we have enough data. It is important to + * ensure we keep items grouped by their topic and partition so that when we + * bundle them we bundle them intelligently and so that we can set offsets + * properly even across multiple poll calls. 
+ */ + private boolean gatherDataFromKafka(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, ProcessContext context) { + final long startNanos = System.nanoTime(); + boolean foundData = false; + ConsumerRecords<byte[], byte[]> records; + final int maxRecords = context.getProperty(MAX_POLL_RECORDS).asInteger(); + + do { + records = lease.poll(); + + for (final TopicPartition partition : records.partitions()) { + List<ConsumerRecord<byte[], byte[]>> currList = partitionRecordMap.get(partition); + if (currList == null) { + currList = new ArrayList<>(); + partitionRecordMap.put(partition, currList); + } + currList.addAll(records.records(partition)); + if (currList.size() > 0) { + foundData = true; + } + } + //If we received data and we still want to get more + } while (!records.isEmpty() && !checkIfGatheredEnoughData(partitionRecordMap, maxRecords, startNanos)); + return foundData; + } + + /** + * Determines if we have enough data as-is and should move on. + * + * @return true if we've been gathering for more than 500 ms or if we're + * demarcating and have more than 50 flowfiles worth or if we're per message + * and have more than 2000 flowfiles or if totalMessageSize is greater than + * two megabytes; false otherwise + * + * Implementation note: 500 millis and 5 MB are magic numbers. These may + * need to be tuned. They get at how often offsets will get committed to + * kafka relative to how many records will get buffered into memory in a + * poll call before writing to repos. + */ + private boolean checkIfGatheredEnoughData(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, final long maxRecords, final long startTimeNanos) { + + final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos); + + if (durationMillis > 500) { + return true; + } + + int topicPartitionsFilled = 0; + int totalRecords = 0; + long totalRecordSize = 0; + + for (final List<ConsumerRecord<byte[], byte[]>> recordList : partitionRecordMap.values()) { + if (!recordList.isEmpty()) { + topicPartitionsFilled++; + } + totalRecords += recordList.size(); + for (final ConsumerRecord<byte[], byte[]> rec : recordList) { + totalRecordSize += rec.value().length; + } + } + + if (demarcatorBytes != null && demarcatorBytes.length > 0) { + return topicPartitionsFilled > 50; + } else if (totalRecordSize > TWO_MB) { + return true; + } else { + return totalRecords > maxRecords; + } + } + +}
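[Editor's note] The commit ordering in onTrigger() above — write the FlowFiles, call session.commit(), and only then commitOffsets() — is what gives the processor its at-least-once guarantee: if the NiFi session commit fails, the Kafka offsets never advance and the records are simply re-polled (reversing the order would make it at-most-once, as the inline comment says). The following standalone sketch shows the same pattern against a plain Kafka 0.9 KafkaConsumer with auto-commit disabled, exactly as the processor forces via ENABLE_AUTO_COMMIT_CONFIG. It is not part of this patch; the broker address, group id, and topic name are placeholders.

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class AtLeastOnceConsumeExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder group id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // offsets are committed manually, as in the processor
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));   // placeholder topic

            ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition);
                for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                    // "Write the data first" stands in for the FlowFile write plus session.commit() in the processor.
                    System.out.println("partition " + partition + " offset " + record.offset());
                }
                // Track the next offset to consume per partition, mirroring commitOffsets() above.
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                toCommit.put(partition, new OffsetAndMetadata(lastOffset + 1L));
            }
            // Only after the data is safely written are the offsets advanced (at-least-once).
            consumer.commitSync(toCommit);
        }
    }
}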
http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java new file mode 100644 index 0000000..b954eba --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.Closeable; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; + +/** + * This class represents a lease to access a Kafka Consumer object. The lease is + * intended to be obtained from a ConsumerPool. The lease is closeable to allow + * for the clean model of a try w/resources whereby non-exceptional cases mean + * the lease will be returned to the pool for future use by others. A given + * lease may only belong to a single thread a time. + */ +public interface ConsumerLease extends Closeable { + + /** + * Executes a poll on the underlying Kafka Consumer. + * + * @return ConsumerRecords retrieved in the poll. + * @throws KafkaException if issue occurs talking to underlying resource. + */ + ConsumerRecords<byte[], byte[]> poll() throws KafkaException; + + /** + * Notifies Kafka to commit the offsets for the specified topic/partition + * pairs to the specified offsets w/the given metadata. This can offer + * higher performance than the other commitOffsets call as it allows the + * kafka client to collect more data from Kafka before committing the + * offsets. + * + * @param offsets offsets + * @throws KafkaException if issue occurs talking to underlying resource. + */ + void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) throws KafkaException; + + /** + * Notifies that this lease is poisoned and should not be reused. + */ + void poison(); + + /** + * Notifies that this lease is to be returned. The pool may optionally reuse + * this lease with another client. No further references by the caller + * should occur after calling close. 
+ */ + @Override + void close(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java new file mode 100644 index 0000000..3f20b8f --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.nifi.logging.ComponentLog; + +import java.io.Closeable; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; + +/** + * A pool of Kafka Consumers for a given topic. Consumers can be obtained by + * calling 'obtainConsumer'. Once closed the pool is ready to be immediately + * used again. + */ +public class ConsumerPool implements Closeable { + + private final AtomicInteger activeLeaseCount = new AtomicInteger(0); + private final int maxLeases; + private final Queue<ConsumerLease> consumerLeases; + private final List<String> topics; + private final Map<String, Object> kafkaProperties; + private final ComponentLog logger; + + private final AtomicLong consumerCreatedCountRef = new AtomicLong(); + private final AtomicLong consumerClosedCountRef = new AtomicLong(); + private final AtomicLong leasesObtainedCountRef = new AtomicLong(); + private final AtomicLong productivePollCountRef = new AtomicLong(); + private final AtomicLong unproductivePollCountRef = new AtomicLong(); + + /** + * Creates a pool of KafkaConsumer objects that will grow up to the maximum + * indicated leases. Consumers are lazily initialized. 
+ * + * @param maxLeases maximum number of active leases in the pool + * @param topics the topics to consume from + * @param kafkaProperties the properties for each consumer + * @param logger the logger to report any errors/warnings + */ + public ConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> kafkaProperties, final ComponentLog logger) { + this.maxLeases = maxLeases; + if (maxLeases <= 0) { + throw new IllegalArgumentException("Max leases value must be greather than zero."); + } + this.logger = logger; + if (topics == null || topics.isEmpty()) { + throw new IllegalArgumentException("Must have a list of one or more topics"); + } + this.topics = topics; + this.kafkaProperties = new HashMap<>(kafkaProperties); + this.consumerLeases = new ArrayDeque<>(); + } + + /** + * Obtains a consumer from the pool if one is available + * + * @return consumer from the pool + * @throws IllegalArgumentException if pool already contains + */ + public ConsumerLease obtainConsumer() { + final ConsumerLease lease; + final int activeLeases; + synchronized (this) { + lease = consumerLeases.poll(); + activeLeases = activeLeaseCount.get(); + } + if (lease == null && activeLeases >= maxLeases) { + logger.warn("No available consumers and cannot create any as max consumer leases limit reached - verify pool settings"); + return null; + } + leasesObtainedCountRef.incrementAndGet(); + return (lease == null) ? createConsumer() : lease; + } + + protected Consumer<byte[], byte[]> createKafkaConsumer() { + return new KafkaConsumer<>(kafkaProperties); + } + + private ConsumerLease createConsumer() { + final Consumer<byte[], byte[]> kafkaConsumer = createKafkaConsumer(); + consumerCreatedCountRef.incrementAndGet(); + try { + kafkaConsumer.subscribe(topics); + } catch (final KafkaException kex) { + try { + kafkaConsumer.close(); + consumerClosedCountRef.incrementAndGet(); + } catch (final Exception ex) { + consumerClosedCountRef.incrementAndGet(); + //ignore + } + throw kex; + } + + final ConsumerLease lease = new ConsumerLease() { + + private volatile boolean poisoned = false; + private volatile boolean closed = false; + + @Override + public ConsumerRecords<byte[], byte[]> poll() { + + if (poisoned) { + throw new KafkaException("The consumer is poisoned and should no longer be used"); + } + + try { + final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(50); + if (records.isEmpty()) { + unproductivePollCountRef.incrementAndGet(); + } else { + productivePollCountRef.incrementAndGet(); + } + return records; + } catch (final KafkaException kex) { + logger.warn("Unable to poll from Kafka consumer so will poison and close this " + kafkaConsumer, kex); + poison(); + close(); + throw kex; + } + } + + @Override + public void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offsets) { + + if (poisoned) { + throw new KafkaException("The consumer is poisoned and should no longer be used"); + } + try { + kafkaConsumer.commitSync(offsets); + } catch (final KafkaException kex) { + logger.warn("Unable to commit kafka consumer offsets so will poison and close this " + kafkaConsumer, kex); + poison(); + close(); + throw kex; + } + } + + @Override + public void close() { + if (closed) { + return; + } + if (poisoned || activeLeaseCount.get() > maxLeases) { + closeConsumer(kafkaConsumer); + activeLeaseCount.decrementAndGet(); + closed = true; + } else { + final boolean added; + synchronized (ConsumerPool.this) { + added = consumerLeases.offer(this); + } + if (!added) { + 
closeConsumer(kafkaConsumer); + activeLeaseCount.decrementAndGet(); + } + } + } + + @Override + public void poison() { + poisoned = true; + } + }; + activeLeaseCount.incrementAndGet(); + return lease; + } + + /** + * Closes all consumers in the pool. Can be safely recalled. + */ + @Override + public void close() { + final List<ConsumerLease> leases = new ArrayList<>(); + synchronized (this) { + ConsumerLease lease = null; + while ((lease = consumerLeases.poll()) != null) { + leases.add(lease); + } + } + for (final ConsumerLease lease : leases) { + lease.poison(); + lease.close(); + } + } + + private void closeConsumer(final Consumer consumer) { + try { + consumer.unsubscribe(); + } catch (Exception e) { + logger.warn("Failed while unsubscribing " + consumer, e); + } + + try { + consumer.close(); + consumerClosedCountRef.incrementAndGet(); + } catch (Exception e) { + consumerClosedCountRef.incrementAndGet(); + logger.warn("Failed while closing " + consumer, e); + } + } + + PoolStats getPoolStats() { + return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get(), productivePollCountRef.get(), unproductivePollCountRef.get()); + } + + static final class PoolStats { + + final long consumerCreatedCount; + final long consumerClosedCount; + final long leasesObtainedCount; + final long productivePollCount; + final long unproductivePollCount; + + PoolStats( + final long consumerCreatedCount, + final long consumerClosedCount, + final long leasesObtainedCount, + final long productivePollCount, + final long unproductivePollCount + ) { + this.consumerCreatedCount = consumerCreatedCount; + this.consumerClosedCount = consumerClosedCount; + this.leasesObtainedCount = leasesObtainedCount; + this.productivePollCount = productivePollCount; + this.unproductivePollCount = unproductivePollCount; + } + + @Override + public String toString() { + return "Created Consumers [" + consumerCreatedCount + "]\n" + + "Closed Consumers [" + consumerClosedCount + "]\n" + + "Leases Obtained [" + leasesObtainedCount + "]\n" + + "Productive Polls [" + productivePollCount + "]\n" + + "Unproductive Polls [" + unproductivePollCount + "]\n"; + } + + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java new file mode 100644 index 0000000..fd747fc --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import org.apache.kafka.clients.CommonClientConfigs; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class KafkaProcessorUtils { + + final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}"; + + private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; + + static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+"); + + static final String KAFKA_KEY_HEX = "kafka.key.hex"; + static final String KAFKA_TOPIC = "kafka.topic"; + static final String KAFKA_PARTITION = "kafka.partition"; + static final String KAFKA_OFFSET = "kafka.offset"; + static final String KAFKA_COUNT = "kafka.count"; + static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT"); + static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL"); + static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT"); + static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL"); + + static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder() + .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + .displayName("Kafka Brokers") + .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) + .expressionLanguageSupported(true) + .defaultValue("localhost:9092") + .build(); + static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder() + .name("security.protocol") + .displayName("Security Protocol") + .description("Protocol used to communicate with brokers. 
Corresponds to Kafka's 'security.protocol' property.") + .required(true) + .expressionLanguageSupported(false) + .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL) + .defaultValue(SEC_PLAINTEXT.getValue()) + .build(); + static final PropertyDescriptor KERBEROS_PRINCIPLE = new PropertyDescriptor.Builder() + .name("sasl.kerberos.service.name") + .displayName("Kerberos Service Name") + .description("The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. " + "Corresponds to Kafka's 'sasl.kerberos.service.name' property. " + "It is ignored unless one of the SASL options of the <Security Protocol> is selected.") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("ssl.context.service") + .displayName("SSL Context Service") + .description("Specifies the SSL Context Service to use for communicating with Kafka.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + static List<PropertyDescriptor> getCommonPropertyDescriptors() { + return Arrays.asList( + BOOTSTRAP_SERVERS, + SECURITY_PROTOCOL, + KERBEROS_PRINCIPLE, + SSL_CONTEXT_SERVICE + ); + } + + static Collection<ValidationResult> validateCommonProperties(final ValidationContext validationContext) { + List<ValidationResult> results = new ArrayList<>(); + + String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue(); + + /* + * validates that if one of the SASL (Kerberos) options is selected for + * security protocol, then the Kerberos principal is provided as well + */ + if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) { + String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).getValue(); + if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) { + results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) + .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <" + + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '" + + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.") + .build()); + } + } + + //If SSL or SASL_SSL then CS must be set. + final boolean sslProtocol = SEC_SSL.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol); + final boolean csSet = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet(); + if (csSet && !sslProtocol) { + results.add(new ValidationResult.Builder().subject(SECURITY_PROTOCOL.getDisplayName()).valid(false) + .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.").build()); + } + if (!csSet && sslProtocol) { + results.add(new ValidationResult.Builder().subject(SSL_CONTEXT_SERVICE.getDisplayName()).valid(false) + .explanation("If you set an SSL based protocol you need to set the SSL Controller Service").build()); + } + + final String enableAutoCommit = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).build()).getValue(); + if (enableAutoCommit != null && !enableAutoCommit.toLowerCase().equals("false")) { + results.add(new ValidationResult.Builder().subject(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) + .explanation("Enable auto commit must be false. 
It is managed by the processor.").build()); + } + + final String keySerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()).getValue(); + if (keySerializer != null && !ByteArraySerializer.class.getName().equals(keySerializer)) { + results.add(new ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) + .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build()); + } + + final String valueSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()).getValue(); + if (valueSerializer != null && !ByteArraySerializer.class.getName().equals(valueSerializer)) { + results.add(new ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) + .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build()); + } + + final String keyDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()).getValue(); + if (keyDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) { + results.add(new ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) + .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build()); + } + + final String valueDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()).getValue(); + if (valueDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) { + results.add(new ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) + .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build()); + } + + return results; + } + + static final class KafkaConfigValidator implements Validator { + + final Class<?> classType; + + public KafkaConfigValidator(final Class classType) { + this.classType = classType; + } + + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + final boolean knownValue = KafkaProcessorUtils.isStaticStringFieldNamePresent(subject, classType, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class); + return new ValidationResult.Builder().subject(subject).explanation("Must be a known configuration parameter for this kafka client").valid(knownValue).build(); + } + }; + + /** + * Builds transit URI for provenance event. 
The transit URI will be in the + * form of <security.protocol>://<bootstrap.servers>/topic + */ + static String buildTransitURI(String securityProtocol, String brokers, String topic) { + StringBuilder builder = new StringBuilder(); + builder.append(securityProtocol); + builder.append("://"); + builder.append(brokers); + builder.append("/"); + builder.append(topic); + return builder.toString(); + } + + static void buildCommonKafkaProperties(final ProcessContext context, final Class kafkaConfigClass, final Map<String, String> mapToPopulate) { + for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) { + if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) { + // Translate SSLContext Service configuration into Kafka properties + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + if (sslContextService != null && sslContextService.isKeyStoreConfigured()) { + mapToPopulate.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslContextService.getKeyStoreFile()); + mapToPopulate.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslContextService.getKeyStorePassword()); + final String keyPass = sslContextService.getKeyPassword() == null ? sslContextService.getKeyStorePassword() : sslContextService.getKeyPassword(); + mapToPopulate.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPass); + mapToPopulate.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, sslContextService.getKeyStoreType()); + } + + if (sslContextService != null && sslContextService.isTrustStoreConfigured()) { + mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslContextService.getTrustStoreFile()); + mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslContextService.getTrustStorePassword()); + mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType()); + } + } + + String pName = propertyDescriptor.getName(); + String pValue = propertyDescriptor.isExpressionLanguageSupported() + ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue() + : context.getProperty(propertyDescriptor).getValue(); + if (pValue != null) { + if (pName.endsWith(".ms")) { // kafka standard time notation + pValue = String.valueOf(FormatUtils.getTimeDuration(pValue.trim(), TimeUnit.MILLISECONDS)); + } + if (isStaticStringFieldNamePresent(pName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) { + mapToPopulate.put(pName, pValue); + } + } + } + } + + private static boolean isStaticStringFieldNamePresent(final String name, final Class... classes) { + return KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name); + } + + private static Set<String> getPublicStaticStringFieldValues(final Class... 
classes) { + final Set<String> strings = new HashSet<>(); + for (final Class classType : classes) { + for (final Field field : classType.getDeclaredFields()) { + if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers()) && field.getType().equals(String.class)) { + try { + strings.add(String.valueOf(field.get(null))); + } catch (IllegalArgumentException | IllegalAccessException ex) { + //ignore + } + } + } + } + return strings; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java new file mode 100644 index 0000000..31a084f --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.Closeable; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.stream.io.util.StreamDemarcator; + +/** + * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor + * with sending contents of the {@link FlowFile}s to Kafka. + */ +class KafkaPublisher implements Closeable { + + private final Producer<byte[], byte[]> kafkaProducer; + + private volatile long ackWaitTime = 30000; + + private final ComponentLog componentLog; + + private final int ackCheckSize; + + KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) { + this(kafkaProperties, 100, componentLog); + } + + /** + * Creates an instance of this class as well as the instance of the + * corresponding Kafka {@link KafkaProducer} using provided Kafka + * configuration properties. 
+ * + * @param kafkaProperties instance of {@link Properties} used to bootstrap + * {@link KafkaProducer} + */ + KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) { + this.kafkaProducer = new KafkaProducer<>(kafkaProperties); + this.ackCheckSize = ackCheckSize; + this.componentLog = componentLog; + } + + /** + * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to + * determine how many messages to Kafka will be sent from a provided + * {@link InputStream} (see {@link PublishingContext#getContentStream()}). + * It supports two publishing modes: + * <ul> + * <li>Sending all messages constructed from + * {@link StreamDemarcator#nextToken()} operation.</li> + * <li>Sending only unacknowledged messages constructed from + * {@link StreamDemarcator#nextToken()} operation.</li> + * </ul> + * The unacknowledged messages are determined from the value of + * {@link PublishingContext#getLastAckedMessageIndex()}. + * <br> + * This method assumes content stream affinity where it is expected that the + * content stream that represents the same Kafka message(s) will remain the + * same across possible retries. This is required specifically for cases + * where delimiter is used and a single content stream may represent + * multiple Kafka messages. The + * {@link PublishingContext#getLastAckedMessageIndex()} will provide the + * index of the last ACKed message, so upon retry only messages with the + * higher index are sent. + * + * @param publishingContext instance of {@link PublishingContext} which hold + * context information about the message(s) to be sent. + * @return The index of the last successful offset. + */ + KafkaPublisherResult publish(PublishingContext publishingContext) { + StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(), + publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize()); + + int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex(); + List<Future<RecordMetadata>> resultFutures = new ArrayList<>(); + + byte[] messageBytes; + int tokenCounter = 0; + boolean continueSending = true; + KafkaPublisherResult result = null; + for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) { + if (prevLastAckedMessageIndex < tokenCounter) { + ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getKeyBytes(), messageBytes); + resultFutures.add(this.kafkaProducer.send(message)); + + if (tokenCounter % this.ackCheckSize == 0) { + int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex); + resultFutures.clear(); + if (lastAckedMessageIndex % this.ackCheckSize != 0) { + continueSending = false; + result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex); + } + prevLastAckedMessageIndex = lastAckedMessageIndex; + } + } + } + + if (result == null) { + int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex); + resultFutures.clear(); + result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex); + } + return result; + } + + /** + * Sets the time this publisher will wait for the {@link Future#get()} + * operation (the Future returned by + * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing + * out. + * + * This value will also be used as a timeout when closing the underlying + * {@link KafkaProducer}. See {@link #close()}. 
+ */ + void setAckWaitTime(long ackWaitTime) { + this.ackWaitTime = ackWaitTime; + } + + /** + * This operation will process ACKs from Kafka in the order in which + * {@link KafkaProducer#send(ProducerRecord)} invocation were made returning + * the index of the last ACKed message. Within this operation processing ACK + * simply means successful invocation of 'get()' operation on the + * {@link Future} returned by {@link KafkaProducer#send(ProducerRecord)} + * operation. Upon encountering any type of error while interrogating such + * {@link Future} the ACK loop will end. Messages that were not ACKed would + * be considered non-delivered and therefore could be resent at the later + * time. + * + * @param sendFutures list of {@link Future}s representing results of + * publishing to Kafka + * + * @param lastAckMessageIndex the index of the last ACKed message. It is + * important to provide the last ACKed message especially while re-trying so + * the proper index is maintained. + */ + private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) { + boolean exceptionThrown = false; + for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) { + Future<RecordMetadata> future = sendFutures.get(segmentCounter); + try { + future.get(this.ackWaitTime, TimeUnit.MILLISECONDS); + lastAckMessageIndex++; + } catch (InterruptedException e) { + exceptionThrown = true; + Thread.currentThread().interrupt(); + this.warnOrError("Interrupted while waiting for acks from Kafka", null); + } catch (ExecutionException e) { + exceptionThrown = true; + this.warnOrError("Failed while waiting for acks from Kafka", e); + } catch (TimeoutException e) { + exceptionThrown = true; + this.warnOrError("Timed out while waiting for acks from Kafka", null); + } + } + + return lastAckMessageIndex; + } + + /** + * Will close the underlying {@link KafkaProducer} waiting if necessary for + * the same duration as supplied {@link #setAckWaitTime(long)} + */ + @Override + public void close() { + this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS); + } + + /** + * + */ + private void warnOrError(String message, Exception e) { + if (e == null) { + this.componentLog.warn(message); + } else { + this.componentLog.error(message, e); + } + } + + /** + * Encapsulates the result received from publishing messages to Kafka + */ + static class KafkaPublisherResult { + + private final int messagesSent; + private final int lastMessageAcked; + + KafkaPublisherResult(int messagesSent, int lastMessageAcked) { + this.messagesSent = messagesSent; + this.lastMessageAcked = lastMessageAcked; + } + + public int getMessagesSent() { + return this.messagesSent; + } + + public int getLastMessageAcked() { + return this.lastMessageAcked; + } + + public boolean isAllAcked() { + return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked; + } + + @Override + public String toString() { + return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java new file mode 100644 index 0000000..64ab4ce --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.util.Map; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; + +/** + * Collection of implementation of common Kafka {@link Partitioner}s. + */ +final public class Partitioners { + + private Partitioners() { + } + + /** + * {@link Partitioner} that implements 'round-robin' mechanism which evenly + * distributes load between all available partitions. + */ + public static class RoundRobinPartitioner implements Partitioner { + + private volatile int index; + + @Override + public void configure(Map<String, ?> configs) { + // noop + } + + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + return this.next(cluster.availablePartitionsForTopic(topic).size()); + } + + @Override + public void close() { + // noop + } + + private synchronized int next(int numberOfPartitions) { + if (this.index >= numberOfPartitions) { + this.index = 0; + } + return index++; + } + } +}
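[Editor's note] Partitioners.RoundRobinPartitioner is wired into a producer the same way any custom Kafka 0.9 partitioner is: by class name through ProducerConfig.PARTITIONER_CLASS_CONFIG, presumably by the PublishKafka processor referenced in KafkaPublisher's javadoc. The sketch below is not part of this patch and assumes that registration mechanism; the broker address and topic name are placeholders.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.processors.kafka.pubsub.Partitioners;

public class RoundRobinProducerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // Register the round-robin partitioner from the patch by its class name.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioners.RoundRobinPartitioner.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 4; i++) {
                // With no key, successive records are spread evenly across the topic's available partitions.
                producer.send(new ProducerRecord<>("example-topic",            // placeholder topic
                        ("message-" + i).getBytes(StandardCharsets.UTF_8)));
            }
        }
    }
}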