http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java new file mode 100644 index 0000000..e29f2af --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import javax.xml.bind.DatatypeConverter; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "0.10"}) 
+@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 0.10 producer. " + + "The messages to send may be individual FlowFiles or may be delimited, using a " + + "user-specified delimiter, such as a new-line. " + + " Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring" + + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can. In the mean time" + + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", + description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged." + + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ") +public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id"; + + protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx"; + + protected static final String FAILED_TOPIC_ATTR = "failed.topic"; + + protected static final String FAILED_KEY_ATTR = "failed.key"; + + protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter"; + + protected static final String MSG_COUNT = "msg.count"; + + static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", + "FlowFile will be routed to failure unless the message is replicated to the appropriate " + + "number of Kafka Nodes according to the Topic configuration"); + static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", + "FlowFile will be routed to success if the message is received by a single Kafka node, " + + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> " + + "but can result in data loss if a Kafka node crashes"); + static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", + "FlowFile will be routed to success after successfully writing the content to a Kafka node, " + + "without waiting for a response. 
This provides the best performance but may result in data loss."); + + static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(), + Partitioners.RoundRobinPartitioner.class.getSimpleName(), + "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, " + + "the next Partition to Partition 2, and so on, wrapping as necessary."); + static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner", + "DefaultPartitioner", "Messages will be assigned to random partitions."); + + static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() + .name("topic") + .displayName("Topic Name") + .description("The name of the Kafka Topic to publish to.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() + .name(ProducerConfig.ACKS_CONFIG) + .displayName("Delivery Guarantee") + .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.") + .required(true) + .expressionLanguageSupported(false) + .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) + .defaultValue(DELIVERY_BEST_EFFORT.getValue()) + .build(); + + static final PropertyDescriptor META_WAIT_TIME = new PropertyDescriptor.Builder() + .name(ProducerConfig.MAX_BLOCK_MS_CONFIG) + .displayName("Meta Data Wait Time") + .description("The amount of time KafkaConsumer will wait to obtain metadata during the 'send' call before failing the " + + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("30 sec") + .build(); + + static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() + .name("kafka-key") + .displayName("Kafka Key") + .description("The Key to use for the Message. It will be serialized as UTF-8 bytes. " + + "If not specified then the flow file attribute kafka.key.hex is used if present " + + "and we're not demarcating. In that case the hex string is coverted to its byte" + + "form and written as a byte[] key.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() + .name("message-demarcator") + .displayName("Message Demarcator") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within " + + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " + + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. " + + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on your OS.") + .build(); + + static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder() + .name(ProducerConfig.PARTITIONER_CLASS_CONFIG) + .displayName("Partitioner class") + .description("Specifies which class to use to compute a partition id for a message. 
Corresponds to Kafka's 'partitioner.class' property.") + .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING) + .defaultValue(RANDOM_PARTITIONING.getValue()) + .required(false) + .build(); + + static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() + .name(ProducerConfig.COMPRESSION_TYPE_CONFIG) + .displayName("Compression Type") + .description("This parameter allows you to specify the compression codec for all data generated by this producer.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("none", "gzip", "snappy", "lz4") + .defaultValue("none") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles for which all content was sent to Kafka.") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") + .build(); + + static final List<PropertyDescriptor> DESCRIPTORS; + + static final Set<Relationship> RELATIONSHIPS; + + private volatile String brokers; + + private final AtomicInteger taskCounter = new AtomicInteger(); + + private volatile boolean acceptTask = true; + + /* + * Will ensure that list of PropertyDescriptors is build only once, since + * all other lifecycle methods are invoked multiple times. + */ + static { + final List<PropertyDescriptor> _descriptors = new ArrayList<>(); + _descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors()); + _descriptors.add(TOPIC); + _descriptors.add(DELIVERY_GUARANTEE); + _descriptors.add(KEY); + _descriptors.add(MESSAGE_DEMARCATOR); + _descriptors.add(META_WAIT_TIME); + _descriptors.add(PARTITION_CLASS); + _descriptors.add(COMPRESSION_CODEC); + + DESCRIPTORS = Collections.unmodifiableList(_descriptors); + + final Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + RELATIONSHIPS = Collections.unmodifiableSet(_relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") + .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)).dynamic(true) + .build(); + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + return KafkaProcessorUtils.validateCommonProperties(validationContext); + } + + volatile KafkaPublisher kafkaPublisher; + + /** + * This thread-safe operation will delegate to + * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} after first + * checking and creating (if necessary) Kafka resource which could be either + * {@link KafkaPublisher} or {@link KafkaConsumer}. It will also close and + * destroy the underlying Kafka resource upon catching an {@link Exception} + * raised by {@link #rendezvousWithKafka(ProcessContext, ProcessSession)}. 
+ * After Kafka resource is destroyed it will be re-created upon the next + * invocation of this operation essentially providing a self healing + * mechanism to deal with potentially corrupted resource. + * <p> + * Keep in mind that upon catching an exception the state of this processor + * will be set to no longer accept any more tasks, until Kafka resource is + * reset. This means that in a multi-threaded situation currently executing + * tasks will be given a chance to complete while no new tasks will be + * accepted. + * + * @param context context + * @param sessionFactory factory + */ + @Override + public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + if (this.acceptTask) { // acts as a circuit breaker to allow existing tasks to wind down so 'kafkaPublisher' can be reset before new tasks are accepted. + this.taskCounter.incrementAndGet(); + final ProcessSession session = sessionFactory.createSession(); + try { + /* + * We can't be doing double null check here since as a pattern + * it only works for lazy init but not reset, which is what we + * are doing here. In fact the first null check is dangerous + * since 'kafkaPublisher' can become null right after its null + * check passed causing subsequent NPE. + */ + synchronized (this) { + if (this.kafkaPublisher == null) { + this.kafkaPublisher = this.buildKafkaResource(context, session); + } + } + + /* + * The 'processed' boolean flag does not imply any failure or success. It simply states that: + * - ConsumeKafka - some messages were received form Kafka and 1_ FlowFile were generated + * - PublishKafka0_10 - some messages were sent to Kafka based on existence of the input FlowFile + */ + boolean processed = this.rendezvousWithKafka(context, session); + session.commit(); + if (!processed) { + context.yield(); + } + } catch (Throwable e) { + this.acceptTask = false; + session.rollback(true); + this.getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, e}); + } finally { + synchronized (this) { + if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) { + this.close(); + this.acceptTask = true; + } + } + } + } else { + this.logger.debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset."); + this.getLogger().debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset."); + context.yield(); + } + } + + /** + * Will call {@link Closeable#close()} on the target resource after which + * the target resource will be set to null. Should only be called when there + * are no more threads being executed on this processor or when it has been + * verified that only a single thread remains. + * + * @see KafkaPublisher + * @see KafkaConsumer + */ + @OnStopped + public void close() { + try { + if (this.kafkaPublisher != null) { + try { + this.kafkaPublisher.close(); + } catch (Exception e) { + this.getLogger().warn("Failed while closing " + this.kafkaPublisher, e); + } + } + } finally { + this.kafkaPublisher = null; + } + } + + /** + * Will rendezvous with Kafka if {@link ProcessSession} contains + * {@link FlowFile} producing a result {@link FlowFile}. 
+ * <br> + * The result {@link FlowFile} that is successful is then transfered to + * {@link #REL_SUCCESS} + * <br> + * The result {@link FlowFile} that is failed is then transfered to + * {@link #REL_FAILURE} + * + */ + protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile != null) { + long start = System.nanoTime(); + flowFile = this.doRendezvousWithKafka(flowFile, context, session); + Relationship relationship = REL_SUCCESS; + if (!this.isFailedFlowFile(flowFile)) { + String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); + long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + String transitUri = KafkaProcessorUtils.buildTransitURI(context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(), this.brokers, topic); + session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + flowFile.getAttribute(MSG_COUNT) + " Kafka messages", executionDuration); + this.getLogger().debug("Successfully sent {} to Kafka as {} message(s) in {} millis", + new Object[]{flowFile, flowFile.getAttribute(MSG_COUNT), executionDuration}); + } else { + relationship = REL_FAILURE; + flowFile = session.penalize(flowFile); + } + session.transfer(flowFile, relationship); + } + return flowFile != null; + } + + /** + * Builds and instance of {@link KafkaPublisher}. + */ + protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) { + final Map<String, String> kafkaProps = new HashMap<>(); + KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProps); + kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + this.brokers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); + final Properties props = new Properties(); + props.putAll(kafkaProps); + KafkaPublisher publisher = new KafkaPublisher(props, this.getLogger()); + return publisher; + } + + /** + * Will rendezvous with {@link KafkaPublisher} after building + * {@link PublishingContext} and will produce the resulting + * {@link FlowFile}. The resulting FlowFile contains all required + * information to determine if message publishing originated from the + * provided FlowFile has actually succeeded fully, partially or failed + * completely (see {@link #isFailedFlowFile(FlowFile)}. + */ + private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) { + final AtomicReference<KafkaPublisher.KafkaPublisherResult> publishResultRef = new AtomicReference<>(); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream contentStream) throws IOException { + PublishingContext publishingContext = PublishKafka_0_10.this.buildPublishingContext(flowFile, context, contentStream); + KafkaPublisher.KafkaPublisherResult result = PublishKafka_0_10.this.kafkaPublisher.publish(publishingContext); + publishResultRef.set(result); + } + }); + + FlowFile resultFile = publishResultRef.get().isAllAcked() + ? 
this.cleanUpFlowFileIfNecessary(flowFile, session) + : session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(), flowFile, context)); + + if (!this.isFailedFlowFile(resultFile)) { + resultFile = session.putAttribute(resultFile, MSG_COUNT, String.valueOf(publishResultRef.get().getMessagesSent())); + } + return resultFile; + } + + /** + * Builds {@link PublishingContext} for message(s) to be sent to Kafka. + * {@link PublishingContext} contains all contextual information required by + * {@link KafkaPublisher} to publish to Kafka. Such information contains + * things like topic name, content stream, delimiter, key and last ACKed + * message for cases where provided FlowFile is being retried (failed in the + * past). + * <br> + * For the clean FlowFile (file that has been sent for the first time), + * PublishingContext will be built form {@link ProcessContext} associated + * with this invocation. + * <br> + * For the failed FlowFile, {@link PublishingContext} will be built from + * attributes of that FlowFile which by then will already contain required + * information (e.g., topic, key, delimiter etc.). This is required to + * ensure the affinity of the retry in the even where processor + * configuration has changed. However keep in mind that failed FlowFile is + * only considered a failed FlowFile if it is being re-processed by the same + * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see + * {@link #isFailedFlowFile(FlowFile)}). If failed FlowFile is being sent to + * another PublishKafka0_10 processor it is treated as a fresh FlowFile + * regardless if it has #FAILED* attributes set. + */ + private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) { + String topicName; + byte[] keyBytes; + byte[] delimiterBytes = null; + int lastAckedMessageIndex = -1; + if (this.isFailedFlowFile(flowFile)) { + lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX)); + topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR); + keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null + ? flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null; + delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null + ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null; + + } else { + topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); + String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + keyBytes = _key == null ? null : _key.getBytes(StandardCharsets.UTF_8); + String keyHex = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY_HEX); + if (_key == null && keyHex != null && KafkaProcessorUtils.HEX_KEY_PATTERN.matcher(keyHex).matches()) { + keyBytes = DatatypeConverter.parseHexBinary(keyHex); + } + delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? 
context.getProperty(MESSAGE_DEMARCATOR) + .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null; + } + + PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); + publishingContext.setKeyBytes(keyBytes); + publishingContext.setDelimiterBytes(delimiterBytes); + return publishingContext; + } + + /** + * Will remove FAILED_* attributes if FlowFile is no longer considered a + * failed FlowFile + * + * @see #isFailedFlowFile(FlowFile) + */ + private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, ProcessSession session) { + if (this.isFailedFlowFile(flowFile)) { + Set<String> keysToRemove = new HashSet<>(); + keysToRemove.add(FAILED_DELIMITER_ATTR); + keysToRemove.add(FAILED_KEY_ATTR); + keysToRemove.add(FAILED_TOPIC_ATTR); + keysToRemove.add(FAILED_PROC_ID_ATTR); + keysToRemove.add(FAILED_LAST_ACK_IDX); + flowFile = session.removeAllAttributes(flowFile, keysToRemove); + } + return flowFile; + } + + /** + * Builds a {@link Map} of FAILED_* attributes + * + * @see #FAILED_PROC_ID_ATTR + * @see #FAILED_LAST_ACK_IDX + * @see #FAILED_TOPIC_ATTR + * @see #FAILED_KEY_ATTR + * @see #FAILED_DELIMITER_ATTR + */ + private Map<String, String> buildFailedFlowFileAttributes(int lastAckedMessageIndex, FlowFile sourceFlowFile, ProcessContext context) { + Map<String, String> attributes = new HashMap<>(); + attributes.put(FAILED_PROC_ID_ATTR, this.getIdentifier()); + attributes.put(FAILED_LAST_ACK_IDX, String.valueOf(lastAckedMessageIndex)); + attributes.put(FAILED_TOPIC_ATTR, context.getProperty(TOPIC).evaluateAttributeExpressions(sourceFlowFile).getValue()); + attributes.put(FAILED_KEY_ATTR, context.getProperty(KEY).evaluateAttributeExpressions(sourceFlowFile).getValue()); + attributes.put(FAILED_DELIMITER_ATTR, context.getProperty(MESSAGE_DEMARCATOR).isSet() + ? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(sourceFlowFile).getValue() : null); + return attributes; + } + + /** + * Returns 'true' if provided FlowFile is a failed FlowFile. A failed + * FlowFile contains {@link #FAILED_PROC_ID_ATTR}. + */ + private boolean isFailedFlowFile(FlowFile flowFile) { + return this.getIdentifier().equals(flowFile.getAttribute(FAILED_PROC_ID_ATTR)); + } +}
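
To illustrate the retry mechanism described in the Javadoc and FAILED_* attributes above: when a demarcated FlowFile only partially succeeds, failed.last.idx records the index of the last message Kafka acknowledged, so a retry re-sends only the messages after that index. The following is a minimal, self-contained sketch of that idea, not the actual KafkaPublisher code; the class and method names (DemarcatedRetrySketch, messagesToSend) are hypothetical.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch of the "resume after the last ACKed message" idea that
 * the FAILED_LAST_ACK_IDX attribute enables. Not the real KafkaPublisher.
 */
public class DemarcatedRetrySketch {

    /**
     * Splits the content on the demarcator and returns only the messages that
     * still need to be sent, i.e. those after lastAckedMessageIndex.
     */
    static List<String> messagesToSend(byte[] content, String demarcator, int lastAckedMessageIndex) {
        String[] all = new String(content, StandardCharsets.UTF_8)
                .split(java.util.regex.Pattern.quote(demarcator));
        // -1 means nothing was ACKed yet, so every message is sent.
        int firstUnacked = lastAckedMessageIndex + 1;
        return Arrays.asList(all).subList(Math.min(firstUnacked, all.length), all.length);
    }

    public static void main(String[] args) {
        byte[] flowFileContent = "m1\nm2\nm3\nm4".getBytes(StandardCharsets.UTF_8);
        // First attempt: failed.last.idx is -1 (or absent), so all four messages go out.
        System.out.println(messagesToSend(flowFileContent, "\n", -1)); // [m1, m2, m3, m4]
        // Retry after Kafka ACKed the first two: only m3 and m4 are re-sent.
        System.out.println(messagesToSend(flowFileContent, "\n", 1));  // [m3, m4]
    }
}

Run against a four-message FlowFile, a retry with failed.last.idx set to 1 re-sends only the third and fourth messages, which matches the behavior described in the PublishKafka additional-details documentation further down in this commit.
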
http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java new file mode 100644 index 0000000..bda29e6 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +/** + * Holder of context information used by {@link KafkaPublisher} required to + * publish messages to Kafka. + */ +class PublishingContext { + + private final InputStream contentStream; + + private final String topic; + + private final int lastAckedMessageIndex; + + /* + * We're using the default value from Kafka. We are using it to control the + * message size before it goes to to Kafka thus limiting possibility of a + * late failures in Kafka client. 
+ */ + private int maxRequestSize = 1048576; // kafka default + + private boolean maxRequestSizeSet; + + private byte[] keyBytes; + + private byte[] delimiterBytes; + + PublishingContext(InputStream contentStream, String topic) { + this(contentStream, topic, -1); + } + + PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex) { + this.validateInput(contentStream, topic, lastAckedMessageIndex); + this.contentStream = contentStream; + this.topic = topic; + this.lastAckedMessageIndex = lastAckedMessageIndex; + } + + @Override + public String toString() { + return "topic: '" + this.topic + "'; delimiter: '" + new String(this.delimiterBytes, StandardCharsets.UTF_8) + "'"; + } + + int getLastAckedMessageIndex() { + return this.lastAckedMessageIndex; + } + + int getMaxRequestSize() { + return this.maxRequestSize; + } + + byte[] getKeyBytes() { + return this.keyBytes; + } + + byte[] getDelimiterBytes() { + return this.delimiterBytes; + } + + InputStream getContentStream() { + return this.contentStream; + } + + String getTopic() { + return this.topic; + } + + void setKeyBytes(byte[] keyBytes) { + if (this.keyBytes == null) { + if (keyBytes != null) { + this.assertBytesValid(keyBytes); + this.keyBytes = keyBytes; + } + } else { + throw new IllegalArgumentException("'keyBytes' can only be set once per instance"); + } + } + + void setDelimiterBytes(byte[] delimiterBytes) { + if (this.delimiterBytes == null) { + if (delimiterBytes != null) { + this.assertBytesValid(delimiterBytes); + this.delimiterBytes = delimiterBytes; + } + } else { + throw new IllegalArgumentException("'delimiterBytes' can only be set once per instance"); + } + } + + void setMaxRequestSize(int maxRequestSize) { + if (!this.maxRequestSizeSet) { + if (maxRequestSize > 0) { + this.maxRequestSize = maxRequestSize; + this.maxRequestSizeSet = true; + } else { + throw new IllegalArgumentException("'maxRequestSize' must be > 0"); + } + } else { + throw new IllegalArgumentException("'maxRequestSize' can only be set once per instance"); + } + } + + private void assertBytesValid(byte[] bytes) { + if (bytes != null) { + if (bytes.length == 0) { + throw new IllegalArgumentException("'bytes' must not be empty"); + } + } + } + + private void validateInput(InputStream contentStream, String topic, int lastAckedMessageIndex) { + if (contentStream == null) { + throw new IllegalArgumentException("'contentStream' must not be null"); + } else if (topic == null || topic.trim().length() == 0) { + throw new IllegalArgumentException("'topic' must not be null or empty"); + } else if (lastAckedMessageIndex < -1) { + throw new IllegalArgumentException("'lastAckedMessageIndex' must be >= -1"); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..aa1d4e2 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# 
contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_10 +org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html new file mode 100644 index 0000000..2ce6b51 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html @@ -0,0 +1,33 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8" /> + <title>ConsumeKafka</title> + <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> + </head> + + <body> + <!-- Processor Documentation ================================================== --> + <h2>Description:</h2> + <p> + This Processors polls <a href="http://kafka.apache.org/">Apache Kafka</a> + for data using KafkaConsumer API available with Kafka 0.10+. When a message is received + from Kafka, this Processor emits a FlowFile where the content of the FlowFile is the value + of the Kafka message. 
+ </p> + </body> +</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html new file mode 100644 index 0000000..dfd92b3 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html @@ -0,0 +1,47 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8" /> + <title>PublishKafka</title> + <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> + </head> + + <body> + <!-- Processor Documentation ================================================== --> + <h2>Description:</h2> + <p> + This Processors puts the contents of a FlowFile to a Topic in + <a href="http://kafka.apache.org/">Apache Kafka</a> using KafkaProducer API available + with Kafka 0.10+ API. The content of a FlowFile becomes the contents of a Kafka message. + This message is optionally assigned a key by using the <Kafka Key> Property. + </p> + + <p> + The Processor allows the user to configure an optional Message Demarcator that + can be used to send many messages per FlowFile. For example, a <i>\n</i> could be used + to indicate that the contents of the FlowFile should be used to send one message + per line of text. It also supports multi-char demarcators (e.g., 'my custom demarcator'). + If the property is not set, the entire contents of the FlowFile + will be sent as a single message. When using the demarcator, if some messages are + successfully sent but other messages fail to send, the resulting FlowFile will be + considered a failed FlowFuile and will have additional attributes to that effect. + One of such attributes is 'failed.last.idx' which indicates the index of the last message + that was successfully ACKed by Kafka. (if no demarcator is used the value of this index will be -1). + This will allow PublishKafka to only re-send un-ACKed messages on the next re-try. 
+ </p> + </body> +</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java new file mode 100644 index 0000000..c172b03 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.UUID; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; + +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ConsumeKafkaTest { + + static class MockConsumerPool extends ConsumerPool { + + final int actualMaxLeases; + final List<String> actualTopics; + final Map<String, String> actualKafkaProperties; + boolean throwKafkaExceptionOnPoll = false; + boolean throwKafkaExceptionOnCommit = false; + Queue<ConsumerRecords<byte[], byte[]>> nextPlannedRecordsQueue = new ArrayDeque<>(); + Map<TopicPartition, OffsetAndMetadata> nextExpectedCommitOffsets = null; + Map<TopicPartition, OffsetAndMetadata> actualCommitOffsets = null; + boolean wasConsumerLeasePoisoned = false; + boolean wasConsumerLeaseClosed = false; + boolean wasPoolClosed = false; + + public MockConsumerPool(int maxLeases, List<String> topics, Map<String, String> kafkaProperties, ComponentLog logger) { + super(maxLeases, topics, 
kafkaProperties, null); + actualMaxLeases = maxLeases; + actualTopics = topics; + actualKafkaProperties = kafkaProperties; + } + + @Override + public ConsumerLease obtainConsumer() { + return new ConsumerLease() { + @Override + public ConsumerRecords<byte[], byte[]> poll() { + if (throwKafkaExceptionOnPoll) { + throw new KafkaException("i planned to fail"); + } + final ConsumerRecords<byte[], byte[]> records = nextPlannedRecordsQueue.poll(); + return (records == null) ? ConsumerRecords.empty() : records; + } + + @Override + public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) { + if (throwKafkaExceptionOnCommit) { + throw new KafkaException("i planned to fail"); + } + actualCommitOffsets = offsets; + } + + @Override + public void poison() { + wasConsumerLeasePoisoned = true; + } + + @Override + public void close() { + wasConsumerLeaseClosed = true; + } + }; + } + + @Override + public void close() { + wasPoolClosed = true; + } + + void resetState() { + throwKafkaExceptionOnPoll = false; + throwKafkaExceptionOnCommit = false; + nextPlannedRecordsQueue = null; + nextExpectedCommitOffsets = null; + wasConsumerLeasePoisoned = false; + wasConsumerLeaseClosed = false; + wasPoolClosed = false; + } + + } + + @Test + public void validateCustomValidatorSettings() throws Exception { + ConsumeKafka_0_10 consumeKafka = new ConsumeKafka_0_10(); + TestRunner runner = TestRunners.newTestRunner(consumeKafka); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234"); + runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo"); + runner.setProperty(ConsumeKafka_0_10.GROUP_ID, "foo"); + runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); + runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + runner.assertValid(); + runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo"); + runner.assertNotValid(); + runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + runner.assertValid(); + runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + runner.assertValid(); + runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + runner.assertNotValid(); + } + + @Test + public void validatePropertiesValidation() throws Exception { + ConsumeKafka_0_10 consumeKafka = new ConsumeKafka_0_10(); + TestRunner runner = TestRunners.newTestRunner(consumeKafka); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234"); + runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo"); + runner.setProperty(ConsumeKafka_0_10.GROUP_ID, "foo"); + runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); + + runner.removeProperty(ConsumeKafka_0_10.GROUP_ID); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("invalid because group.id is required")); + } + + runner.setProperty(ConsumeKafka_0_10.GROUP_ID, ""); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + + runner.setProperty(ConsumeKafka_0_10.GROUP_ID, " "); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + } + + @Test + public void validateGetAllMessages() throws Exception { + String groupName = 
"validateGetAllMessages"; + + final byte[][] firstPassValues = new byte[][]{ + "Hello-1".getBytes(StandardCharsets.UTF_8), + "Hello-2".getBytes(StandardCharsets.UTF_8), + "Hello-3".getBytes(StandardCharsets.UTF_8) + }; + final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues); + + final byte[][] secondPassValues = new byte[][]{ + "Hello-4".getBytes(StandardCharsets.UTF_8), + "Hello-5".getBytes(StandardCharsets.UTF_8), + "Hello-6".getBytes(StandardCharsets.UTF_8) + }; + final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues); + + final List<String> expectedTopics = new ArrayList<>(); + expectedTopics.add("foo"); + expectedTopics.add("bar"); + final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null); + mockPool.nextPlannedRecordsQueue.add(firstRecs); + mockPool.nextPlannedRecordsQueue.add(secondRecs); + + ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { + @Override + protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) { + return mockPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setValidateExpressionUsage(false); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); + + runner.run(1, false); + + final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS); + + assertEquals(expectedTopics, mockPool.actualTopics); + + assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count()); + assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count()); + assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count()); + + if (mockPool.nextPlannedRecordsQueue.isEmpty()) { + assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4")).count()); + assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-5")).count()); + assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-6")).count()); + assertEquals(2, mockPool.actualCommitOffsets.size()); + assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset()); + assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset()); + } else { + assertEquals(2, mockPool.actualCommitOffsets.size()); + assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset()); + } + + //asert that all consumers were closed as expected + //assert that the consumer pool was properly closed + assertFalse(mockPool.wasConsumerLeasePoisoned); + assertTrue(mockPool.wasConsumerLeaseClosed); + assertFalse(mockPool.wasPoolClosed); + runner.run(1, true); + assertFalse(mockPool.wasConsumerLeasePoisoned); + assertTrue(mockPool.wasConsumerLeaseClosed); + assertTrue(mockPool.wasPoolClosed); + + } + + @Test + public void validateGetLotsOfMessages() throws Exception { + String groupName = 
"validateGetLotsOfMessages"; + + final byte[][] firstPassValues = new byte[10010][1]; + for (final byte[] value : firstPassValues) { + value[0] = 0x12; + } + final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues); + + final byte[][] secondPassValues = new byte[][]{ + "Hello-4".getBytes(StandardCharsets.UTF_8), + "Hello-5".getBytes(StandardCharsets.UTF_8), + "Hello-6".getBytes(StandardCharsets.UTF_8) + }; + final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues); + + final List<String> expectedTopics = new ArrayList<>(); + expectedTopics.add("foo"); + expectedTopics.add("bar"); + final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null); + mockPool.nextPlannedRecordsQueue.add(firstRecs); + mockPool.nextPlannedRecordsQueue.add(secondRecs); + + ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { + @Override + protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) { + return mockPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setValidateExpressionUsage(false); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); + + runner.run(1, false); + + final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS); + + assertEquals(10010, flowFiles.stream().map(ff -> ff.toByteArray()).filter(content -> content.length == 1 && content[0] == 0x12).count()); + assertEquals(1, mockPool.nextPlannedRecordsQueue.size()); + + assertEquals(1, mockPool.actualCommitOffsets.size()); + assertEquals(10011L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset()); + + //asert that all consumers were closed as expected + //assert that the consumer pool was properly closed + assertFalse(mockPool.wasConsumerLeasePoisoned); + assertTrue(mockPool.wasConsumerLeaseClosed); + assertFalse(mockPool.wasPoolClosed); + runner.run(1, true); + assertFalse(mockPool.wasConsumerLeasePoisoned); + assertTrue(mockPool.wasConsumerLeaseClosed); + assertTrue(mockPool.wasPoolClosed); + + } + + private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) { + final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>(); + final TopicPartition tPart = new TopicPartition(topic, partition); + final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(); + long offset = startingOffset; + for (final byte[] rawRecord : rawRecords) { + final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord); + records.add(rec); + } + map.put(tPart, records); + return new ConsumerRecords(map); + } + + private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... 
records) { + final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>(); + for (final ConsumerRecords<byte[], byte[]> rec : records) { + rec.partitions().stream().forEach((part) -> { + final List<ConsumerRecord<byte[], byte[]>> conRecs = rec.records(part); + if (map.get(part) != null) { + throw new IllegalStateException("already have that topic/partition in the record map"); + } + map.put(part, conRecs); + }); + } + return new ConsumerRecords<>(map); + } + + @Test + public void validateGetAllMessagesWithProvidedDemarcator() throws Exception { + String groupName = "validateGetAllMessagesWithProvidedDemarcator"; + + final byte[][] firstPassValues = new byte[][]{ + "Hello-1".getBytes(StandardCharsets.UTF_8), + "Hello-2".getBytes(StandardCharsets.UTF_8), + "Hello-3".getBytes(StandardCharsets.UTF_8) + }; + + final byte[][] secondPassValues = new byte[][]{ + "Hello-4".getBytes(StandardCharsets.UTF_8), + "Hello-5".getBytes(StandardCharsets.UTF_8), + "Hello-6".getBytes(StandardCharsets.UTF_8) + }; + final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords( + createConsumerRecords("foo", 1, 1L, firstPassValues), + createConsumerRecords("bar", 1, 1L, secondPassValues) + ); + + final List<String> expectedTopics = new ArrayList<>(); + expectedTopics.add("foo"); + expectedTopics.add("bar"); + final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null); + mockPool.nextPlannedRecordsQueue.add(consumerRecs); + + ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { + @Override + protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) { + return mockPool; + } + }; + + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setValidateExpressionUsage(false); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); + runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah"); + + runner.run(1, false); + + final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS); + + assertEquals(2, flowFiles.size()); + + assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count()); + assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4blahHello-5blahHello-6")).count()); + + //asert that all consumers were closed as expected + //assert that the consumer pool was properly closed + assertFalse(mockPool.wasConsumerLeasePoisoned); + assertTrue(mockPool.wasConsumerLeaseClosed); + assertFalse(mockPool.wasPoolClosed); + runner.run(1, true); + assertFalse(mockPool.wasConsumerLeasePoisoned); + assertTrue(mockPool.wasConsumerLeaseClosed); + assertTrue(mockPool.wasPoolClosed); + + assertEquals(2, mockPool.actualCommitOffsets.size()); + assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset()); + assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset()); + } + + @Test + public void validatePollException() throws Exception { + String groupName = "validatePollException"; + + final byte[][] firstPassValues = new byte[][]{ + "Hello-1".getBytes(StandardCharsets.UTF_8), + 
"Hello-2".getBytes(StandardCharsets.UTF_8), + "Hello-3".getBytes(StandardCharsets.UTF_8) + }; + + final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords( + createConsumerRecords("foo", 1, 1L, firstPassValues) + ); + + final List<String> expectedTopics = new ArrayList<>(); + expectedTopics.add("foo"); + final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null); + mockPool.nextPlannedRecordsQueue.add(consumerRecs); + mockPool.throwKafkaExceptionOnPoll = true; + + ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { + @Override + protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) { + return mockPool; + } + }; + + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setValidateExpressionUsage(false); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo"); + runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); + runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah"); + + runner.run(1, true); + + final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS); + + assertEquals(0, flowFiles.size()); + assertNull(null, mockPool.actualCommitOffsets); + + //asert that all consumers were closed as expected + //assert that the consumer pool was properly closed + assertTrue(mockPool.wasConsumerLeasePoisoned); + assertTrue(mockPool.wasConsumerLeaseClosed); + assertTrue(mockPool.wasPoolClosed); + } + + @Test + public void validateCommitOffsetException() throws Exception { + String groupName = "validateCommitOffsetException"; + + final byte[][] firstPassValues = new byte[][]{ + "Hello-1".getBytes(StandardCharsets.UTF_8), + "Hello-2".getBytes(StandardCharsets.UTF_8), + "Hello-3".getBytes(StandardCharsets.UTF_8) + }; + + final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords( + createConsumerRecords("foo", 1, 1L, firstPassValues) + ); + + final List<String> expectedTopics = new ArrayList<>(); + expectedTopics.add("foo"); + final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null); + mockPool.nextPlannedRecordsQueue.add(consumerRecs); + mockPool.throwKafkaExceptionOnCommit = true; + + ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { + @Override + protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) { + return mockPool; + } + }; + + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setValidateExpressionUsage(false); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo"); + runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); + runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah"); + + runner.run(1, true); + + final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS); + + assertEquals(1, flowFiles.size()); + + assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count()); + + //asert that all consumers were closed as expected + //assert that the 
consumer pool was properly closed + assertTrue(mockPool.wasConsumerLeasePoisoned); + assertTrue(mockPool.wasConsumerLeaseClosed); + assertTrue(mockPool.wasPoolClosed); + + assertNull(null, mockPool.actualCommitOffsets); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java new file mode 100644 index 0000000..7f88ea2 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.Collections;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.KafkaException;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConsumerPoolTest {
+
+    Consumer<byte[], byte[]> consumer = null;
+    ComponentLog logger = null;
+
+    @Before
+    public void setup() {
+        consumer = mock(Consumer.class);
+        logger = mock(ComponentLog.class);
+    }
+
+    @Test
+    public void validatePoolSimpleCreateClose() throws Exception {
+
+        final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+
+        when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+
+        try (final ConsumerLease lease = testPool.obtainConsumer()) {
+            lease.poll();
+            lease.commitOffsets(Collections.emptyMap());
+        }
+        testPool.close();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+        assertEquals(1, stats.unproductivePollCount);
+        assertEquals(0, stats.productivePollCount);
+    }
+
+    @Test
+    public void validatePoolSimpleBatchCreateClose() throws Exception {
+
+        final ConsumerPool testPool = new ConsumerPool(5, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+
+        when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+
+        for (int i = 0; i < 100; i++) {
+            try (final ConsumerLease lease = testPool.obtainConsumer()) {
+                for (int j = 0; j < 100; j++) {
+                    lease.poll();
+                }
+                lease.commitOffsets(Collections.emptyMap());
+            }
+        }
+        testPool.close();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(100, stats.leasesObtainedCount);
+        assertEquals(10000, stats.unproductivePollCount);
+        assertEquals(0, stats.productivePollCount);
+    }
+
+    @Test
+    public void validatePoolConsumerFails() throws Exception {
+
+        final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+
+        when(consumer.poll(anyInt())).thenThrow(new KafkaException());
+
+        try (final ConsumerLease lease = testPool.obtainConsumer()) {
+            lease.poll();
+            fail();
+        } catch (final KafkaException ke) {
+            // expected - the KafkaException thrown by poll() should propagate to the caller
+        }
+        testPool.close();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+        assertEquals(0, stats.unproductivePollCount);
+        assertEquals(0, stats.productivePollCount);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
new file mode 100644
index 0000000..19c64af
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+public class KafkaPublisherTest {
+
+    private static EmbeddedKafka kafkaLocal;
+
+    private static EmbeddedKafkaProducerHelper producerHelper;
+
+    @BeforeClass
+    public static void beforeClass() {
+        kafkaLocal = new EmbeddedKafka();
+        kafkaLocal.start();
+        producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producerHelper.close();
+        kafkaLocal.stop();
+    }
+
+    @Test
+    public void validateSuccessfulSendAsWhole() throws Exception {
+        InputStream contentStream = new ByteArrayInputStream("Hello Kafka".getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulSendAsWhole";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+        KafkaPublisherResult result = publisher.publish(publishingContext);
+
+        assertEquals(0, result.getLastMessageAcked());
+        assertEquals(1, result.getMessagesSent());
+        contentStream.close();
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        assertNotNull(iter.next());
+        try {
+            iter.next();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+    }
+
+    @Test
+    public void validateSuccessfulSendAsDelimited() throws Exception {
+        InputStream contentStream = new ByteArrayInputStream(
+                "Hello Kafka\nHello Kafka\nHello Kafka\nHello Kafka\n".getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulSendAsDelimited";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+        KafkaPublisherResult result = publisher.publish(publishingContext);
+
+        assertEquals(3, result.getLastMessageAcked());
+        assertEquals(4, result.getMessagesSent());
+        contentStream.close();
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+    }
+
+    /*
+     * This test simulates the condition where not all messages were ACKed by
+     * Kafka
+     */
+    @Test
+    public void validateRetries() throws Exception {
+        byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8);
+        InputStream contentStream = new ByteArrayInputStream(testValue);
+        String topicName = "validateSuccessfulReSendOfFailedSegments";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+
+        // simulates the first re-try
+        int lastAckedMessageIndex = 1;
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+
+        publisher.publish(publishingContext);
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        String m1 = new String(iter.next().message());
+        String m2 = new String(iter.next().message());
+        assertEquals("Hello Kafka3", m1);
+        assertEquals("Hello Kafka4", m2);
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+
+        // simulates the second re-try
+        lastAckedMessageIndex = 2;
+        contentStream = new ByteArrayInputStream(testValue);
+        publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+        publisher.publish(publishingContext);
+
+        m1 = new String(iter.next().message());
+        assertEquals("Hello Kafka4", m1);
+
+        publisher.close();
+    }
+
+    /*
+     * Similar to the above test, but it sets the first retry index to the last
+     * possible message index and second index to an out of bound index. The
+     * expectation is that no messages will be sent to Kafka
+     */
+    @Test
+    public void validateRetriesWithWrongIndex() throws Exception {
+        byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8);
+        InputStream contentStream = new ByteArrayInputStream(testValue);
+        String topicName = "validateRetriesWithWrongIndex";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+
+        // simulates the first re-try
+        int lastAckedMessageIndex = 3;
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+
+        publisher.publish(publishingContext);
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+
+        // simulates the second re-try
+        lastAckedMessageIndex = 6;
+        contentStream = new ByteArrayInputStream(testValue);
+        publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+        publisher.publish(publishingContext);
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+
+        publisher.close();
+    }
+
+    @Test
+    public void validateWithMultiByteCharactersNoDelimiter() throws Exception {
+        String data = "å THIS IS MY NEW TEXT.å IT HAS A NEWLINE.";
+        InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateWithMultiByteCharacters";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+
+        publisher.publish(publishingContext);
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        String r = new String(iter.next().message(), StandardCharsets.UTF_8);
+        assertEquals(data, r);
+    }
+
+    @Test
+    public void validateWithNonDefaultPartitioner() throws Exception {
+        String data = "fooandbarandbaz";
+        InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateWithNonDefaultPartitioner";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        kafkaProperties.setProperty("partitioner.class", TestPartitioner.class.getName());
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+        publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8));
+
+        try {
+            publisher.publish(publishingContext);
+            // partitioner should be invoked 3 times
+            assertEquals(3, TestPartitioner.counter);
+            publisher.close();
+        } finally {
+            TestPartitioner.counter = 0;
+        }
+    }
+
+    private Properties buildProducerProperties() {
+        Properties kafkaProperties = new Properties();
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaLocal.getKafkaPort());
+        kafkaProperties.put("auto.create.topics.enable", "true");
+        return kafkaProperties;
+    }
+
+    private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
+        Properties props = new Properties();
+        props.put("zookeeper.connect", "localhost:" + kafkaLocal.getZookeeperPort());
+        props.put("group.id", "test");
+        props.put("consumer.timeout.ms", "500");
+        props.put("auto.offset.reset", "smallest");
+        ConsumerConfig consumerConfig = new ConsumerConfig(props);
+        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+        Map<String, Integer> topicCountMap = new HashMap<>(1);
+        topicCountMap.put(topic, 1);
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+        ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
+        return iter;
+    }
+
+    public static class TestPartitioner implements Partitioner {
+
+        static int counter;
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            // nothing to do for this test
+        }
+
+        @Override
+        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
+                Cluster cluster) {
+            counter++;
+            return 0;
+        }
+
+        @Override
+        public void close() {
+            counter = 0;
+        }
+    }
+}