zhijiangW commented on a change in pull request #12261: URL: https://github.com/apache/flink/pull/12261#discussion_r428430651
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -181,6 +181,14 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
 			moreAvailable = !receivedBuffers.isEmpty();
 		}
 
+		if (next == null) {

Review comment:
Let me explain this further.

The canceler thread closes the `InputGate` in advance so that the task thread can notice the released state and exit early. If the canceler thread has already called `RemoteInputChannel#releaseAllResources`, all buffers in `receivedBuffers` have been drained and recycled. The task thread is not yet aware of this, so it may still call `getNextBuffer` and get a `null` buffer here.

We only expect a `null` buffer when the channel has been released, so in that case we throw the expected `CancelTaskException` to make the task thread exit. If the channel is not released, there must be a logic bug somewhere, e.g. this channel notified the gate of available data by mistake. For that case we throw an `IllegalStateException`, which avoids a misleading `NullPointerException` when the buffer is dereferenced below.

My fix in `#releaseAllResources` only prevents the task thread and the canceler thread from polling `receivedBuffers` concurrently. That concurrent polling could recycle the same buffer twice and cause a misleading exception to be thrown from the netty stack.

Another option is to modify the logic in `#getNextBuffer` like this:

```java
synchronized (receivedBuffers) {
    // Checked under the same lock that releaseAllResources uses to drain the queue.
    if (isReleased.get()) {
        throw new CancelTaskException("Queried for a buffer after channel has been released.");
    }
    next = receivedBuffers.poll();
    moreAvailable = !receivedBuffers.isEmpty();
}
```

Even then, it might still make sense to also check whether the buffer is `null` outside the `synchronized` block. That check is not about the race condition; it only guards against potential bugs in the data-notification logic, which would otherwise surface as a misleading NPE.
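To make the two checks concrete, here is a minimal, self-contained sketch of the pattern discussed above: the released check and the `poll` happen under the `receivedBuffers` lock, the release path drains the queue under that same lock, and a separate `null` check outside the lock turns an unexpected empty queue into an `IllegalStateException`. This is a simplified model, not Flink's actual `RemoteInputChannel`: `SketchInputChannel`, `SimpleBuffer`, `onBuffer` and the local `CancelTaskException` are hypothetical stand-ins, and the `moreAvailable`/`BufferAndAvailability` return value is omitted.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical local stand-in for Flink's CancelTaskException. */
class CancelTaskException extends RuntimeException {
    CancelTaskException(String message) { super(message); }
}

/** Hypothetical buffer that counts how often it has been recycled. */
class SimpleBuffer {
    final AtomicInteger recycleCount = new AtomicInteger();
    void recycle() { recycleCount.incrementAndGet(); }
}

/** Simplified model of the channel; not Flink's RemoteInputChannel. */
class SketchInputChannel {
    private final ArrayDeque<SimpleBuffer> receivedBuffers = new ArrayDeque<>();
    private final AtomicBoolean isReleased = new AtomicBoolean();

    /** Network-thread side (hypothetical): enqueue, or recycle at once if already released. */
    void onBuffer(SimpleBuffer buffer) {
        synchronized (receivedBuffers) {
            if (!isReleased.get()) {
                receivedBuffers.add(buffer);
                return;
            }
        }
        buffer.recycle();
    }

    /** Canceler-thread side: drain under the shared lock so each buffer is removed exactly once. */
    void releaseAllResources() {
        if (isReleased.compareAndSet(false, true)) {
            ArrayDeque<SimpleBuffer> toRecycle;
            synchronized (receivedBuffers) {
                toRecycle = new ArrayDeque<>(receivedBuffers);
                receivedBuffers.clear();
            }
            // Recycle outside the lock; no other thread can poll these buffers any more.
            for (SimpleBuffer b : toRecycle) {
                b.recycle();
            }
        }
    }

    /** Task-thread side. */
    SimpleBuffer getNextBuffer() {
        SimpleBuffer next;
        synchronized (receivedBuffers) {
            if (isReleased.get()) {
                throw new CancelTaskException("Queried for a buffer after channel has been released.");
            }
            next = receivedBuffers.poll();
        }
        // Not a race: the channel is unreleased but has nothing queued, so the
        // availability notification must have been wrong. Fail loudly instead of an NPE later.
        if (next == null) {
            throw new IllegalStateException("No queued buffer although the channel is not released.");
        }
        return next;
    }
}
```

Recycling happens outside the lock, and `compareAndSet` makes a repeated release a no-op, so a buffer can neither be polled by both threads nor be recycled twice in this model.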