zhijiangW commented on a change in pull request #12261: URL: https://github.com/apache/flink/pull/12261#discussion_r428436876
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
##########
@@ -1010,6 +1011,56 @@ public void testConcurrentRecycleAndRelease2() throws Exception {
 		}
 	}
 
+	@Test
+	public void testConcurrentGetNextBufferAndRelease() throws Exception {

Review comment:

I am not sure an ITCase can reliably reproduce this concurrency issue unless it is constructed carefully. For example, we would need to guarantee that when the canceler thread releases the channel, the channel is already queued in the input gate with data in `receivedBuffers`; only then do `releaseAllResources` and `getNextBuffer` actually run concurrently. A unit test can call these two methods directly and reproduce the potential bug reliably, whereas an ITCase is hard to control and might never enter this path in practice.

Most of the previous race-condition bugs I found in the core code came from missing exactly this kind of concurrent unit test: the existing tests verified results by calling the related methods in sequence, not concurrently.

For example, calling `#releaseAllResources()` at different steps of `#getNextBuffer` has different effects. So I think only a unit test can cover all the possibilities, unless we are sure an ITCase can do the same.

```
Optional<BufferAndAvailability> getNextBuffer() throws IOException {
	// 1. execute #releaseAllResources() here
	checkState(!isReleased.get(), "Queried for a buffer after channel has been closed.");
	checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");

	checkError();

	final Buffer next;
	final boolean moreAvailable;

	// 2. execute #releaseAllResources() here
	synchronized (receivedBuffers) {
		next = receivedBuffers.poll();
		moreAvailable = !receivedBuffers.isEmpty();
	}

	// 3. execute #releaseAllResources() here
	numBytesIn.inc(next.getSize());
	numBuffersIn.inc();
	return Optional.of(new BufferAndAvailability(next, moreAvailable, 0));
}
```

It may be complex for unit tests to handle such scenarios. If an ITCase can cover this well, I am happy to keep only simple unit tests.
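
To make the three release points concrete, here is a minimal sketch on a toy model. It is not Flink code: `ToyChannel`, `maybeRelease`, and `outcome` are hypothetical names, the buffer is a plain `byte[]`, and the release is triggered from the same thread at a chosen step rather than from a real canceler thread. It only illustrates why the position of the release matters; an actual unit test would presumably drive `releaseAllResources` from a second thread.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy model of the interleavings; NOT Flink code. All names are hypothetical. */
class ReleaseInterleavingSketch {

    /** Simplified stand-in for a channel with a buffer queue and a released flag. */
    static final class ToyChannel {
        private final ArrayDeque<byte[]> receivedBuffers = new ArrayDeque<>();
        private final AtomicBoolean isReleased = new AtomicBoolean();
        private final int releaseAtStep;

        ToyChannel(int releaseAtStep) {
            this.releaseAtStep = releaseAtStep;
            receivedBuffers.add(new byte[8]); // one queued "buffer" of size 8
        }

        /** Marks the channel released and drops all queued buffers. */
        void releaseAllResources() {
            if (isReleased.compareAndSet(false, true)) {
                synchronized (receivedBuffers) {
                    receivedBuffers.clear();
                }
            }
        }

        /** Simulates the canceler running exactly at the given step. */
        private void maybeRelease(int step) {
            if (step == releaseAtStep) {
                releaseAllResources();
            }
        }

        Optional<Integer> getNextBuffer() {
            maybeRelease(1);
            if (isReleased.get()) {
                throw new IllegalStateException("Queried for a buffer after channel has been closed.");
            }

            maybeRelease(2);
            final byte[] next;
            synchronized (receivedBuffers) {
                next = receivedBuffers.poll(); // null if the queue was cleared
            }

            maybeRelease(3);
            return Optional.of(next.length); // throws NullPointerException if next is null
        }
    }

    /** Runs one interleaving and reports the buffer size or the exception type. */
    static String outcome(int releaseAtStep) {
        try {
            return "size=" + new ToyChannel(releaseAtStep).getNextBuffer().get();
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        for (int step = 1; step <= 3; step++) {
            System.out.println("release at step " + step + " -> " + outcome(step));
        }
    }
}
```

In this toy model, a release at step 1 trips the state check, a release at step 2 empties the queue so the later dereference fails, and a release at step 3 comes too late to affect the polled buffer. A test that only calls the two methods in sequence exercises none of the middle cases.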