StephanEwen commented on pull request #11856: URL: https://github.com/apache/flink/pull/11856#issuecomment-619636516
The changes look fine to me, +1 to merge this.

Maybe some comments for separate improvements, as follow-ups:

  - The use of `AtomicBoolean` is a bit confusing. It looks like these are mainly used as `volatile boolean` flags to report status with `happens-before` ordering/visibility.

  - The whole "interrupting"-based flow control is difficult to follow, and I believe very difficult to get right. A single clearing or setting of the interrupt flag can make the difference between correctness, a deadlock, or a "hot loop lock". And the interrupt flags are hellishly hard to reason about, especially because they can be mutated by any component outside the fetcher.

One way to solve this is by turning the fetch queue into a "wakeup-able queue". That pulls a good amount of the wakeup logic out of the fetcher class, making that one easier to read and to test.

The [Handover](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java) in the current KafkaConsumer is like a "wakeup-able size-one queue", as an example. We probably need something slightly different (wake up the consumer of the queue). A simple notify/wait implementation is quite straightforward, and if we have time we can make this more efficient with Java's `LockSupport` class.
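
To make the "wakeup-able queue" idea concrete, here is a minimal notify/wait sketch. It is not the code from this PR and not the existing `Handover` class; the name `WakeupableQueue`, its methods, and the choice to return `null` on wake-up are all assumptions made for illustration. It is also unbounded, so unlike a size-one handover it never blocks the producer.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Hypothetical sketch of a queue whose consumer can be woken up without
 * relying on thread interrupts. All names here are illustrative.
 */
final class WakeupableQueue<T> {

    private final Object lock = new Object();
    private final Queue<T> elements = new ArrayDeque<>();

    // Guarded by 'lock'. Sticky: a wake-up requested before the consumer
    // blocks is still observed by its next call to take().
    private boolean wakeUpRequested;

    /** Adds an element and notifies a consumer that may be waiting. */
    void put(T element) {
        synchronized (lock) {
            elements.add(element);
            lock.notifyAll();
        }
    }

    /**
     * Blocks until an element is available or wakeUp() has been called.
     * Returns null if the call ended because of a wake-up. In this sketch a
     * pending wake-up takes priority over queued elements, which stay in
     * the queue for the next call.
     */
    T take() throws InterruptedException {
        synchronized (lock) {
            // Loop to guard against spurious wake-ups of Object.wait().
            while (elements.isEmpty() && !wakeUpRequested) {
                lock.wait();
            }
            if (wakeUpRequested) {
                wakeUpRequested = false;
                return null;
            }
            return elements.poll();
        }
    }

    /** Makes the current or next take() call return null. */
    void wakeUp() {
        synchronized (lock) {
            wakeUpRequested = true;
            lock.notifyAll();
        }
    }
}
```

With something along these lines, the fetcher would call `take()` in its loop and treat a `null` result as "check your state", while other components call `wakeUp()` instead of touching the thread's interrupt flag. The wake-up state then lives in one place under one lock, which is the part that should make it easier to test in isolation.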