zhijiangW commented on a change in pull request #12261:
URL: https://github.com/apache/flink/pull/12261#discussion_r428436876



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
##########
@@ -1010,6 +1011,56 @@ public void testConcurrentRecycleAndRelease2() throws 
Exception {
                }
        }
 
+       @Test
+       public void testConcurrentGetNextBufferAndRelease() throws Exception {

Review comment:
       I am not quite sure whether ITCase can stable reproduce the concurrent 
issues if not constructed well. 
   
   E.g. we need to guarantee that when the canceler thread is releasing the 
channel, this channel must already be queued into the input gate with data in 
`receivedBuffers`, then it can construct the scenario of executing 
`releaseAllResources` and `getNextBuffers` concurrently.
   
   Unit test can work on these two methods directly to stable reproduce the 
potential bugs, but ITCase might be hard to control and it might never have the 
chance to enter this path in practice.
   
   I found most of the previous race condition bugs in core codes was just 
missing such concurrent unit tests before, and simply verify the results by 
executing related methods by sequence, not concurrently. 
   
   E.g. If we execute the #releaseAllResources() in different steps of 
`#getNextBuffer`, it would have different effects. So I think only unit test 
can cover all the potential possibilities if we are not sure ITCase can achieve 
it.
   
           ```
   Optional<BufferAndAvailability> getNextBuffer() throws IOException {
                    // 1. execute #releaseAllResources() here
     
                checkState(!isReleased.get(), "Queried for a buffer after 
channel has been closed.");
                checkState(partitionRequestClient != null, "Queried for a 
buffer before requesting a queue.");
   
                checkError();
   
                final Buffer next;
                final boolean moreAvailable;
   
                    //2. execute #releaseAllResources() here
   
                synchronized (receivedBuffers) {
                        next = receivedBuffers.poll();
                        moreAvailable = !receivedBuffers.isEmpty();
                }
   
                    //3. execute #releaseAllResources() here
   
                numBytesIn.inc(next.getSize());
                numBuffersIn.inc();
                return Optional.of(new BufferAndAvailability(next, 
moreAvailable, 0));
        }
   ```
   
   Maybe it seems complex to let unit tests handle such scenarios. If ITCase 
can handle this work well, i am happy to make only simple unit tests.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to