Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/295#discussion_r57026907 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java --- @@ -446,444 +336,136 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String .build(); } - @Override - public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { - FlowFileMessageBatch batch; - while ((batch = completeBatches.poll()) != null) { - batch.completeSession(); - } + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(); - final ProcessSession session = sessionFactory.createSession(); - final FlowFile flowFile = session.get(); - if (flowFile != null){ - Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - doOnTrigger(context, session, flowFile); - return null; - } - }); - try { - consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - consumptionFuture.cancel(true); - Thread.currentThread().interrupt(); - getLogger().warn("Interrupted while sending messages", e); - } catch (ExecutionException e) { - throw new IllegalStateException(e); - } catch (TimeoutException e) { - consumptionFuture.cancel(true); - getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while sending messages", e); - } - } else { - context.yield(); + final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue(); + if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue()) + && !validationContext.getProperty(PARTITION).isSet()) { + results.add(new ValidationResult.Builder().subject("Partition").valid(false) + .explanation("The <Partition> property 
must be set when configured to use the User-Defined Partitioning Strategy") + .build()); } + return results; } - private void doOnTrigger(final ProcessContext context, ProcessSession session, final FlowFile flowFile) throws ProcessException { - final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); - final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); - final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8); - String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue(); - if (delimiter != null) { - delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); - } - - final Producer<byte[], byte[]> producer = getProducer(); - - if (delimiter == null) { - // Send the entire FlowFile as a single message. - final byte[] value = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, value); - } - }); - - final Integer partition; - try { - partition = getPartition(context, flowFile, topic); - } catch (final Exception e) { - getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e}); - session.transfer(session.penalize(flowFile), REL_FAILURE); - session.commit(); - return; - } - - final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, value); - - final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic); - messageBatch.setNumMessages(1); - activeBatches.add(messageBatch); - - try { - producer.send(producerRecord, new Callback() { - @Override - public void onCompletion(final RecordMetadata metadata, final Exception exception) { - if (exception == null) { - // record was successfully sent. 
- messageBatch.addSuccessfulRange(0L, flowFile.getSize(), metadata.offset()); - } else { - messageBatch.addFailedRange(0L, flowFile.getSize(), exception); - } - } - }); - } catch (final BufferExhaustedException bee) { - messageBatch.addFailedRange(0L, flowFile.getSize(), bee); - context.yield(); - return; - } - } else { - final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8); - - // The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see - // if it matches some pattern. We can use this to search for the delimiter as we read through - // the stream of bytes in the FlowFile - final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes); - - final LongHolder messagesSent = new LongHolder(0L); - final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic); - activeBatches.add(messageBatch); - - try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream rawIn) throws IOException { - byte[] data = null; // contents of a single message - - boolean streamFinished = false; - - int nextByte; - try (final InputStream bufferedIn = new BufferedInputStream(rawIn); - final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { - - long messageStartOffset = in.getBytesConsumed(); - - // read until we're out of data. - while (!streamFinished) { - nextByte = in.read(); - - if (nextByte > -1) { - baos.write(nextByte); - } - - if (nextByte == -1) { - // we ran out of data. This message is complete. - data = baos.toByteArray(); - streamFinished = true; - } else if (buffer.addAndCompare((byte) nextByte)) { - // we matched our delimiter. This message is complete. We want all of the bytes from the - // underlying BAOS exception for the last 'delimiterBytes.length' bytes because we don't want - // the delimiter itself to be sent. 
- data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length); - } - - if (data != null) { - final long messageEndOffset = in.getBytesConsumed(); - - // If the message has no data, ignore it. - if (data.length != 0) { - final Integer partition; - try { - partition = getPartition(context, flowFile, topic); - } catch (final Exception e) { - messageBatch.addFailedRange(messageStartOffset, messageEndOffset, e); - getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e}); - continue; - } - - - final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, data); - final long rangeStart = messageStartOffset; - - try { - producer.send(producerRecord, new Callback() { - @Override - public void onCompletion(final RecordMetadata metadata, final Exception exception) { - if (exception == null) { - // record was successfully sent. - messageBatch.addSuccessfulRange(rangeStart, messageEndOffset, metadata.offset()); - } else { - messageBatch.addFailedRange(rangeStart, messageEndOffset, exception); - } - } - }); - - messagesSent.incrementAndGet(); - } catch (final BufferExhaustedException bee) { - // Not enough room in the buffer. Add from the beginning of this message to end of FlowFile as a failed range - messageBatch.addFailedRange(messageStartOffset, flowFile.getSize(), bee); - context.yield(); - return; - } - - } - - // reset BAOS so that we can start a new message. 
- baos.reset(); - data = null; - messageStartOffset = in.getBytesConsumed(); - } - } - } - } - }); - - messageBatch.setNumMessages(messagesSent.get()); - } + /** + * + */ + private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, ProcessSession session) { + if (flowFile.getAttribute(ATTR_FAILED_SEGMENTS) != null) { + flowFile = session.removeAttribute(flowFile, ATTR_FAILED_SEGMENTS); + flowFile = session.removeAttribute(flowFile, ATTR_KEY); + flowFile = session.removeAttribute(flowFile, ATTR_TOPIC); + flowFile = session.removeAttribute(flowFile, ATTR_DELIMITER); } + return flowFile; } + /** + * + */ + private Object determinePartition(SplittableMessageContext messageContext, ProcessContext context, FlowFile flowFile) { + String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue(); + String partitionValue = null; + if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) { + partitionValue = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue(); + } + return partitionValue; + } - private static class Range { - private final long start; - private final long end; - private final Long kafkaOffset; - - public Range(final long start, final long end, final Long kafkaOffset) { - this.start = start; - this.end = end; - this.kafkaOffset = kafkaOffset; - } - - public long getStart() { - return start; - } - - public long getEnd() { - return end; - } - - public Long getKafkaOffset() { - return kafkaOffset; - } - - @Override - public String toString() { - return "Range[" + start + "-" + end + "]"; + /** + * + */ + private Map<String, String> buildFailedFlowFileAttributes(List<Integer> failedSegments, + SplittableMessageContext messageContext) { + StringBuffer buffer = new StringBuffer(); --- End diff -- I think we should use StringBuilder rather than StringBuffer here.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---