[GitHub] [flink] zhijiangW commented on a change in pull request #12261: [FLINK-17823][network] Resolve the race condition while releasing RemoteInputChannel
zhijiangW commented on a change in pull request #12261:
URL: https://github.com/apache/flink/pull/12261#discussion_r429018568

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ##
@@ -1010,6 +1011,56 @@ public void testConcurrentRecycleAndRelease2() throws Exception { } }
+ @Test
+ public void testConcurrentGetNextBufferAndRelease() throws Exception {

Review comment:
I have some further considerations on this issue to share.

With an ITCase, even when we can reproduce a potential concurrency bug, it is hard to debug and find the root cause, because all the components are involved. I have felt this strongly while debugging `UnalignedCheckpointITCase` these days. A unit test, by contrast, exercises just the two concurrent methods directly, so the limited scope makes the bug easy to locate.

`RemoteInputChannelTest` already contained 6 concurrent unit tests before this change. They guarantee stability among the different methods executed concurrently by the task thread, the netty thread and the canceler thread. If they were replaced by an ITCase, we would need to debug across all these methods to find a potential root cause.

In general, a unit test should focus on a single component; when more components are involved we should rely on an ITCase. In this case the scope is limited to the `RemoteInputChannel` component, so a unit test also makes sense from this perspective.

Anyway, besides the pros of unit tests mentioned above, I also agree with the cons you raised; on my side the pros just slightly outweigh the cons. :)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zhijiangW commented on a change in pull request #12261: [FLINK-17823][network] Resolve the race condition while releasing RemoteInputChannel
zhijiangW commented on a change in pull request #12261:
URL: https://github.com/apache/flink/pull/12261#discussion_r428436876

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ##
@@ -1010,6 +1011,56 @@ public void testConcurrentRecycleAndRelease2() throws Exception { } }
+ @Test
+ public void testConcurrentGetNextBufferAndRelease() throws Exception {

Review comment:
I am not quite sure whether an ITCase can reproduce the concurrency issues reliably unless it is constructed very carefully. E.g. we need to guarantee that when the canceler thread releases the channel, this channel has already been queued into the input gate with data in `receivedBuffers`; only then does it simulate the scenario of executing `releaseAllResources` and `getNextBuffer` concurrently. A unit test can work on these two methods directly and reproduce the potential bugs reliably, whereas an ITCase is hard to control and might never enter this path in practice.

I found that most of the previous race condition bugs in the core code were missing exactly such concurrent unit tests. The existing tests simply verified the results of executing the related methods in sequence, not concurrently.

E.g. executing `#releaseAllResources()` at different steps of `#getNextBuffer` has different effects, so maybe only a unit test can cover all the possibilities if we are not sure an ITCase can achieve it.

```
Optional<BufferAndAvailability> getNextBuffer() throws IOException {
    // 1. execute #releaseAllResources() here
    checkState(!isReleased.get(), "Queried for a buffer after channel has been closed.");
    checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");

    checkError();

    final Buffer next;
    final boolean moreAvailable;

    // 2. execute #releaseAllResources() here
    synchronized (receivedBuffers) {
        next = receivedBuffers.poll();
        moreAvailable = !receivedBuffers.isEmpty();
    }

    // 3. execute #releaseAllResources() here
    numBytesIn.inc(next.getSize());
    numBuffersIn.inc();
    return Optional.of(new BufferAndAvailability(next, moreAvailable, 0));
}
```

It may seem complex to let unit tests handle such scenarios. If an ITCase can handle this work well, I am happy to keep only simple unit tests.
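A minimal sketch of what such a concurrent unit test could look like, written against hypothetical stand-ins rather than Flink's real classes. `FakeBuffer`, `FakeChannel` and `raceOnce` are names invented for illustration only; the sketch assumes that both sides poll the queue only while holding the lock, and it checks that every buffer is then recycled exactly once.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentReleaseSketch {

    /** Hypothetical buffer that counts how often it has been recycled. */
    static final class FakeBuffer {
        final AtomicInteger recycleCount = new AtomicInteger();

        void recycle() {
            recycleCount.incrementAndGet();
        }
    }

    /** Hypothetical channel (not Flink's RemoteInputChannel); the queue is only polled under the lock. */
    static final class FakeChannel {
        private final ArrayDeque<FakeBuffer> receivedBuffers = new ArrayDeque<>();

        void onBuffer(FakeBuffer buffer) {
            synchronized (receivedBuffers) {
                receivedBuffers.add(buffer);
            }
        }

        /** Task thread side: returns null once the queue is empty. */
        FakeBuffer getNextBuffer() {
            synchronized (receivedBuffers) {
                return receivedBuffers.poll();
            }
        }

        /** Canceler thread side: drains and recycles all queued buffers. */
        void releaseAllResources() {
            synchronized (receivedBuffers) {
                FakeBuffer buffer;
                while ((buffer = receivedBuffers.poll()) != null) {
                    buffer.recycle();
                }
            }
        }
    }

    /** Races both methods and returns how many buffers were NOT recycled exactly once. */
    static int raceOnce(int numBuffers) throws Exception {
        FakeChannel channel = new FakeChannel();
        List<FakeBuffer> buffers = new ArrayList<>();
        for (int i = 0; i < numBuffers; i++) {
            FakeBuffer buffer = new FakeBuffer();
            buffers.add(buffer);
            channel.onBuffer(buffer);
        }

        // The latch lets both threads start at (nearly) the same moment.
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<?> task = executor.submit(() -> {
                start.await();
                FakeBuffer buffer;
                while ((buffer = channel.getNextBuffer()) != null) {
                    buffer.recycle();
                }
                return null;
            });
            Future<?> canceler = executor.submit(() -> {
                start.await();
                channel.releaseAllResources();
                return null;
            });
            start.countDown();
            // Future#get rethrows any exception raised inside the two threads.
            task.get(30, TimeUnit.SECONDS);
            canceler.get(30, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }

        int wrong = 0;
        for (FakeBuffer buffer : buffers) {
            if (buffer.recycleCount.get() != 1) {
                wrong++;
            }
        }
        return wrong;
    }
}
```

Calling `raceOnce` in a loop from a test method and asserting that it returns 0 gives the scheduler many chances to interleave the two threads at the three marked points, which is the kind of control that is hard to get from an ITCase.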
[GitHub] [flink] zhijiangW commented on a change in pull request #12261: [FLINK-17823][network] Resolve the race condition while releasing RemoteInputChannel
zhijiangW commented on a change in pull request #12261:
URL: https://github.com/apache/flink/pull/12261#discussion_r428431678

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ##
@@ -181,6 +181,14 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException { moreAvailable = !receivedBuffers.isEmpty(); }
+ if (next == null) {

Review comment:
FYI: before my changes, the unit tests I introduce below could reproduce these two potential issues reliably. One case sometimes throws a `NullPointerException`, and the other throws an exception from the netty stack when the same buffer is recycled twice.
[GitHub] [flink] zhijiangW commented on a change in pull request #12261: [FLINK-17823][network] Resolve the race condition while releasing RemoteInputChannel
zhijiangW commented on a change in pull request #12261:
URL: https://github.com/apache/flink/pull/12261#discussion_r428430651

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ##
@@ -181,6 +181,14 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException { moreAvailable = !receivedBuffers.isEmpty(); }
+ if (next == null) {

Review comment:
Let me explain further. The canceler thread closes the `InputGate` in advance so that the task thread can notice the released state and exit early. Once the canceler thread has called `RemoteInputChannel#releaseAllResources`, all the buffers in `receivedBuffers` have been drained and recycled. But the task thread may not be aware of this yet, so it can still call `getNextBuffer` and get a `null` buffer here.

We only expect a `null` buffer when the channel is `released`, so in that case we throw the expected `CancelTaskException` to make the task thread exit. If the channel is not released, there must be a logic bug, e.g. the channel notified the gate of available data by mistake. For that case we throw `IllegalStateException`, to avoid the misleading `NullPointerException` that would otherwise occur when the buffer is dereferenced below.

My fix in `#releaseAllResources` only prevents the task thread and the canceler thread from polling `receivedBuffers` concurrently, which could recycle the same buffer twice and lead to a misleading exception thrown by the netty stack.

Another option is to modify the logic in `#getNextBuffer` as below:

```
synchronized (receivedBuffers) {
    if (isReleased.get()) {
        throw new CancelTaskException("Queried for a buffer after channel has been released.");
    }
    next = receivedBuffers.poll();
    moreAvailable = !receivedBuffers.isEmpty();
}
```

But it might still make sense to also check whether the buffer is `null` outside the `synchronized` block. That check is not about the race condition; it only guards against logic bugs in the data notification path that would otherwise cause a misleading NPE.
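The alternative above can be sketched in a self-contained form as follows. This is only an illustration under stated assumptions: `ReleaseCheckSketch` and `CancelledException` are hypothetical stand-ins (the latter for Flink's `CancelTaskException`), buffers are modelled as plain strings, and the release path is assumed to set the flag before draining the queue under the same lock.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;

public class ReleaseCheckSketch {

    /** Hypothetical stand-in for Flink's CancelTaskException. */
    static final class CancelledException extends RuntimeException {
        CancelledException(String message) {
            super(message);
        }
    }

    private final ArrayDeque<String> receivedBuffers = new ArrayDeque<>();
    private final AtomicBoolean isReleased = new AtomicBoolean();

    void onBuffer(String buffer) {
        synchronized (receivedBuffers) {
            receivedBuffers.add(buffer);
        }
    }

    String getNextBuffer() {
        final String next;
        synchronized (receivedBuffers) {
            // Checked under the lock: if the flag is still false here, the
            // canceler cannot drain the queue before the poll below completes.
            if (isReleased.get()) {
                throw new CancelledException("Queried for a buffer after channel has been released.");
            }
            next = receivedBuffers.poll();
        }
        // Outside the lock: a null here is not the release race but a logic
        // bug, e.g. a wrong "data available" notification.
        if (next == null) {
            throw new IllegalStateException("No buffer available although the channel is not released.");
        }
        return next;
    }

    void releaseAllResources() {
        // Set the flag first, then drain under the same lock that getNextBuffer uses.
        if (isReleased.compareAndSet(false, true)) {
            synchronized (receivedBuffers) {
                receivedBuffers.clear();
            }
        }
    }
}
```

With this ordering the two failure modes stay distinguishable: a released channel always surfaces as the cancellation exception, while an empty queue on a live channel surfaces as `IllegalStateException` instead of a later NPE.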
[GitHub] [flink] zhijiangW commented on a change in pull request #12261: [FLINK-17823][network] Resolve the race condition while releasing RemoteInputChannel
zhijiangW commented on a change in pull request #12261:
URL: https://github.com/apache/flink/pull/12261#discussion_r428139107

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ##
@@ -181,6 +181,14 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException { moreAvailable = !receivedBuffers.isEmpty(); }
+ if (next == null) {

Review comment:
I guess it can actually happen in practice. When the canceler thread has already released the respective input channel, the task thread might still call `getNextBuffer` after `receivedBuffers` has been released, and then it gets a `null` buffer.
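As a small illustration of why the buffer can be `null`, the sketch below uses a plain `ArrayDeque` as a stand-in for `receivedBuffers` (an assumption made for the example, with `NullPollSketch` as a hypothetical class name). `poll()` on an emptied queue returns `null` instead of throwing, so the failure would only surface later when the buffer is dereferenced.

```java
import java.util.ArrayDeque;

public class NullPollSketch {

    /** Returns true if polling a drained queue yields null. */
    static boolean pollAfterReleaseReturnsNull() {
        ArrayDeque<byte[]> receivedBuffers = new ArrayDeque<>();
        receivedBuffers.add(new byte[16]);

        // Stands in for the canceler thread draining the queue on release.
        receivedBuffers.clear();

        // Stands in for the task thread polling afterwards.
        byte[] next = receivedBuffers.poll();
        return next == null;
    }
}
```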