Claudio Mattioni created NIFI-15892:
---------------------------------------

             Summary: ConsumeAMQP should nack batched deliveries when 
ProcessSession commit fails
                 Key: NIFI-15892
                 URL: https://issues.apache.org/jira/browse/NIFI-15892
             Project: Apache NiFi
          Issue Type: Bug
          Components: Extensions
    Affects Versions: 2.9.0
            Reporter: Claudio Mattioni


ConsumeAMQP with Auto-Acknowledge Messages set to false and Batch Size greater 
than 1 can leave RabbitMQ deliveries unacknowledged until the broker closes the 
channel due to consumer_timeout.

Current behavior observed with NiFi/MiNiFi 2.9.0:

- ConsumeAMQP is configured with:
  - Auto-Acknowledge Messages = false
  - Batch Size > 1
  - Prefetch Count > 1

- RabbitMQ eventually logs:
  Consumer ... has timed out waiting for a consumer acknowledgement of a delivery.
  Timeout used: 1800000 ms.
  operation none caused a channel exception precondition_failed:
  delivery acknowledgement on channel 1 timed out.

- MiNiFi then logs:
  AMQP failure, dropping the client
  AMQPException: AMQP client has lost connection
  Consumer is closed, discarding message

This resembles the older NIFI-5896 behavior, where Batch Size > 1 with 
Auto-Acknowledge disabled could result in messages remaining unacknowledged. 
NIFI-5896 was closed with a workaround of Batch Size = 1, but the current 
ConsumeAMQP implementation still appears to acknowledge batched messages only 
after ProcessSession commit, using a cumulative ack for the last delivery tag.

The current implementation handles the successful commit path, but it does not 
explicitly nack/requeue the outstanding batched deliveries when the 
ProcessSession commit fails. In that case, the messages can remain 
unacknowledged until RabbitMQ closes the channel due to consumer_timeout.

Proposed improvement:

When Auto-Acknowledge Messages is false and a batch has been consumed:

1. On successful ProcessSession commit:
   - basicAck(lastDeliveryTag, true)

2. On ProcessSession commit failure:
   - basicNack(lastDeliveryTag, true, true)

This would preserve the documented at-least-once behavior while allowing Batch 
Size > 1 to be used safely.
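
The two paths above could be sketched roughly as follows. This is only an 
illustration, not the actual ConsumeAMQP code: AckChannel, Commit, 
settleBatch and RecordingChannel are hypothetical names introduced here. 
AckChannel mirrors the basicAck(long, boolean) and basicNack(long, boolean, 
boolean) signatures of the RabbitMQ Java client Channel, and Commit stands in 
for whatever step completes the ProcessSession commit.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchAckSketch {

    /** Hypothetical stand-in for the two RabbitMQ Channel methods used below. */
    interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple) throws Exception;
        void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws Exception;
    }

    /** Hypothetical stand-in for the ProcessSession commit step; may throw on failure. */
    interface Commit {
        void run() throws Exception;
    }

    /**
     * Settles a consumed batch: cumulative ack after a successful commit,
     * cumulative nack with requeue if the commit throws.
     * Returns true when the batch was acked, false when it was nacked.
     */
    static boolean settleBatch(AckChannel channel, long lastDeliveryTag, Commit commit) throws Exception {
        try {
            commit.run();
        } catch (Exception commitFailure) {
            // multiple=true covers every delivery up to lastDeliveryTag;
            // requeue=true hands them back to the broker for redelivery.
            channel.basicNack(lastDeliveryTag, true, true);
            return false;
        }
        // The ack sits outside the try block so that an ack failure is not
        // mistaken for a commit failure and answered with a nack.
        channel.basicAck(lastDeliveryTag, true);
        return true;
    }

    /** Test double that records calls instead of talking to a broker. */
    static final class RecordingChannel implements AckChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            calls.add("ack:" + deliveryTag + ":" + multiple);
        }

        @Override
        public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
            calls.add("nack:" + deliveryTag + ":" + multiple + ":" + requeue);
        }
    }

    public static void main(String[] args) throws Exception {
        RecordingChannel channel = new RecordingChannel();
        settleBatch(channel, 10L, () -> { });
        settleBatch(channel, 20L, () -> { throw new Exception("commit failed"); });
        System.out.println(channel.calls);
    }
}
```

Requeued deliveries are redelivered by the broker, so downstream handling 
would still need to tolerate duplicates, which is consistent with 
at-least-once delivery.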

P.S. This is marked as Major because it can make ConsumeAMQP unreliable with 
Auto-Acknowledge=false and Batch Size > 1, causing RabbitMQ channels to be 
closed due to consumer acknowledgement timeout. A workaround exists by using 
Batch Size = 1, but that significantly limits throughput.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
