StephanEwen edited a comment on pull request #11856:
URL: https://github.com/apache/flink/pull/11856#issuecomment-619636516


   The changes look fine to me, +1 to merge this.
   
   Maybe some comments for separate improvements, as follow-ups:
     - The use of `AtomicBoolean` is a bit confusing. It looks like these are 
used mainly as `volatile boolean` flags to report status with `happens-before` 
ordering/visibility.
   
     - The whole "interrupting"-based flow control is difficult to follow, and 
I believe very difficult to get right. One interrupt flag clearing/setting can 
make the difference between correctness, a deadlock, or a "hot loop lock". And 
the interrupt flags are hellishly hard to reason about, especially because they 
can be mutated by any component outside the fetcher.
   
       One way to solve this is by turning the fetch queue into a "wakeup-able 
queue". That pulls a good amount of the wakeup logic out of the fetcher 
class, making that one easier to read and to test. The 
[Handover](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java)
 in the current KafkaConsumer is like a "wakeup-able size-one queue", as an 
example. We probably need something slightly different (waking up the consumer 
of the queue). A simple notify/wait implementation is quite straightforward, and 
if we have time we can make this more efficient with Java's `LockSupport` class.
   
     - The fact that this interruption-based flow is hard to get right looks 
like it is at least part of the reason for the original bug, so I think it is 
worth thinking about.
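
   To make the "wakeup-able queue" suggestion concrete, here is a minimal sketch using plain wait/notify. The class name `WakeableQueue`, its method names, and the choice to return `null` on a wake-up are assumptions made for illustration; this is neither the existing `Handover` class nor code from this PR, and the queue is left unbounded to keep it short.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch of a queue whose consumer can be woken up without interrupts. */
final class WakeableQueue<T> {

    private final Object lock = new Object();
    private final Queue<T> elements = new ArrayDeque<>(); // guarded by lock
    private boolean wakeUp;                                // guarded by lock

    /** Adds an element and notifies a consumer that may be waiting in take(). */
    void put(T element) {
        synchronized (lock) {
            elements.add(element);
            lock.notifyAll();
        }
    }

    /**
     * Blocks until an element is available or wakeUpConsumer() has been called.
     * Returns the next element, or null if the call ended because of a wake-up
     * while the queue was empty. The wake-up flag is cleared on return.
     */
    T take() throws InterruptedException {
        synchronized (lock) {
            // Loop guards against spurious wake-ups of wait().
            while (elements.isEmpty() && !wakeUp) {
                lock.wait();
            }
            wakeUp = false;
            return elements.poll();
        }
    }

    /**
     * Releases a consumer blocked in take(). The flag is sticky, so a wake-up
     * issued before the consumer calls take() is not lost.
     */
    void wakeUpConsumer() {
        synchronized (lock) {
            wakeUp = true;
            lock.notifyAll();
        }
    }
}
```

   With something like this, the fetcher would call `wakeUpConsumer()` instead of `Thread.interrupt()`, so the thread's interrupt flag is never touched by the flow control and cannot be cleared or set by accident from elsewhere.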


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

