[ 
https://issues.apache.org/jira/browse/NIFI-1645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206952#comment-15206952
 ] 

ASF GitHub Bot commented on NIFI-1645:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/295#discussion_r57040700
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java ---
    @@ -0,0 +1,87 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka;
    +
    +import java.util.Random;
    +
    +import kafka.producer.Partitioner;
    +import kafka.utils.VerifiableProperties;
    +
    +/**
    + * Collection of implementation of common Kafka {@link Partitioner}s.
    + */
    +final public class Partitioners {
    +
    +    private Partitioners() {
    +    }
    +    /**
    +     * {@link Partitioner} that implements 'round-robin' mechanism which evenly
    +     * distributes load between all available partitions.
    +     */
    +    public static class RoundRobinPartitioner implements Partitioner {
    +        private volatile int index;
    --- End diff --
    
    The volatile keyword serves more than one purpose. In this case it is
    there only to ensure visibility between threads: once T1 sets the value,
    T2 must see it right after it has been set. Without volatile that is
    actually not guaranteed (think eventual consistency). It was never meant
    to enforce sequential distribution, since in a multi-threaded environment
    that is somewhat meaningless.
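
    To illustrate the distinction, here is a minimal sketch. It is not the
    code from the pull request: the diff only shows the volatile index field,
    so how the partitioner advances it is an assumption here, and the names
    RoundRobinSketch, VolatileCounter, AtomicCounter and distribute are all
    hypothetical. The volatile variant makes each write visible to other
    threads but may hand the same index to two threads; the AtomicInteger
    variant additionally makes the increment atomic, which yields an exactly
    even spread but still says nothing about the order in which threads send.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical illustration only; not taken from the NiFi code base.
final class RoundRobinSketch {

    // Visibility only: every thread sees the latest written index,
    // but the read and the write below are two separate steps, so two
    // threads can observe the same value and pick the same partition.
    static final class VolatileCounter {
        private volatile int index;

        int next(int numPartitions) {
            int current = index;      // volatile read
            index = current + 1;      // volatile write (not atomic with the read)
            return Math.floorMod(current, numPartitions);
        }
    }

    // Visibility plus atomicity: getAndIncrement hands out each value once.
    static final class AtomicCounter {
        private final AtomicInteger index = new AtomicInteger();

        int next(int numPartitions) {
            // floorMod keeps the result non-negative after int overflow
            return Math.floorMod(index.getAndIncrement(), numPartitions);
        }
    }

    // Calls AtomicCounter.next() from several threads and returns how many
    // times each partition was chosen.
    static int[] distribute(int threads, int callsPerThread, int numPartitions) {
        AtomicCounter counter = new AtomicCounter();
        AtomicIntegerArray hits = new AtomicIntegerArray(numPartitions);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    hits.incrementAndGet(counter.next(numPartitions));
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        int[] result = new int[numPartitions];
        for (int p = 0; p < numPartitions; p++) {
            result[p] = hits.get(p);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] counts = distribute(4, 3000, 3);
        System.out.println(java.util.Arrays.toString(counts));
    }
}
```

    With 4 threads making 3000 calls each over 3 partitions, the atomic
    variant assigns exactly 4000 calls to every partition. The volatile
    variant gives no such guarantee under contention, which is acceptable
    when approximate balance is all that is wanted.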


> When using delimited data feature PutKafka ack'd ranges feature can break
> -------------------------------------------------------------------------
>
>                 Key: NIFI-1645
>                 URL: https://issues.apache.org/jira/browse/NIFI-1645
>             Project: Apache NiFi
>          Issue Type: Bug
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.6.0
>
>
> When the delimited lines feature is used to send data to Kafka, a large set
> of lines that appears to be one 'flowfile' in NiFi is sent as a series of
> 1..N messages in Kafka. In that case the mechanism of asynchronous
> acknowledgement can break down: we receive acknowledgements but are unable
> to act on them appropriately, because by then the session/data has already
> been considered successfully transferred. Under certain specific conditions
> this could mean that failed acknowledgements do not result in a retransfer.
> The logic this processor supports for creating child objects to address
> failed/partial segments is extremely complicated and should likely be
> rewritten and greatly simplified. Instead, the SplitText feature should be
> used to create more manageable chunks of data, so that if any segment is
> ack'd as a failure the whole chunk is failed and can be retransmitted. It is
> always best to let users choose between data loss and data duplication on
> their own terms.
> Below is the relevant stack trace
> {code}
> 17:12:37 EDT ERROR 6162d00f-737f-3710-85f9-318c886af95f
> clpen0004.foo.com:8090 PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f] 
> PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f] failed to process session 
> due to java.lang.IllegalStateException: 
> java.util.concurrent.ExecutionException: 
> org.apache.nifi.processor.exception.FlowFileHandlingException: 
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
>  [resourceClaim=StandardResourceClaim[id=1458158883054-93724, 
> container=cont2, section=540], offset=756882, 
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in 
> this session (StandardProcessSession[id=97534]): 
> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: 
> org.apache.nifi.processor.exception.FlowFileHandlingException: 
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
>  [resourceClaim=StandardResourceClaim[id=1458158883054-93724, 
> container=cont2, section=540], offset=756882, 
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in 
> this session (StandardProcessSession[id=97534])
> {code}


