[ https://issues.apache.org/jira/browse/FLINK-21373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-21373:
-----------------------------------
    Labels: auto-unassigned pull-request-available  (was: auto-unassigned)

> Port RabbitMQ Sink to FLIP-143 API
> ----------------------------------
>
>                 Key: FLINK-21373
>                 URL: https://issues.apache.org/jira/browse/FLINK-21373
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors/ RabbitMQ
>            Reporter: Jan Westphal
>            Priority: Minor
>              Labels: auto-unassigned, pull-request-available
>             Fix For: 1.12.0
>
>
> *Structure*
> The unified Sink API provides a Writer, a Committer and a GlobalCommitter.
> Right now we don’t see the need to use the Committer and GlobalCommitter, as
> the Writer is sufficient to uphold the consistency guarantees. Since we need
> asynchronous RabbitMQ callbacks to know whether or not a message was
> published successfully, and have to store unacknowledged messages in the
> checkpoint, there would be a large bidirectional communication and state
> exchange overhead between the Writer and the Committer.
>
> *At-most-once*
> The Writer receives a message from Flink and simply publishes it to RabbitMQ.
> The current RabbitMQ Sink only provides this mode.
>
> *At-least-once*
> The objective here is to receive an acknowledgement from RabbitMQ for
> published messages. Therefore, before publishing a message, we store the
> message in a Map with the sequence number as its key. If the message is
> acknowledged by RabbitMQ, we can remove it from the Map. If we don’t receive
> an acknowledgement within a certain amount of time (or we receive a
> RabbitMQ-specific, so-called negative acknowledgement), we will try to resend
> the message when doing a checkpoint.
>
> *Exactly-once*
> On checkpointing we send all messages from Flink to RabbitMQ in transaction
> mode. This way, either all the messages get sent or they are rolled back on
> failure. All messages that are not sent successfully are written to the
> checkpoint and are retried with the next checkpoint.
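
The at-least-once bookkeeping described above could be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the pull request: the class `PendingConfirms` and its methods `track` and `snapshotUnacked` are hypothetical names, messages are plain strings, and the acknowledgement timeout is left out. The `handleAck` / `handleNack` methods are modelled on the `(sequence number, multiple)` shape of RabbitMQ publisher-confirm callbacks, but the sketch uses only the JDK and does not talk to a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper for illustration; not part of Flink or the RabbitMQ client. */
class PendingConfirms {
    // Unacknowledged messages, keyed by their publish sequence number.
    // A sorted concurrent map lets confirm callbacks run on another thread
    // and makes "acknowledge everything up to N" a cheap range operation.
    private final ConcurrentNavigableMap<Long, String> outstanding =
            new ConcurrentSkipListMap<>();
    private long nextSeqNo = 1;

    /** Store the message before it is published; returns its sequence number. */
    synchronized long track(String message) {
        long seqNo = nextSeqNo++;
        outstanding.put(seqNo, message);
        return seqNo;
    }

    /** Positive acknowledgement: drop one entry, or all entries up to and including seqNo. */
    void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    /** Negative acknowledgement: keep the entries so the next checkpoint picks them up. */
    void handleNack(long seqNo, boolean multiple) {
        // Intentionally a no-op in this sketch: the messages stay in the map.
    }

    /** Messages that would be written to the checkpoint and resent, in publish order. */
    List<String> snapshotUnacked() {
        return new ArrayList<>(outstanding.values());
    }
}
```

In this sketch a Writer would call `track` right before publishing, the confirm callbacks would call `handleAck` or `handleNack`, and the checkpoint hook would persist and resend whatever `snapshotUnacked` returns. A real implementation would also need to re-register resent messages under their new sequence numbers and handle the timeout case.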
-- This message was sent by Atlassian Jira (v8.20.10#820010)