[ https://issues.apache.org/jira/browse/NIFI-1645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206936#comment-15206936 ]
ASF GitHub Bot commented on NIFI-1645:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/295#discussion_r57039809

    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java ---
    @@ -0,0 +1,159 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka;
    +
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Scanner;
    +
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import kafka.javaapi.producer.Producer;
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.Partitioner;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * Wrapper over {@link KafkaProducer} to assist the {@link PutKafka} processor
    + * with sending the content of {@link FlowFile}s to Kafka.
    + */
    +public class KafkaPublisher implements AutoCloseable {
    +
    +    private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);
    +
    +    private final Producer<byte[], byte[]> producer;
    +
    +    private ProcessorLog processLog;
    +
    +    /**
    +     * Creates an instance of this class as well as the instance of the
    +     * corresponding Kafka {@link KafkaProducer} using the provided Kafka
    +     * configuration properties.
    +     */
    +    KafkaPublisher(Properties kafkaProperties) {
    +        ProducerConfig producerConfig = new ProducerConfig(kafkaProperties);
    +        this.producer = new Producer<>(producerConfig);
    +    }
    +
    +    /**
    +     * Sets the {@link ProcessorLog} to use for processor-scoped logging.
    +     */
    +    void setProcessLog(ProcessorLog processLog) {
    +        this.processLog = processLog;
    +    }
    +
    +    /**
    +     * Publishes messages to a Kafka topic. It supports three publishing
    +     * mechanisms:
    +     * <ul>
    +     * <li>Sending the entire content stream as a single Kafka message.</li>
    +     * <li>Splitting the incoming content stream into chunks and sending
    +     * individual chunks as separate Kafka messages.</li>
    +     * <li>Splitting the incoming content stream into chunks and sending only
    +     * the chunks that have failed previously @see
    +     * {@link SplittableMessageContext#getFailedSegments()}.</li>
    +     * </ul>
    +     * This method assumes content stream affinity: it is expected that the
    +     * content stream that represents the same Kafka message(s) will remain the
    +     * same across possible retries. This is required specifically for cases
    +     * where a delimiter is used and a single content stream may represent
    +     * multiple Kafka messages. The failed segment list keeps the index of
    +     * each content stream segment that failed to be sent to Kafka, so upon
    +     * retry only the failed segments are sent.
    +     *
    +     * @param messageContext
    +     *            instance of {@link SplittableMessageContext} which holds
    +     *            context information about the message(s) to be sent
    +     * @param contentStream
    +     *            instance of an open {@link InputStream} carrying the content
    +     *            of the message(s) to be sent to Kafka
    +     * @param partitionKey
    +     *            the value of the partition key. Only relevant if the user
    +     *            wishes to provide a custom partition key instead of relying
    +     *            on the variety of provided {@link Partitioner}s
    +     * @return the list containing the segment indexes of the messages that
    +     *         failed to be sent to Kafka
    +     */
    +    List<Integer> publish(SplittableMessageContext messageContext, InputStream contentStream, Object partitionKey) {
    +        List<Integer> prevFailedSegmentIndexes = messageContext.getFailedSegments();
    +        List<Integer> failedSegments = new ArrayList<>();
    +        int segmentCounter = 0;
    +        try (Scanner scanner = new Scanner(contentStream)) {
    +            scanner.useDelimiter(messageContext.getDelimiterPattern());
    +            while (scanner.hasNext()) {
    +                //TODO Improve content InputStream so it is skip-supported so one can zoom straight to the correct segment
    --- End diff --

    https://issues.apache.org/jira/browse/NIFI-1667
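To make the retry contract described in the Javadoc concrete, the sketch below shows one way the remainder of publish() could consume the previously failed segment indexes: every segment is sent on the first attempt, while on a retry only segments whose index appears in getFailedSegments() are re-sent. This is a hypothetical illustration rather than the code under review; the getTopicName() accessor, the keyBytes argument, the java.nio.charset.StandardCharsets import, and the synchronous failure behaviour of the old kafka.javaapi.producer.Producer#send(KeyedMessage) call are all assumptions.

{code}
// Hypothetical sketch (inside KafkaPublisher) of the failed-segment handling described above;
// not the implementation from the pull request.
List<Integer> publishSegments(SplittableMessageContext messageContext, InputStream contentStream, byte[] keyBytes) {
    List<Integer> prevFailedSegmentIndexes = messageContext.getFailedSegments(); // assumed null/empty on the first attempt
    List<Integer> failedSegments = new ArrayList<>();
    int segmentCounter = 0;
    try (Scanner scanner = new Scanner(contentStream)) {
        scanner.useDelimiter(messageContext.getDelimiterPattern());
        while (scanner.hasNext()) {
            byte[] segmentBytes = scanner.next().getBytes(StandardCharsets.UTF_8);
            boolean firstAttempt = prevFailedSegmentIndexes == null || prevFailedSegmentIndexes.isEmpty();
            if (firstAttempt || prevFailedSegmentIndexes.contains(segmentCounter)) {
                try {
                    // getTopicName() is an assumed accessor on SplittableMessageContext
                    this.producer.send(new KeyedMessage<byte[], byte[]>(messageContext.getTopicName(), keyBytes, segmentBytes));
                } catch (Exception e) {
                    failedSegments.add(segmentCounter); // remember the index so only this segment is retried next time
                }
            }
            segmentCounter++;
        }
    }
    return failedSegments;
}
{code}

Because the same content stream is re-read on a retry (the "content stream affinity" assumption above), the segment numbering stays stable, which is what makes retrying by index safe.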
> When using delimited data feature PutKafka ack'd ranges feature can break
> -------------------------------------------------------------------------
>
>                 Key: NIFI-1645
>                 URL: https://issues.apache.org/jira/browse/NIFI-1645
>             Project: Apache NiFi
>          Issue Type: Bug
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.6.0
>
>
> When using the delimited lines feature to send data to Kafka, such that a
> large set of lines that appears to be one 'flowfile' in NiFi is sent as a
> series of 1..N messages in Kafka, the mechanism of asynchronous
> acknowledgement can break down: we will receive acknowledgements but be
> unable to act on them appropriately, because by then the session/data would
> have already been considered successfully transferred. This could, under
> certain specific conditions, mean that failed acknowledgements would not
> result in a retransfer.
> The logic this processor supports for creating child objects to address
> failed/partial segments is extremely complicated and should likely be
> rewritten and greatly simplified. Instead, the SplitText feature should be
> used to create more manageable chunks of data, over which, if any segment is
> ack'd as a failure, the whole thing is failed and thus can be retransmitted.
> It is always best to enable the user to prefer data loss or data duplication
> on their own terms.
> Below is the relevant stack trace:
> {code}
> 17:12:37 EDT ERROR 6162d00f-737f-3710-85f9-318c886af95f
> clpen0004.foo.com:8090 PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f]
> PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f] failed to process session
> due to java.lang.IllegalStateException:
> java.util.concurrent.ExecutionException:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1458158883054-93724,
> container=cont2, section=540], offset=756882,
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in
> this session (StandardProcessSession[id=97534]):
> java.lang.IllegalStateException: java.util.concurrent.ExecutionException:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1458158883054-93724,
> container=cont2, section=540], offset=756882,
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in
> this session (StandardProcessSession[id=97534])
> {code}
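The FlowFileHandlingException in the trace above is the symptom described earlier: a Kafka acknowledgement arrives after the session that owned the FlowFile has already been committed, so there is nothing left to act on. A minimal sketch of the simplification the description proposes, assuming SplitText-sized input and the publish() contract from the diff; buildMessageContext(), the publisher field, and the REL_SUCCESS/REL_FAILURE relationships are assumed names, and this is not the actual PutKafka implementation.

{code}
// Hypothetical onTrigger-style sketch: if any segment of the (SplitText-sized) FlowFile
// fails to publish, fail the whole FlowFile within the same session so the entire chunk
// is retransmitted, letting the user trade possible duplication for no data loss.
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    final FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }
    final List<Integer> failedSegments = new ArrayList<>();
    session.read(flowFile, new InputStreamCallback() {
        @Override
        public void process(final InputStream in) throws IOException {
            // publish() returns the indexes of segments that failed to reach Kafka (see the diff above)
            failedSegments.addAll(publisher.publish(buildMessageContext(context, flowFile), in, null));
        }
    });
    if (failedSegments.isEmpty()) {
        session.transfer(flowFile, REL_SUCCESS);
    } else {
        // any failed segment fails the whole FlowFile so the entire chunk is retried
        session.transfer(session.penalize(flowFile), REL_FAILURE);
    }
}
{code}

Keeping the success/failure decision inside the same onTrigger invocation avoids acknowledging into a session that has already been committed, which is exactly the state the stack trace reports.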
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)