[ 
https://issues.apache.org/jira/browse/NIFI-1645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206939#comment-15206939
 ] 

ASF GitHub Bot commented on NIFI-1645:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/295#discussion_r57040041
  
    --- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
 ---
    @@ -446,444 +336,136 @@ protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String
                     .build();
         }
     
    -
         @Override
    -    public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
    -        FlowFileMessageBatch batch;
    -        while ((batch = completeBatches.poll()) != null) {
    -            batch.completeSession();
    -        }
    +    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
    +        final List<ValidationResult> results = new ArrayList<>();
     
    -        final ProcessSession session = sessionFactory.createSession();
    -        final FlowFile flowFile = session.get();
    -        if (flowFile != null){
    -            Future<Void> consumptionFuture = this.executor.submit(new 
Callable<Void>() {
    -                @Override
    -                public Void call() throws Exception {
    -                    doOnTrigger(context, session, flowFile);
    -                    return null;
    -                }
    -            });
    -            try {
    -                consumptionFuture.get(this.deadlockTimeout, 
TimeUnit.MILLISECONDS);
    -            } catch (InterruptedException e) {
    -                consumptionFuture.cancel(true);
    -                Thread.currentThread().interrupt();
    -                getLogger().warn("Interrupted while sending messages", e);
    -            } catch (ExecutionException e) {
    -                throw new IllegalStateException(e);
    -            } catch (TimeoutException e) {
    -                consumptionFuture.cancel(true);
    -                getLogger().warn("Timed out after " + this.deadlockTimeout 
+ " milliseconds while sending messages", e);
    -            }
    -        } else {
    -            context.yield();
    +        final String partitionStrategy = 
validationContext.getProperty(PARTITION_STRATEGY).getValue();
    +        if 
(partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())
    +                && !validationContext.getProperty(PARTITION).isSet()) {
    +            results.add(new 
ValidationResult.Builder().subject("Partition").valid(false)
    +                    .explanation("The <Partition> property must be set 
when configured to use the User-Defined Partitioning Strategy")
    +                    .build());
             }
    +        return results;
         }
     
    -    private void doOnTrigger(final ProcessContext context, ProcessSession 
session, final FlowFile flowFile) throws ProcessException {
    -        final String topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    -        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
    -        final byte[] keyBytes = key == null ? null : 
key.getBytes(StandardCharsets.UTF_8);
    -        String delimiter = 
context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
    -        if (delimiter != null) {
    -            delimiter = delimiter.replace("\\n", "\n").replace("\\r", 
"\r").replace("\\t", "\t");
    -        }
    -
    -        final Producer<byte[], byte[]> producer = getProducer();
    -
    -        if (delimiter == null) {
    -            // Send the entire FlowFile as a single message.
    -            final byte[] value = new byte[(int) flowFile.getSize()];
    -            session.read(flowFile, new InputStreamCallback() {
    -                @Override
    -                public void process(final InputStream in) throws 
IOException {
    -                    StreamUtils.fillBuffer(in, value);
    -                }
    -            });
    -
    -            final Integer partition;
    -            try {
    -                partition = getPartition(context, flowFile, topic);
    -            } catch (final Exception e) {
    -                getLogger().error("Failed to obtain a partition for {} due 
to {}", new Object[] {flowFile, e});
    -                session.transfer(session.penalize(flowFile), REL_FAILURE);
    -                session.commit();
    -                return;
    -            }
    -
    -            final ProducerRecord<byte[], byte[]> producerRecord = new 
ProducerRecord<>(topic, partition, keyBytes, value);
    -
    -            final FlowFileMessageBatch messageBatch = new 
FlowFileMessageBatch(session, flowFile, topic);
    -            messageBatch.setNumMessages(1);
    -            activeBatches.add(messageBatch);
    -
    -            try {
    -                producer.send(producerRecord, new Callback() {
    -                    @Override
    -                    public void onCompletion(final RecordMetadata 
metadata, final Exception exception) {
    -                        if (exception == null) {
    -                            // record was successfully sent.
    -                            messageBatch.addSuccessfulRange(0L, 
flowFile.getSize(), metadata.offset());
    -                        } else {
    -                            messageBatch.addFailedRange(0L, 
flowFile.getSize(), exception);
    -                        }
    -                    }
    -                });
    -            } catch (final BufferExhaustedException bee) {
    -                messageBatch.addFailedRange(0L, flowFile.getSize(), bee);
    -                context.yield();
    -                return;
    -            }
    -        } else {
    -            final byte[] delimiterBytes = 
delimiter.getBytes(StandardCharsets.UTF_8);
    -
    -            // The NonThreadSafeCircularBuffer allows us to add a byte 
from the stream one at a time and see
    -            // if it matches some pattern. We can use this to search for 
the delimiter as we read through
    -            // the stream of bytes in the FlowFile
    -            final NonThreadSafeCircularBuffer buffer = new 
NonThreadSafeCircularBuffer(delimiterBytes);
    -
    -            final LongHolder messagesSent = new LongHolder(0L);
    -            final FlowFileMessageBatch messageBatch = new 
FlowFileMessageBatch(session, flowFile, topic);
    -            activeBatches.add(messageBatch);
    -
    -            try (final ByteArrayOutputStream baos = new 
ByteArrayOutputStream()) {
    -                session.read(flowFile, new InputStreamCallback() {
    -                    @Override
    -                    public void process(final InputStream rawIn) throws 
IOException {
    -                        byte[] data = null; // contents of a single message
    -
    -                        boolean streamFinished = false;
    -
    -                        int nextByte;
    -                        try (final InputStream bufferedIn = new 
BufferedInputStream(rawIn);
    -                            final ByteCountingInputStream in = new 
ByteCountingInputStream(bufferedIn)) {
    -
    -                            long messageStartOffset = 
in.getBytesConsumed();
    -
    -                            // read until we're out of data.
    -                            while (!streamFinished) {
    -                                nextByte = in.read();
    -
    -                                if (nextByte > -1) {
    -                                    baos.write(nextByte);
    -                                }
    -
    -                                if (nextByte == -1) {
    -                                    // we ran out of data. This message is 
complete.
    -                                    data = baos.toByteArray();
    -                                    streamFinished = true;
    -                                } else if (buffer.addAndCompare((byte) 
nextByte)) {
    -                                    // we matched our delimiter. This 
message is complete. We want all of the bytes from the
    -                                    // underlying BAOS exception for the 
last 'delimiterBytes.length' bytes because we don't want
    -                                    // the delimiter itself to be sent.
    -                                    data = 
Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - 
delimiterBytes.length);
    -                                }
    -
    -                                if (data != null) {
    -                                    final long messageEndOffset = 
in.getBytesConsumed();
    -
    -                                    // If the message has no data, ignore 
it.
    -                                    if (data.length != 0) {
    -                                        final Integer partition;
    -                                        try {
    -                                            partition = 
getPartition(context, flowFile, topic);
    -                                        } catch (final Exception e) {
    -                                            
messageBatch.addFailedRange(messageStartOffset, messageEndOffset, e);
    -                                            getLogger().error("Failed to 
obtain a partition for {} due to {}", new Object[] {flowFile, e});
    -                                            continue;
    -                                        }
    -
    -
    -                                        final ProducerRecord<byte[], 
byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, data);
    -                                        final long rangeStart = 
messageStartOffset;
    -
    -                                        try {
    -                                            producer.send(producerRecord, 
new Callback() {
    -                                                @Override
    -                                                public void 
onCompletion(final RecordMetadata metadata, final Exception exception) {
    -                                                    if (exception == null) 
{
    -                                                        // record was 
successfully sent.
    -                                                        
messageBatch.addSuccessfulRange(rangeStart, messageEndOffset, 
metadata.offset());
    -                                                    } else {
    -                                                        
messageBatch.addFailedRange(rangeStart, messageEndOffset, exception);
    -                                                    }
    -                                                }
    -                                            });
    -
    -                                            messagesSent.incrementAndGet();
    -                                        } catch (final 
BufferExhaustedException bee) {
    -                                            // Not enough room in the 
buffer. Add from the beginning of this message to end of FlowFile as a failed 
range
    -                                            
messageBatch.addFailedRange(messageStartOffset, flowFile.getSize(), bee);
    -                                            context.yield();
    -                                            return;
    -                                        }
    -
    -                                    }
    -
    -                                    // reset BAOS so that we can start a 
new message.
    -                                    baos.reset();
    -                                    data = null;
    -                                    messageStartOffset = 
in.getBytesConsumed();
    -                                }
    -                            }
    -                        }
    -                    }
    -                });
    -
    -                messageBatch.setNumMessages(messagesSent.get());
    -            }
    +    /**
    +     *
    +     */
    +    private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, 
ProcessSession session) {
    +        if (flowFile.getAttribute(ATTR_FAILED_SEGMENTS) != null) {
    +            flowFile = session.removeAttribute(flowFile, 
ATTR_FAILED_SEGMENTS);
    +            flowFile = session.removeAttribute(flowFile, ATTR_KEY);
    +            flowFile = session.removeAttribute(flowFile, ATTR_TOPIC);
    +            flowFile = session.removeAttribute(flowFile, ATTR_DELIMITER);
             }
    +        return flowFile;
         }
     
    +    /**
    +     *
    +     */
    +   private Object determinePartition(SplittableMessageContext 
messageContext, ProcessContext context, FlowFile flowFile) {
    +       String partitionStrategy = 
context.getProperty(PARTITION_STRATEGY).getValue();
    +       String partitionValue = null;
    +       if 
(partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) {
    +           partitionValue = 
context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
    +       }
    +       return partitionValue;
    +   }
     
    -    private static class Range {
    -        private final long start;
    -        private final long end;
    -        private final Long kafkaOffset;
    -
    -        public Range(final long start, final long end, final Long 
kafkaOffset) {
    -            this.start = start;
    -            this.end = end;
    -            this.kafkaOffset = kafkaOffset;
    -        }
    -
    -        public long getStart() {
    -            return start;
    -        }
    -
    -        public long getEnd() {
    -            return end;
    -        }
    -
    -        public Long getKafkaOffset() {
    -            return kafkaOffset;
    -        }
    -
    -        @Override
    -        public String toString() {
    -            return "Range[" + start + "-" + end + "]";
    +    /**
    +     *
    +     */
    +    private Map<String, String> 
buildFailedFlowFileAttributes(List<Integer> failedSegments,
    +            SplittableMessageContext messageContext) {
    +        StringBuffer buffer = new StringBuffer();
    --- End diff --
    
    It's all gone to a BitSet


> When using delimited data feature PutKafka ack'd ranges feature can break
> -------------------------------------------------------------------------
>
>                 Key: NIFI-1645
>                 URL: https://issues.apache.org/jira/browse/NIFI-1645
>             Project: Apache NiFi
>          Issue Type: Bug
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.6.0
>
>
> When using the delimited lines feature to send data to Kafka such that a 
> large set of lines that appear to be one 'flowfile' in NiFi is sent as a 
> series of 1..N messages in Kafka the mechanism of asynchronous 
> acknowledgement can break down whereby we will receive acknowledgements but 
> be unable to act on them appropriately because by then the session/data would 
> have already been considered successfully transferred.  This could in 
> certain/specific conditions mean failed acknowledgements would not result in 
> a retransfer.
> The logic this processor supports for creating child objects to address 
> failed/partial segments is extremely complicated and should likely be 
> rewritten to be greatly simplified.  Instead the SplitText feature should be 
> used to create more manageable chunks of data over which if any segment is 
> ack'd as a failure then the whole thing is failed and thus can be 
> retransmitted.  Always best to enable the user to prefer data loss or data 
> duplication on their own terms.
> Below is the relevant stack trace
> {code}
> 17:12:37 EDTERROR6162d00f-737f-3710-85f9-318c886af95f
> clpen0004.foo.com:8090PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f] 
> PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f] failed to process session 
> due to java.lang.IllegalStateException: 
> java.util.concurrent.ExecutionException: 
> org.apache.nifi.processor.exception.FlowFileHandlingException: 
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
>  [resourceClaim=StandardResourceClaim[id=1458158883054-93724, 
> container=cont2, section=540], offset=756882, 
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in 
> this session (StandardProcessSession[id=97534]): 
> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: 
> org.apache.nifi.processor.exception.FlowFileHandlingException: 
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
>  [resourceClaim=StandardResourceClaim[id=1458158883054-93724, 
> container=cont2, section=540], offset=756882, 
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in 
> this session (StandardProcessSession[id=97534])
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to