[ https://issues.apache.org/jira/browse/FLINK-22698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17351619#comment-17351619 ]
Michał Ciesielczyk commented on FLINK-22698:
--------------------------------------------

[~austince] Other data sources handle this similarly (via a timeout) or by interrupting the consumer thread (for instance, the Kafka source has a consumer thread running in the background). Adding a delivery timeout would be simple and would do the job here, so I would not recommend refactoring the whole source into a thread-based one (though it may be worth considering when implementing the FLIP-27 source).

[~nicholasjiang] Yes, I will add it to the configuration. I'm still not sure what the default should be. I would probably go with 0 - meaning no deliveryTimeout (wait forever) - as this is the current approach (and if this is fixed in 1.12 or 1.13, that is the way to go). That said, in practically all real-world cases it makes no sense to wait forever, so something like 30s might be better here (in the worst case, when there is no input data, the local LinkedBlockingQueue would be checked for new elements every 30s).

The unit test should be quite straightforward:
1) start the source (with a very low deliveryTimeout)
2) cancel the source
3) wait until the source stops (if this times out, the test fails)
4) check that no exception was thrown (there shouldn't be any, since it stopped gracefully)

The current tests use a mock that always returns a value from the consumer immediately.
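The timeout-based approach and the test steps above can be sketched roughly as follows. This is a minimal standalone illustration, not the actual RMQSource code: the class, field, and method names are hypothetical, and the real fix would live inside the connector's run loop.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DeliveryTimeoutSketch {

    // Returns true if the polling loop terminates after cancellation even
    // though no message ever arrives in the queue (the scenario that
    // currently hangs stop-with-savepoint).
    public static boolean cancelStopsLoop() throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicBoolean running = new AtomicBoolean(true);
        long deliveryTimeoutMillis = 100; // very low deliveryTimeout, as suggested for the unit test

        Thread sourceLoop = new Thread(() -> {
            try {
                while (running.get()) {
                    // poll() returns null after the timeout instead of blocking
                    // forever like take(), so cancellation is observed promptly.
                    String msg = queue.poll(deliveryTimeoutMillis, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        System.out.println("received: " + msg);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sourceLoop.start();

        running.set(false);     // step 2: cancel the source; the queue stays empty
        sourceLoop.join(5_000); // step 3: wait with a timeout for the loop to stop
        return !sourceLoop.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stopped=" + cancelStopsLoop());
    }
}
```

With a deliveryTimeout of 0 the sketch would degenerate back to an unbounded wait, which is why a finite default such as 30s makes the worst case (an empty queue) a periodic re-check rather than an indefinite hang.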
That is why the above case would pass the tests now (so, additionally, the mock will have to be slightly adjusted for this specific test only).

> RabbitMQ source does not stop unless message arrives in queue
> -------------------------------------------------------------
>
>                 Key: FLINK-22698
>                 URL: https://issues.apache.org/jira/browse/FLINK-22698
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / RabbitMQ
>    Affects Versions: 1.12.0
>            Reporter: Austin Cawley-Edwards
>            Assignee: Michał Ciesielczyk
>            Priority: Major
>         Attachments: taskmanager_thread_dump.json
>
> In a streaming job with multiple RMQSources, a stop-with-savepoint request has unexpected behavior. Regular checkpoints and savepoints complete successfully; it is only the stop-with-savepoint request where this behavior is seen.
>
> *Expected Behavior:*
> The stop-with-savepoint request stops the job with a FINISHED state.
>
> *Actual Behavior:*
> The stop-with-savepoint request either times out or hangs indefinitely unless a message arrives in all the queues that the job consumes from after the stop-with-savepoint request is made.
>
> *Current workaround:*
> Send a sentinel value to each of the queues consumed by the job that the deserialization schema checks in its isEndOfStream method. This is cumbersome and makes it difficult to do stateful upgrades, as coordination with another system is now necessary.
>
> The TaskManager thread dump is attached.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)