Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160694722

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
@@ -88,6 +94,37 @@ public void run() {
 		});
 	}
 
+	/**
+	 * Try to enqueue the reader once receiving credit notification from the consumer or receiving
+	 * non-empty reader notification from the producer. Only one thread would trigger the actual
+	 * enqueue after checking the reader's availability, so there is no race condition here.
+	 */
+	@VisibleForTesting
+	void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
+		if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
+			enqueueAvailableReader(reader);
+		}
+	}
+
+	@VisibleForTesting
+	void enqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
--- End diff --

actually, we can inline this into the first one, make this `private`, and reduce `Mockito` use in the tests if we just made the `availableReaders` field accessible to the tests, e.g.
```
/**
 * Accesses internal state to verify reader registration in the unit tests.
 *
 * <p><strong>Do not use anywhere else!</strong>
 *
 * @return readers which are enqueued available for transferring data
 */
@VisibleForTesting
ArrayDeque<SequenceNumberingViewReader> getAvailableReaders() {
	return availableReaders;
}
```
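
To illustrate how a test could then check registration through the accessor instead of verifying calls on a `Mockito` spy, here is a minimal, self-contained sketch. `ReaderSketch` and `QueueSketch` are hypothetical stand-ins, not the real `SequenceNumberingViewReader` and `PartitionRequestQueue`, and the body of `enqueueAvailableReader` is only an assumption about what the enqueue does.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for SequenceNumberingViewReader (not the Flink class).
class ReaderSketch {
    private final boolean available;
    private boolean registeredAvailable;

    ReaderSketch(boolean available) {
        this.available = available;
    }

    boolean isAvailable() {
        return available;
    }

    boolean isRegisteredAvailable() {
        return registeredAvailable;
    }

    void setRegisteredAvailable(boolean registeredAvailable) {
        this.registeredAvailable = registeredAvailable;
    }
}

// Hypothetical stand-in for PartitionRequestQueue (not the Flink class).
class QueueSketch {
    private final ArrayDeque<ReaderSketch> availableReaders = new ArrayDeque<>();

    // Same guard as in the diff: enqueue only readers that are available
    // and not yet registered.
    void triggerEnqueueAvailableReader(ReaderSketch reader) {
        if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
            enqueueAvailableReader(reader);
        }
    }

    // Private, as suggested; the body is an assumption made for this sketch.
    private void enqueueAvailableReader(ReaderSketch reader) {
        availableReaders.add(reader);
        reader.setRegisteredAvailable(true);
    }

    // Test-only accessor, mirroring the proposed getAvailableReaders().
    ArrayDeque<ReaderSketch> getAvailableReaders() {
        return availableReaders;
    }
}
```

A test would call `triggerEnqueueAvailableReader` and assert on the size or contents of `getAvailableReaders()`, so neither the queue nor the enqueue method needs to be spied on.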
---