[ https://issues.apache.org/jira/browse/NIFI-1645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206939#comment-15206939 ]
ASF GitHub Bot commented on NIFI-1645:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/295#discussion_r57040041

--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---
@@ -446,444 +336,136 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String
                 .build();
     }

-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
-        FlowFileMessageBatch batch;
-        while ((batch = completeBatches.poll()) != null) {
-            batch.completeSession();
-        }
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();

-        final ProcessSession session = sessionFactory.createSession();
-        final FlowFile flowFile = session.get();
-        if (flowFile != null){
-            Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    doOnTrigger(context, session, flowFile);
-                    return null;
-                }
-            });
-            try {
-                consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                consumptionFuture.cancel(true);
-                Thread.currentThread().interrupt();
-                getLogger().warn("Interrupted while sending messages", e);
-            } catch (ExecutionException e) {
-                throw new IllegalStateException(e);
-            } catch (TimeoutException e) {
-                consumptionFuture.cancel(true);
-                getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while sending messages", e);
-            }
-        } else {
-            context.yield();
+        final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue();
+        if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())
+                && !validationContext.getProperty(PARTITION).isSet()) {
+            results.add(new ValidationResult.Builder().subject("Partition").valid(false)
+                    .explanation("The <Partition> property must be set when configured to use the User-Defined Partitioning Strategy")
+                    .build());
         }
+        return results;
     }

-    private void doOnTrigger(final ProcessContext context, ProcessSession session, final FlowFile flowFile) throws ProcessException {
-        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-        final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
-        String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
-        if (delimiter != null) {
-            delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
-        }
-
-        final Producer<byte[], byte[]> producer = getProducer();
-
-        if (delimiter == null) {
-            // Send the entire FlowFile as a single message.
-            final byte[] value = new byte[(int) flowFile.getSize()];
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream in) throws IOException {
-                    StreamUtils.fillBuffer(in, value);
-                }
-            });
-
-            final Integer partition;
-            try {
-                partition = getPartition(context, flowFile, topic);
-            } catch (final Exception e) {
-                getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e});
-                session.transfer(session.penalize(flowFile), REL_FAILURE);
-                session.commit();
-                return;
-            }
-
-            final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, value);
-
-            final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic);
-            messageBatch.setNumMessages(1);
-            activeBatches.add(messageBatch);
-
-            try {
-                producer.send(producerRecord, new Callback() {
-                    @Override
-                    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                        if (exception == null) {
-                            // record was successfully sent.
-                            messageBatch.addSuccessfulRange(0L, flowFile.getSize(), metadata.offset());
-                        } else {
-                            messageBatch.addFailedRange(0L, flowFile.getSize(), exception);
-                        }
-                    }
-                });
-            } catch (final BufferExhaustedException bee) {
-                messageBatch.addFailedRange(0L, flowFile.getSize(), bee);
-                context.yield();
-                return;
-            }
-        } else {
-            final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
-
-            // The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see
-            // if it matches some pattern. We can use this to search for the delimiter as we read through
-            // the stream of bytes in the FlowFile
-            final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes);
-
-            final LongHolder messagesSent = new LongHolder(0L);
-            final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic);
-            activeBatches.add(messageBatch);
-
-            try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-                session.read(flowFile, new InputStreamCallback() {
-                    @Override
-                    public void process(final InputStream rawIn) throws IOException {
-                        byte[] data = null; // contents of a single message
-
-                        boolean streamFinished = false;
-
-                        int nextByte;
-                        try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
-                            final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
-
-                            long messageStartOffset = in.getBytesConsumed();
-
-                            // read until we're out of data.
-                            while (!streamFinished) {
-                                nextByte = in.read();
-
-                                if (nextByte > -1) {
-                                    baos.write(nextByte);
-                                }
-
-                                if (nextByte == -1) {
-                                    // we ran out of data. This message is complete.
-                                    data = baos.toByteArray();
-                                    streamFinished = true;
-                                } else if (buffer.addAndCompare((byte) nextByte)) {
-                                    // we matched our delimiter. This message is complete. We want all of the bytes from the
-                                    // underlying BAOS exception for the last 'delimiterBytes.length' bytes because we don't want
-                                    // the delimiter itself to be sent.
-                                    data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length);
-                                }
-
-                                if (data != null) {
-                                    final long messageEndOffset = in.getBytesConsumed();
-
-                                    // If the message has no data, ignore it.
-                                    if (data.length != 0) {
-                                        final Integer partition;
-                                        try {
-                                            partition = getPartition(context, flowFile, topic);
-                                        } catch (final Exception e) {
-                                            messageBatch.addFailedRange(messageStartOffset, messageEndOffset, e);
-                                            getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e});
-                                            continue;
-                                        }
-
-
-                                        final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, data);
-                                        final long rangeStart = messageStartOffset;
-
-                                        try {
-                                            producer.send(producerRecord, new Callback() {
-                                                @Override
-                                                public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                                                    if (exception == null) {
-                                                        // record was successfully sent.
-                                                        messageBatch.addSuccessfulRange(rangeStart, messageEndOffset, metadata.offset());
-                                                    } else {
-                                                        messageBatch.addFailedRange(rangeStart, messageEndOffset, exception);
-                                                    }
-                                                }
-                                            });
-
-                                            messagesSent.incrementAndGet();
-                                        } catch (final BufferExhaustedException bee) {
-                                            // Not enough room in the buffer. Add from the beginning of this message to end of FlowFile as a failed range
-                                            messageBatch.addFailedRange(messageStartOffset, flowFile.getSize(), bee);
-                                            context.yield();
-                                            return;
-                                        }
-
-                                    }
-
-                                    // reset BAOS so that we can start a new message.
-                                    baos.reset();
-                                    data = null;
-                                    messageStartOffset = in.getBytesConsumed();
-                                }
-                            }
-                        }
-                    }
-                });
-
-                messageBatch.setNumMessages(messagesSent.get());
-            }
+    /**
+     *
+     */
+    private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, ProcessSession session) {
+        if (flowFile.getAttribute(ATTR_FAILED_SEGMENTS) != null) {
+            flowFile = session.removeAttribute(flowFile, ATTR_FAILED_SEGMENTS);
+            flowFile = session.removeAttribute(flowFile, ATTR_KEY);
+            flowFile = session.removeAttribute(flowFile, ATTR_TOPIC);
+            flowFile = session.removeAttribute(flowFile, ATTR_DELIMITER);
         }
+        return flowFile;
     }

+    /**
+     *
+     */
+    private Object determinePartition(SplittableMessageContext messageContext, ProcessContext context, FlowFile flowFile) {
+        String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
+        String partitionValue = null;
+        if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) {
+            partitionValue = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+        }
+        return partitionValue;
+    }

-    private static class Range {
-        private final long start;
-        private final long end;
-        private final Long kafkaOffset;
-
-        public Range(final long start, final long end, final Long kafkaOffset) {
-            this.start = start;
-            this.end = end;
-            this.kafkaOffset = kafkaOffset;
-        }
-
-        public long getStart() {
-            return start;
-        }
-
-        public long getEnd() {
-            return end;
-        }
-
-        public Long getKafkaOffset() {
-            return kafkaOffset;
-        }
-
-        @Override
-        public String toString() {
-            return "Range[" + start + "-" + end + "]";
+    /**
+     *
+     */
+    private Map<String, String> buildFailedFlowFileAttributes(List<Integer> failedSegments,
+            SplittableMessageContext messageContext) {
+        StringBuffer buffer = new StringBuffer();
--- End diff --

It's all gone to a BitSet
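For readers following the review thread, here is a minimal sketch, assuming a java.util.BitSet is used to record which message segments failed (rather than assembling a delimited String in a StringBuffer), of how such a set could be encoded into and decoded from a FlowFile attribute. The class name, attribute name, and method names below are illustrative assumptions, not the actual code in PR #295.

{code}
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

// Hypothetical sketch only -- not the PR's implementation.
// Indices of failed segments are set in a BitSet; the BitSet's byte[]
// form can then be stored in a FlowFile attribute so that a later
// attempt can resend only the segments that failed.
public class FailedSegmentsSketch {

    // Assumed attribute name, for illustration.
    static final String ATTR_FAILED_SEGMENTS = "failed.segments";

    // Encode the failed segment indices compactly.
    static byte[] encodeFailedSegments(List<Integer> failedSegments) {
        BitSet bits = new BitSet();
        for (int segmentIndex : failedSegments) {
            bits.set(segmentIndex);
        }
        return bits.toByteArray();
    }

    // Decode the attribute value back into a BitSet on the next attempt.
    static BitSet decodeFailedSegments(byte[] attributeValue) {
        return BitSet.valueOf(attributeValue);
    }

    // Example: segments 1 and 3 of a 4-segment FlowFile failed and would be resent.
    public static void main(String[] args) {
        byte[] encoded = encodeFailedSegments(Arrays.asList(1, 3));
        BitSet failed = decodeFailedSegments(encoded);
        for (int i = 0; i < 4; i++) {
            System.out.println("segment " + i + " needs resend: " + failed.get(i));
        }
    }
}
{code}

Compared with a comma-separated StringBuffer, a BitSet encoding is compact, order-independent, and cheap to query per segment, which is presumably the motivation behind the remark above.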
> When using delimited data feature PutKafka ack'd ranges feature can break
> --------------------------------------------------------------------------
>
>                 Key: NIFI-1645
>                 URL: https://issues.apache.org/jira/browse/NIFI-1645
>             Project: Apache NiFi
>          Issue Type: Bug
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.6.0
>
>
> When the delimited-lines feature is used to send data to Kafka, a large set of lines that appears
> as one FlowFile in NiFi is sent as a series of 1..N messages in Kafka. In that case the asynchronous
> acknowledgement mechanism can break down: acknowledgements may arrive after the session/data has
> already been considered successfully transferred, so we are unable to act on them appropriately.
> Under certain conditions this means a failed acknowledgement does not result in a retransfer.
> The logic this processor uses to create child objects for failed/partial segments is extremely
> complicated and should likely be rewritten and greatly simplified. Instead, SplitText should be
> used upstream to create more manageable chunks of data; if any segment in a chunk is ack'd as a
> failure, the whole chunk is failed and can be retransmitted. It is always best to let the user
> choose between data loss and data duplication on their own terms.
> Below is the relevant stack trace
> {code}
> 17:12:37 EDT ERROR 6162d00f-737f-3710-85f9-318c886af95f clpen0004.foo.com:8090 PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f]
> PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f] failed to process session due to java.lang.IllegalStateException:
> java.util.concurrent.ExecutionException: org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1458158883054-93724, container=cont2, section=540], offset=756882, length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in this session (StandardProcessSession[id=97534]):
> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1458158883054-93724, container=cont2, section=540], offset=756882, length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in this session (StandardProcessSession[id=97534])
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)