[
https://issues.apache.org/jira/browse/NIFI-15483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083103#comment-18083103
]
ASF subversion and git services commented on NIFI-15483:
--------------------------------------------------------
Commit 8d15f03739761a777b69d743b3c3cb7e80de8f60 in nifi's branch
refs/heads/main from Rakesh Kumar Singh
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=8d15f037397 ]
NIFI-15483: Fixed PublishAMQP routing FlowFiles to success when broker cannot
deliver message
PublishAMQP uses mandatory=true on basicPublish() so the broker returns
messages it cannot route to any queue. However, the return arrives
asynchronously via ReturnListener.handleReturn() on the AMQP I/O thread
while the publishing thread had already moved on to
session.transfer(REL_SUCCESS).
The UndeliverableMessageLogger only logged a warning — it never signaled
failure back to publish() or onTrigger(), so every unroutable message was
silently counted as a success despite never reaching any consumer.
Fix:
- Enabled Publisher Confirms (channel.confirmSelect()) in the constructor.
  The broker's basic.return frame for an unroutable message is guaranteed
  to arrive before the corresponding confirm frame, so waitForConfirms()
  acts as a synchronization barrier that makes return detection reliable.
- Added an AtomicReference<String> field (undeliverableReturnReason) that
  UndeliverableMessageLogger.handleReturn() populates with exchange/routingKey/
  replyCode/replyText when a message is returned.
- publish() now: resets the field before each call, calls waitForConfirms(5s)
  to synchronize with the broker, then checks the field and throws AMQPException
  if the message was returned, causing onTrigger() to route to REL_FAILURE.
- Broker NACKs (e.g., resource alarm) are also now surfaced as AMQPException
  because waitForConfirms() returns false on NACK.
- Added regression tests to verify that AMQPPublisher and PublishAMQP correctly
  route FlowFiles to REL_FAILURE for all broker-side failure modes.
- Added ShutdownSignalException to the catch block in AMQPPublisher.publish().
  This converts the channel-close signal into AMQPException so PublishAMQP routes
  the FlowFile to REL_FAILURE with a descriptive error message.
- Added ShutdownSignalException import.
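
The reset / wait / check sequence in the Fix list can be sketched without the
RabbitMQ client. What follows is a minimal stdlib-only simulation, not the
committed code. ConfirmBarrierSketch, sendToBroker() and route() are
hypothetical names, IllegalStateException stands in for AMQPException, and a
CompletableFuture with a 5-second timeout stands in for waitForConfirms(5s).
It models only the return and NACK paths, and assumes (as the commit message
states) that a return is delivered before the matching confirm.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simulation of the publish/return/confirm ordering; no AMQP library involved.
class ConfirmBarrierSketch {
    // Written by the return callback on the simulated I/O thread, read by publish().
    private final AtomicReference<String> undeliverableReturnReason = new AtomicReference<>();
    private final Set<String> routableKeys; // routing keys the fake broker can deliver
    private final boolean brokerNacks;      // true simulates e.g. a resource alarm

    ConfirmBarrierSketch(Set<String> routableKeys, boolean brokerNacks) {
        this.routableKeys = routableKeys;
        this.brokerNacks = brokerNacks;
    }

    // Fake broker: runs on another thread, emits the return (if any) BEFORE the confirm.
    private CompletableFuture<Boolean> sendToBroker(String exchange, String routingKey) {
        return CompletableFuture.supplyAsync(() -> {
            if (!routableKeys.contains(routingKey)) {
                handleReturn(312, "NO_ROUTE", exchange, routingKey);
            }
            return !brokerNacks; // true = ack, false = nack
        });
    }

    // Plays the role of the return listener: records why the message came back.
    private void handleReturn(int replyCode, String replyText, String exchange, String routingKey) {
        undeliverableReturnReason.set("exchange=" + exchange + ", routingKey=" + routingKey
                + ", replyCode=" + replyCode + ", replyText=" + replyText);
    }

    void publish(String exchange, String routingKey) {
        undeliverableReturnReason.set(null); // reset before each publish
        boolean acked;
        try {
            // Barrier: once the confirm is in, any return has already been recorded.
            acked = sendToBroker(exchange, routingKey).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for confirm", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("No confirm received from broker", e);
        }
        if (!acked) {
            throw new IllegalStateException("Broker NACKed the message");
        }
        String reason = undeliverableReturnReason.get();
        if (reason != null) {
            throw new IllegalStateException("Message returned as undeliverable: " + reason);
        }
    }

    // Mirrors the routing decision: any publish failure goes to "failure".
    static String route(ConfirmBarrierSketch publisher, String exchange, String routingKey) {
        try {
            publisher.publish(exchange, routingKey);
            return "success";
        } catch (IllegalStateException e) {
            return "failure";
        }
    }
}
```

Because the field is only read after the confirm has been received, the
publishing thread cannot observe a stale "no return" state for the message it
just sent, which is the property the original fire-and-forget path lacked.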
NIFI-15483: Added Delivery Guarantee property to make Publisher Confirms opt-in
Added a new Delivery Guarantee property to PublishAMQP with two options:
At most once (default): works like the original - sends the message without
waiting for a broker reply. If the message cannot be delivered, only a warning
is logged and the FlowFile routes to success. Best for high throughput.
At least once: turns on RabbitMQ Publisher Confirms. The processor waits for
the broker to confirm the message before routing. If the message is returned
or the broker sends a NACK, the FlowFile routes to failure instead of success.
This prevents silent data loss but can be much slower, especially with remote
brokers.
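
The effect of the two options on routing can be summarized in a small sketch.
This is an illustration under assumptions, not the processor's code:
DeliveryGuaranteeSketch, BrokerOutcome and relationshipFor() are hypothetical
names, and only the returned and NACK outcomes described above are modelled.

```java
// Hypothetical model of how the Delivery Guarantee option changes routing.
class DeliveryGuaranteeSketch {
    enum DeliveryGuarantee {
        AT_MOST_ONCE("At most once"),   // default: do not wait for the broker
        AT_LEAST_ONCE("At least once"); // wait for Publisher Confirms

        final String displayName;

        DeliveryGuarantee(String displayName) {
            this.displayName = displayName;
        }
    }

    // What the broker did with the message (simplified).
    enum BrokerOutcome { ACKED, RETURNED, NACKED }

    static String relationshipFor(DeliveryGuarantee guarantee, BrokerOutcome outcome) {
        if (guarantee == DeliveryGuarantee.AT_MOST_ONCE) {
            // The outcome is not awaited; a return is only logged as a warning.
            return "success";
        }
        // At least once: only a confirmed, non-returned message counts as success.
        return outcome == BrokerOutcome.ACKED ? "success" : "failure";
    }
}
```

For example, relationshipFor(AT_LEAST_ONCE, RETURNED) yields "failure", while
the same outcome under AT_MOST_ONCE yields "success", matching the trade-off
between throughput and silent data loss described above.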
This closes #11213.
Signed-off-by: Peter Turcsanyi <[email protected]>
> PublishAMQP doesn't route on publish failure
> --------------------------------------------
>
> Key: NIFI-15483
> URL: https://issues.apache.org/jira/browse/NIFI-15483
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 2.6.0
> Environment: RHEL 9, NiFi 2.6.0 + RabbitMQ 4.2
> Reporter: Will James
> Assignee: Rakesh Kumar Singh
> Priority: Critical
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> The issue from NIFI-5639 still persists in NiFi v2. Essentially, if you
> attempt to publish a message with PublishAMQP using invalid properties (e.g.
> incorrect exchange name or user permission), the message fails to
> publish but still goes to the 'success' relationship. This contradicts
> the PublishAMQP documentation for the failure relationship: 'All
> FlowFiles that cannot be routed to the AMQP destination are routed to this
> relationship.'
> Steps to reproduce:
> # Set up a *PublishAMQP* processor that successfully publishes to a RabbitMQ
> queue
> # Change the _*Exchange Name*_ property on the *PublishAMQP* to an exchange
> that doesn't exist in RabbitMQ.
> # Send a valid flow file to the *PublishAMQP* processor. RabbitMQ will fail
> to process the message, but NiFi still routes it to 'success'. Depending on
> the configuration of RabbitMQ, there may be an error bulletin on the NiFi
> processor. I've seen this happen when publishing using a user with limited
> queue/exchange permissions - NiFi shows a permission denied response and
> still routes to success.