Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/295#discussion_r57040041
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---
    @@ -446,444 +336,136 @@ protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String
                     .build();
         }
     
    -
         @Override
    -    public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
    -        FlowFileMessageBatch batch;
    -        while ((batch = completeBatches.poll()) != null) {
    -            batch.completeSession();
    -        }
    +    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
    +        final List<ValidationResult> results = new ArrayList<>();
     
    -        final ProcessSession session = sessionFactory.createSession();
    -        final FlowFile flowFile = session.get();
    -        if (flowFile != null){
    -            Future<Void> consumptionFuture = this.executor.submit(new 
Callable<Void>() {
    -                @Override
    -                public Void call() throws Exception {
    -                    doOnTrigger(context, session, flowFile);
    -                    return null;
    -                }
    -            });
    -            try {
    -                consumptionFuture.get(this.deadlockTimeout, 
TimeUnit.MILLISECONDS);
    -            } catch (InterruptedException e) {
    -                consumptionFuture.cancel(true);
    -                Thread.currentThread().interrupt();
    -                getLogger().warn("Interrupted while sending messages", e);
    -            } catch (ExecutionException e) {
    -                throw new IllegalStateException(e);
    -            } catch (TimeoutException e) {
    -                consumptionFuture.cancel(true);
    -                getLogger().warn("Timed out after " + this.deadlockTimeout 
+ " milliseconds while sending messages", e);
    -            }
    -        } else {
    -            context.yield();
    +        final String partitionStrategy = 
validationContext.getProperty(PARTITION_STRATEGY).getValue();
    +        if 
(partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())
    +                && !validationContext.getProperty(PARTITION).isSet()) {
    +            results.add(new 
ValidationResult.Builder().subject("Partition").valid(false)
    +                    .explanation("The <Partition> property must be set 
when configured to use the User-Defined Partitioning Strategy")
    +                    .build());
             }
    +        return results;
         }
     
    -    private void doOnTrigger(final ProcessContext context, ProcessSession 
session, final FlowFile flowFile) throws ProcessException {
    -        final String topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    -        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
    -        final byte[] keyBytes = key == null ? null : 
key.getBytes(StandardCharsets.UTF_8);
    -        String delimiter = 
context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
    -        if (delimiter != null) {
    -            delimiter = delimiter.replace("\\n", "\n").replace("\\r", 
"\r").replace("\\t", "\t");
    -        }
    -
    -        final Producer<byte[], byte[]> producer = getProducer();
    -
    -        if (delimiter == null) {
    -            // Send the entire FlowFile as a single message.
    -            final byte[] value = new byte[(int) flowFile.getSize()];
    -            session.read(flowFile, new InputStreamCallback() {
    -                @Override
    -                public void process(final InputStream in) throws 
IOException {
    -                    StreamUtils.fillBuffer(in, value);
    -                }
    -            });
    -
    -            final Integer partition;
    -            try {
    -                partition = getPartition(context, flowFile, topic);
    -            } catch (final Exception e) {
    -                getLogger().error("Failed to obtain a partition for {} due 
to {}", new Object[] {flowFile, e});
    -                session.transfer(session.penalize(flowFile), REL_FAILURE);
    -                session.commit();
    -                return;
    -            }
    -
    -            final ProducerRecord<byte[], byte[]> producerRecord = new 
ProducerRecord<>(topic, partition, keyBytes, value);
    -
    -            final FlowFileMessageBatch messageBatch = new 
FlowFileMessageBatch(session, flowFile, topic);
    -            messageBatch.setNumMessages(1);
    -            activeBatches.add(messageBatch);
    -
    -            try {
    -                producer.send(producerRecord, new Callback() {
    -                    @Override
    -                    public void onCompletion(final RecordMetadata 
metadata, final Exception exception) {
    -                        if (exception == null) {
    -                            // record was successfully sent.
    -                            messageBatch.addSuccessfulRange(0L, 
flowFile.getSize(), metadata.offset());
    -                        } else {
    -                            messageBatch.addFailedRange(0L, 
flowFile.getSize(), exception);
    -                        }
    -                    }
    -                });
    -            } catch (final BufferExhaustedException bee) {
    -                messageBatch.addFailedRange(0L, flowFile.getSize(), bee);
    -                context.yield();
    -                return;
    -            }
    -        } else {
    -            final byte[] delimiterBytes = 
delimiter.getBytes(StandardCharsets.UTF_8);
    -
    -            // The NonThreadSafeCircularBuffer allows us to add a byte 
from the stream one at a time and see
    -            // if it matches some pattern. We can use this to search for 
the delimiter as we read through
    -            // the stream of bytes in the FlowFile
    -            final NonThreadSafeCircularBuffer buffer = new 
NonThreadSafeCircularBuffer(delimiterBytes);
    -
    -            final LongHolder messagesSent = new LongHolder(0L);
    -            final FlowFileMessageBatch messageBatch = new 
FlowFileMessageBatch(session, flowFile, topic);
    -            activeBatches.add(messageBatch);
    -
    -            try (final ByteArrayOutputStream baos = new 
ByteArrayOutputStream()) {
    -                session.read(flowFile, new InputStreamCallback() {
    -                    @Override
    -                    public void process(final InputStream rawIn) throws 
IOException {
    -                        byte[] data = null; // contents of a single message
    -
    -                        boolean streamFinished = false;
    -
    -                        int nextByte;
    -                        try (final InputStream bufferedIn = new 
BufferedInputStream(rawIn);
    -                            final ByteCountingInputStream in = new 
ByteCountingInputStream(bufferedIn)) {
    -
    -                            long messageStartOffset = 
in.getBytesConsumed();
    -
    -                            // read until we're out of data.
    -                            while (!streamFinished) {
    -                                nextByte = in.read();
    -
    -                                if (nextByte > -1) {
    -                                    baos.write(nextByte);
    -                                }
    -
    -                                if (nextByte == -1) {
    -                                    // we ran out of data. This message is 
complete.
    -                                    data = baos.toByteArray();
    -                                    streamFinished = true;
    -                                } else if (buffer.addAndCompare((byte) 
nextByte)) {
    -                                    // we matched our delimiter. This 
message is complete. We want all of the bytes from the
    -                                    // underlying BAOS exception for the 
last 'delimiterBytes.length' bytes because we don't want
    -                                    // the delimiter itself to be sent.
    -                                    data = 
Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - 
delimiterBytes.length);
    -                                }
    -
    -                                if (data != null) {
    -                                    final long messageEndOffset = 
in.getBytesConsumed();
    -
    -                                    // If the message has no data, ignore 
it.
    -                                    if (data.length != 0) {
    -                                        final Integer partition;
    -                                        try {
    -                                            partition = 
getPartition(context, flowFile, topic);
    -                                        } catch (final Exception e) {
    -                                            
messageBatch.addFailedRange(messageStartOffset, messageEndOffset, e);
    -                                            getLogger().error("Failed to 
obtain a partition for {} due to {}", new Object[] {flowFile, e});
    -                                            continue;
    -                                        }
    -
    -
    -                                        final ProducerRecord<byte[], 
byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, data);
    -                                        final long rangeStart = 
messageStartOffset;
    -
    -                                        try {
    -                                            producer.send(producerRecord, 
new Callback() {
    -                                                @Override
    -                                                public void 
onCompletion(final RecordMetadata metadata, final Exception exception) {
    -                                                    if (exception == null) 
{
    -                                                        // record was 
successfully sent.
    -                                                        
messageBatch.addSuccessfulRange(rangeStart, messageEndOffset, 
metadata.offset());
    -                                                    } else {
    -                                                        
messageBatch.addFailedRange(rangeStart, messageEndOffset, exception);
    -                                                    }
    -                                                }
    -                                            });
    -
    -                                            messagesSent.incrementAndGet();
    -                                        } catch (final 
BufferExhaustedException bee) {
    -                                            // Not enough room in the 
buffer. Add from the beginning of this message to end of FlowFile as a failed 
range
    -                                            
messageBatch.addFailedRange(messageStartOffset, flowFile.getSize(), bee);
    -                                            context.yield();
    -                                            return;
    -                                        }
    -
    -                                    }
    -
    -                                    // reset BAOS so that we can start a 
new message.
    -                                    baos.reset();
    -                                    data = null;
    -                                    messageStartOffset = 
in.getBytesConsumed();
    -                                }
    -                            }
    -                        }
    -                    }
    -                });
    -
    -                messageBatch.setNumMessages(messagesSent.get());
    -            }
    +    /**
    +     *
    +     */
    +    private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, 
ProcessSession session) {
    +        if (flowFile.getAttribute(ATTR_FAILED_SEGMENTS) != null) {
    +            flowFile = session.removeAttribute(flowFile, 
ATTR_FAILED_SEGMENTS);
    +            flowFile = session.removeAttribute(flowFile, ATTR_KEY);
    +            flowFile = session.removeAttribute(flowFile, ATTR_TOPIC);
    +            flowFile = session.removeAttribute(flowFile, ATTR_DELIMITER);
             }
    +        return flowFile;
         }
     
    +    /**
    +     *
    +     */
    +   private Object determinePartition(SplittableMessageContext 
messageContext, ProcessContext context, FlowFile flowFile) {
    +       String partitionStrategy = 
context.getProperty(PARTITION_STRATEGY).getValue();
    +       String partitionValue = null;
    +       if 
(partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) {
    +           partitionValue = 
context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
    +       }
    +       return partitionValue;
    +   }
     
    -    private static class Range {
    -        private final long start;
    -        private final long end;
    -        private final Long kafkaOffset;
    -
    -        public Range(final long start, final long end, final Long 
kafkaOffset) {
    -            this.start = start;
    -            this.end = end;
    -            this.kafkaOffset = kafkaOffset;
    -        }
    -
    -        public long getStart() {
    -            return start;
    -        }
    -
    -        public long getEnd() {
    -            return end;
    -        }
    -
    -        public Long getKafkaOffset() {
    -            return kafkaOffset;
    -        }
    -
    -        @Override
    -        public String toString() {
    -            return "Range[" + start + "-" + end + "]";
    +    /**
    +     *
    +     */
    +    private Map<String, String> 
buildFailedFlowFileAttributes(List<Integer> failedSegments,
    +            SplittableMessageContext messageContext) {
    +        StringBuffer buffer = new StringBuffer();
    --- End diff --
    
    It's all gone to a BitSet


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to