Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/295#discussion_r57096148
  
    --- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
 ---
    @@ -104,21 +128,52 @@ BitSet publish(SplittableMessageContext 
messageContext, InputStream contentStrea
                     byte[] content = scanner.next().getBytes();
                     if (content.length > 0){
                         byte[] key = messageContext.getKeyBytes();
    -                    partitionKey = partitionKey == null ? key : 
partitionKey;// the whole thing may still be null
                         String topicName = messageContext.getTopicName();
    +                    if (partitionKey == null && key != null) {
    +                        partitionKey = this.getPartition(key, topicName);
    +                    }
                         if (prevFailedSegmentIndexes == null || 
prevFailedSegmentIndexes.get(segmentCounter)) {
    -                        KeyedMessage<byte[], byte[]> message = new 
KeyedMessage<byte[], byte[]>(topicName, key, partitionKey, content);
    -                        if (!this.toKafka(message)) {
    -                            failedSegments.set(segmentCounter);
    -                        }
    +                        ProducerRecord<byte[], byte[]> message = new 
ProducerRecord<byte[], byte[]>(topicName, partitionKey, key, content);
    +                        sendFutures.add(this.toKafka(message));
                         }
                     }
                     segmentCounter++;
                 }
             }
    +        segmentCounter = 0;
    +        for (Future<RecordMetadata> future : sendFutures) {
    +            try {
    +                future.get(this.ackWaitTime, TimeUnit.MILLISECONDS);
    --- End diff --
    
    Here, we could end up waiting the ack wait time for each one of the 
messages. If we sent 1,000 messages we could wait 1,000 times the length of the 
ack wait time. Perhaps we should instead do something like:
    ```
    final long endTime = System.currentTimeMillis() + ackWaitTime;
    for (Future<RecordMetadata> future : sendFutures) {
        long toWait = endTime - System.currentTimeMillis();
        if (toWait < 0 && !future.isDone() {
          // consider failure
        } else {
            try {
                future.get(toWait, TimeUnit.MILLISECONDS);
            }
            catch (...) {
                ...
            }
       }
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to