[ https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17125362#comment-17125362 ]
Austin Cawley-Edwards edited comment on FLINK-10195 at 6/4/20, 2:50 PM:
------------------------------------------------------------------------

Yes, that is correct. I still think it is an improvement on what's here, since it lets users tune their job to their needs, but I don't think the performance issue you cite can be fixed with what Rabbit provides.

EDIT: we might be able to handle this by updating the prefetch count dynamically when the buffer has space and many unacked messages are waiting to be acked on checkpoint, though that might be too much for an initial implementation.

If a user has high-volume queues and needs checkpointing for EXACTLY_ONCE/AT_LEAST_ONCE guarantees, they will need to tune their buffer length and checkpointing interval. This could also be an opt-in/opt-out change if there are cases that need it disabled, and we should definitely update the docs [1].

Our company still actively uses Rabbit – I'm happy to build off your PR and test it out in our jobs.

[1]: [https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/rabbitmq.html#rabbitmq-source]

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -----------------------------------------------------------------
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
> Issue Type: Bug
> Components: Connectors / RabbitMQ
> Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
> Reporter: Luka Jurukovski
> Assignee: Luka Jurukovski
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The connection between the RabbitMQ server and the client does not
> appropriately backpressure when auto-acking is disabled. This becomes very
> problematic when a downstream process throttles data processing to slower
> than RabbitMQ sends data to the client.
> The difference in records ends up being stored in Flink's heap space, which
> grows indefinitely (or technically to Integer.MAX_VALUE deliveries). Looking
> at RabbitMQ's metrics, the number of unacked messages looks like a steadily
> rising sawtooth shape.
> Upon further investigation, it looks like this is due to how the
> QueueingConsumer works: messages are added to the BlockingQueue faster than
> they are removed and processed, resulting in the previously described
> behavior.
> This may be intended behavior; however, it isn't explicitly obvious in the
> documentation or any of the examples I have seen.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
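The unbounded-queue growth described in the issue, and the bounded-buffer fix discussed in the comment, can be sketched without a broker. This is only an illustrative simulation, not the connector's actual code: the class and method names are hypothetical, and plain JDK BlockingQueues stand in for the RabbitMQ client's delivery path.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch: an unbounded queue (as in QueueingConsumer) grows without limit
// when deliveries outpace processing, while a bounded queue blocks the
// delivery callback and thereby backpressures the sender.
public class BackpressureSketch {

    // Simulate fast deliveries into a buffer while the "job" drains slowly;
    // return the worst-case on-heap message count observed.
    static int fillAndDrain(BlockingQueue<byte[]> buffer, int deliveries)
            throws InterruptedException {
        Thread producer = new Thread(() -> {
            for (int i = 0; i < deliveries; i++) {
                try {
                    buffer.put(new byte[16]); // blocks only if bounded and full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        producer.start();

        int maxDepth = 0;
        for (int processed = 0; processed < deliveries; processed++) {
            maxDepth = Math.max(maxDepth, buffer.size());
            buffer.take();     // slow consumer: one message at a time
            Thread.sleep(1);
        }
        producer.join();
        return maxDepth;
    }

    public static void main(String[] args) throws InterruptedException {
        int unboundedDepth = fillAndDrain(new LinkedBlockingQueue<>(), 500);
        int boundedDepth = fillAndDrain(new ArrayBlockingQueue<>(32), 500);
        System.out.println("unbounded max depth: " + unboundedDepth);
        System.out.println("bounded max depth:   " + boundedDepth);
        // The bounded buffer's depth never exceeds its capacity (32), so heap
        // usage is capped regardless of how fast deliveries arrive.
    }
}
```

In the real connector, capping the on-heap buffer would pair with the RabbitMQ client's `channel.basicQos(prefetchCount)` so the broker stops sending once that many unacked messages are outstanding; the dynamic-prefetch idea from the comment would amount to adjusting that value between checkpoints.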