zhijiangW commented on a change in pull request #12261:
URL: https://github.com/apache/flink/pull/12261#discussion_r428430651



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -181,6 +181,14 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
                        moreAvailable = !receivedBuffers.isEmpty();
                }
 
+               if (next == null) {

Review comment:
   Let me explain this further.
   
   The canceler thread closes the `InputGate` ahead of time so that the task
thread can notice the released state and exit early. So if the canceler thread
has already called `RemoteInputChannel#releaseAllResources`, all the buffers in
`receivedBuffers` have already been drained and recycled.
   
   But the task thread may not be aware of this yet, and it would probably call
`getNextBuffer` and get a `null` buffer here. We only expect a `null` buffer
when the channel is `released`, so in that case we throw the expected
`CancelTaskException` to make the task thread exit. If the channel is not
released, there must be a logic bug, e.g. this channel notified the gate of
available data by mistake. So we throw an `IllegalStateException` in that case,
to avoid the misleading `NullPointerException` when the buffer is dereferenced
below.
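
   To make the two cases concrete, here is a minimal, self-contained sketch of
the check after polling. It is not the actual `RemoteInputChannel` code:
`ChannelSketch`, `CancelTaskExceptionSketch`, `onBuffer`, the `String` buffers
and the second error message are hypothetical stand-ins I made up for
illustration.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for Flink's CancelTaskException.
class CancelTaskExceptionSketch extends RuntimeException {
    CancelTaskExceptionSketch(String message) {
        super(message);
    }
}

// Hypothetical, simplified model of a channel; buffers are plain Strings here.
class ChannelSketch {
    private final ArrayDeque<String> receivedBuffers = new ArrayDeque<>();
    private final AtomicBoolean isReleased = new AtomicBoolean(false);

    // Called when data arrives (assumed entry point, for the sketch only).
    void onBuffer(String buffer) {
        synchronized (receivedBuffers) {
            receivedBuffers.add(buffer);
        }
    }

    // Called by the canceler thread: mark released and drop all queued buffers.
    void releaseAllResources() {
        if (isReleased.compareAndSet(false, true)) {
            synchronized (receivedBuffers) {
                receivedBuffers.clear();
            }
        }
    }

    // Called by the task thread.
    String getNextBuffer() {
        final String next;
        synchronized (receivedBuffers) {
            next = receivedBuffers.poll();
        }
        if (next == null) {
            if (isReleased.get()) {
                // Expected case: the channel was released concurrently.
                throw new CancelTaskExceptionSketch(
                        "Queried for a buffer after channel has been released.");
            }
            // Unexpected case: data was announced but nothing is queued.
            throw new IllegalStateException(
                    "Got a null buffer from a channel that is not released.");
        }
        return next;
    }
}
```

   With this shape, a `null` result never reaches the code that dereferences
the buffer, so the caller sees either the cancellation or an explicit state
error instead of an NPE.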
   
   My fix in `#releaseAllResources` only prevents the task thread and the
canceler thread from polling `receivedBuffers` concurrently, which might cause
the same buffer to be recycled twice and a misleading exception to be thrown by
the netty stack.
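
   As a rough illustration of that point (not the real fix), the sketch below
drains the queue under the same lock the task thread uses for polling, so each
buffer is handed to exactly one thread and recycled once. `CountingBuffer`,
`DrainSketch` and their methods are hypothetical names used only here.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical buffer that counts how often recycle() is called on it.
class CountingBuffer {
    final AtomicInteger recycleCount = new AtomicInteger();

    void recycle() {
        recycleCount.incrementAndGet();
    }
}

// Hypothetical, simplified queue owner; not the real RemoteInputChannel.
class DrainSketch {
    private final ArrayDeque<CountingBuffer> receivedBuffers = new ArrayDeque<>();

    void add(CountingBuffer buffer) {
        synchronized (receivedBuffers) {
            receivedBuffers.add(buffer);
        }
    }

    // Task thread side: take one buffer (or null) under the lock.
    CountingBuffer poll() {
        synchronized (receivedBuffers) {
            return receivedBuffers.poll();
        }
    }

    // Canceler thread side: drain and recycle under the same lock, so a
    // buffer already taken by poll() can never be seen here again.
    void releaseAll() {
        synchronized (receivedBuffers) {
            CountingBuffer buffer;
            while ((buffer = receivedBuffers.poll()) != null) {
                buffer.recycle();
            }
        }
    }
}
```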
   
   Another option is to modify the logic in `#getNextBuffer` as below:
   
   ```
   synchronized (receivedBuffers) {
           if (isReleased.get()) {
                   throw new CancelTaskException("Queried for a buffer after channel has been released.");
           }
           next = receivedBuffers.poll();
           moreAvailable = !receivedBuffers.isEmpty();
   }
   ```
   
   But it might still make sense to also check whether the buffer is `null`
outside the `synchronized` block. That check is not for the race condition
case; it only guards against potential bugs in the data notification logic
that would otherwise cause a misleading NPE.





