[ https://issues.apache.org/jira/browse/NIFI-1645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206936#comment-15206936 ]

ASF GitHub Bot commented on NIFI-1645:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/295#discussion_r57039809
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java ---
    @@ -0,0 +1,159 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka;
    +
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Scanner;
    +
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import kafka.javaapi.producer.Producer;
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.Partitioner;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * Wrapper over {@link KafkaProducer} to assist the {@link PutKafka} processor with
    + * sending the content of {@link FlowFile}s to Kafka.
    + */
    +public class KafkaPublisher implements AutoCloseable {
    +
    +    private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);
    +
    +    private final Producer<byte[], byte[]> producer;
    +
    +    private ProcessorLog processLog;
    +
    +    /**
    +     * Creates an instance of this class as well as the corresponding Kafka
    +     * {@link KafkaProducer} using the provided Kafka configuration
    +     * properties.
    +     */
    +    KafkaPublisher(Properties kafkaProperties) {
    +        ProducerConfig producerConfig = new ProducerConfig(kafkaProperties);
    +        this.producer = new Producer<>(producerConfig);
    +    }
    +
    +    /**
    +     * Sets the {@link ProcessorLog} this publisher should use for logging.
    +     */
    +    void setProcessLog(ProcessorLog processLog) {
    +        this.processLog = processLog;
    +    }
    +
    +    /**
    +     * Publishes messages to a Kafka topic. It supports three publishing
    +     * mechanisms:
    +     * <ul>
    +     * <li>Sending the entire content stream as a single Kafka message.</li>
    +     * <li>Splitting the incoming content stream into chunks and sending
    +     * individual chunks as separate Kafka messages.</li>
    +     * <li>Splitting the incoming content stream into chunks and sending only
    +     * the chunks that previously failed (see
    +     * {@link SplittableMessageContext#getFailedSegments()}).</li>
    +     * </ul>
    +     * This method assumes content stream affinity: the content stream that
    +     * represents the same Kafka message(s) is expected to remain the same
    +     * across possible retries. This is required specifically for cases where
    +     * a delimiter is used and a single content stream may represent multiple
    +     * Kafka messages. The failed segment list keeps the index of each content
    +     * stream segment that failed to be sent to Kafka, so upon retry only the
    +     * failed segments are sent.
    +     *
    +     * @param messageContext
    +     *            instance of {@link SplittableMessageContext} which holds
    +     *            context information about the message to be sent
    +     * @param contentStream
    +     *            an open {@link InputStream} carrying the content of the
    +     *            message(s) to be sent to Kafka
    +     * @param partitionKey
    +     *            the value of the partition key. Only relevant if the user
    +     *            wishes to provide a custom partition key instead of relying
    +     *            on one of the provided {@link Partitioner}s
    +     * @return the list of segment indexes for messages that failed to be sent
    +     *         to Kafka
    +     */
    +    List<Integer> publish(SplittableMessageContext messageContext, InputStream contentStream, Object partitionKey) {
    +        List<Integer> prevFailedSegmentIndexes = messageContext.getFailedSegments();
    +        List<Integer> failedSegments = new ArrayList<>();
    +        int segmentCounter = 0;
    +        try (Scanner scanner = new Scanner(contentStream)) {
    +            scanner.useDelimiter(messageContext.getDelimiterPattern());
    +            while (scanner.hasNext()) {
    +                // TODO Improve the content InputStream so it supports skip(), allowing a retry to zoom straight to the correct segment
    --- End diff --
    
    https://issues.apache.org/jira/browse/NIFI-1667
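
For context, the TODO above concerns avoiding a full re-scan of the content stream on retry. Below is a minimal sketch of what a skip-based retry could look like, assuming the byte offset and length of each segment were recorded on the first attempt; all names here are hypothetical, and the actual design is tracked under NIFI-1667.

{code}
import java.io.IOException;
import java.io.InputStream;

// Illustrative sketch only (not the PutKafka code): if segment byte offsets
// and lengths were recorded on the first send attempt, a retry could skip
// straight to each failed segment instead of re-scanning the whole stream
// with a Scanner.
class SkipToSegmentSketch {

    byte[] readSegment(InputStream contentStream, long offset, int length) throws IOException {
        long remaining = offset;
        while (remaining > 0) {
            long skipped = contentStream.skip(remaining);
            if (skipped <= 0) {
                throw new IOException("Could not skip to segment offset " + offset);
            }
            remaining -= skipped; // skip() may skip fewer bytes than requested
        }
        byte[] segment = new byte[length];
        int read = 0;
        while (read < length) {
            int n = contentStream.read(segment, read, length - read);
            if (n < 0) {
                throw new IOException("Stream ended before the segment was fully read");
            }
            read += n;
        }
        return segment; // this buffer would then be sent as its own Kafka message
    }
}
{code}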


> When using the delimited data feature, PutKafka's ack'd ranges feature can break
> -------------------------------------------------------------------------
>
>                 Key: NIFI-1645
>                 URL: https://issues.apache.org/jira/browse/NIFI-1645
>             Project: Apache NiFi
>          Issue Type: Bug
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.6.0
>
>
> When using the delimited lines feature to send data to Kafka, a large set of
> lines that appears to be one FlowFile in NiFi is sent as a series of 1..N
> messages in Kafka. In that mode the mechanism of asynchronous acknowledgement
> can break down: we will receive acknowledgements but be unable to act on them
> appropriately, because by then the session/data will already have been
> considered successfully transferred. Under certain conditions this means that
> failed acknowledgements would not result in a retransfer.
> The logic this processor uses to create child objects that address
> failed/partial segments is extremely complicated and should likely be
> rewritten and greatly simplified. Instead, the SplitText feature should be
> used to create more manageable chunks of data, such that if any segment of a
> chunk is ack'd as a failure, the whole chunk is failed and thus can be
> retransmitted (see the sketch after the stack trace below). It is always
> best to enable the user to prefer data loss or data duplication on their own
> terms.
> Below is the relevant stack trace:
> {code}
> 17:12:37 EDTERROR6162d00f-737f-3710-85f9-318c886af95f
> clpen0004.foo.com:8090PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f] PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f] failed to process session due to java.lang.IllegalStateException: java.util.concurrent.ExecutionException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1458158883054-93724, container=cont2, section=540], offset=756882, length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in this session (StandardProcessSession[id=97534]): java.lang.IllegalStateException: java.util.concurrent.ExecutionException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1458158883054-93724, container=cont2, section=540], offset=756882, length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in this session (StandardProcessSession[id=97534])
> {code}
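
As a minimal illustration of the all-or-nothing direction suggested above (hypothetical names; a sketch, not the actual PutKafka code): block on every Kafka send future while the session is still open, and fail the whole chunk if any segment fails, so a failed acknowledgement can still trigger a retransfer.

{code}
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.RecordMetadata;

// Sketch of the "fail the whole chunk" approach: wait for every
// acknowledgement before the session is committed, so a failed segment
// can still route the whole FlowFile to the failure relationship.
class AllOrNothingAckSketch {

    boolean allSegmentsAcknowledged(List<Future<RecordMetadata>> sendFutures) {
        for (Future<RecordMetadata> future : sendFutures) {
            try {
                future.get(); // blocks until Kafka acks (or fails) this segment
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (ExecutionException e) {
                return false; // any failed segment fails the whole FlowFile
            }
        }
        return true;
    }
}
{code}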


