[GitHub] [flink] zhijiangW commented on a change in pull request #12261: [FLINK-17823][network] Resolve the race condition while releasing RemoteInputChannel

2020-05-21 Thread GitBox


zhijiangW commented on a change in pull request #12261:
URL: https://github.com/apache/flink/pull/12261#discussion_r429018568



##
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
##
@@ -1010,6 +1011,56 @@ public void testConcurrentRecycleAndRelease2() throws Exception {
}
}
 
+   @Test
+   public void testConcurrentGetNextBufferAndRelease() throws Exception {

Review comment:
   I have a few more considerations on this issue to share.
   
   With an ITCase, even if we manage to reproduce a potential concurrency bug,
it is hard to debug and find the root cause, because all the components are
involved. I have felt this keenly while debugging `UnalignedCheckpointITCase`
these days.
   
   Conversely, a unit test exercises the two concurrent methods directly, so
limiting the scope to one component makes the bug easy to locate. We already
have 6 concurrent unit tests in `RemoteInputChannelTest` that guard the
interactions between methods executed separately by the task thread, the netty
thread and the canceler thread. If they were replaced by an ITCase, we would
need to debug across all of these methods to find the root cause.
   
   In general, a unit test should focus on a single component at most;
anything broader should rely on an ITCase. Here the scope is limited to the
`RemoteInputChannel` component, so a unit test also makes sense from this
perspective.
   
   Anyway, besides the pros of unit tests I mentioned above, I also agree with
the cons you raised; on my side the pros just slightly outweigh the cons. :)
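   To illustrate the two-method scope, here is a minimal sketch of a harness
that starts exactly two actions on two threads at the same moment.
`ConcurrentPair` is a hypothetical helper written for this illustration, not an
existing Flink test utility; in `RemoteInputChannelTest` the two actions would
be calls such as `getNextBuffer` and `releaseAllResources`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper (not part of Flink): runs two actions on two threads that
// are released together, so a unit test pits exactly two methods against each
// other without involving any other component.
final class ConcurrentPair {

    /** Runs both actions concurrently and returns the first failure, or null. */
    static Throwable run(Runnable taskThreadAction, Runnable cancelerThreadAction) {
        CountDownLatch start = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<>();

        Thread task = new Thread(() -> awaitAndRun(start, taskThreadAction, failure));
        Thread canceler = new Thread(() -> awaitAndRun(start, cancelerThreadAction, failure));
        task.start();
        canceler.start();
        start.countDown(); // release both threads at the same moment
        try {
            task.join();
            canceler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
        return failure.get();
    }

    private static void awaitAndRun(
            CountDownLatch start, Runnable action, AtomicReference<Throwable> failure) {
        try {
            start.await();
            action.run();
        } catch (Throwable t) {
            failure.compareAndSet(null, t); // keep only the first failure
        }
    }
}
```

A real test would usually repeat this for many rounds, because a single round
only hits the racy window by chance.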





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhijiangW commented on a change in pull request #12261: [FLINK-17823][network] Resolve the race condition while releasing RemoteInputChannel

2020-05-20 Thread GitBox


zhijiangW commented on a change in pull request #12261:
URL: https://github.com/apache/flink/pull/12261#discussion_r428436876



##
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
##
@@ -1010,6 +1011,56 @@ public void testConcurrentRecycleAndRelease2() throws Exception {
}
}
 
+   @Test
+   public void testConcurrentGetNextBufferAndRelease() throws Exception {

Review comment:
   I am not quite sure whether an ITCase can stably reproduce the concurrency
issues unless it is carefully constructed.
   
   E.g. we need to guarantee that when the canceler thread is releasing the
channel, the channel has already been queued into the input gate with data in
`receivedBuffers`; only then does it simulate executing `releaseAllResources`
and `getNextBuffer` concurrently.
   
   A unit test can call these two methods directly to stably reproduce the
potential bugs, whereas an ITCase is hard to control and might never enter this
path in practice.
   
   I found that most of the previous race-condition bugs in the core code went
undetected simply because such concurrent unit tests were missing: the existing
tests only verified the results by executing the related methods sequentially,
not concurrently.
   
   E.g. executing `#releaseAllResources()` at different steps of
`#getNextBuffer` has different effects. So maybe only a unit test can cover all
the possibilities if we are not sure an ITCase can achieve it.
 
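   ```
   Optional<BufferAndAvailability> getNextBuffer() throws IOException {
       // 1. #releaseAllResources() executed here: the first checkState below
       //    fails with an IllegalStateException.
       checkState(!isReleased.get(), "Queried for a buffer after channel has been closed.");
       checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");

       checkError();

       final Buffer next;
       final boolean moreAvailable;

       // 2. #releaseAllResources() executed here: receivedBuffers may already be
       //    drained, so poll() can return null and next.getSize() throws an NPE.

       synchronized (receivedBuffers) {
           next = receivedBuffers.poll();
           moreAvailable = !receivedBuffers.isEmpty();
       }

       // 3. #releaseAllResources() executed here: next has already been polled
       //    by the task thread, while the release handles the remaining buffers.

       numBytesIn.inc(next.getSize());
       numBuffersIn.inc();
       return Optional.of(new BufferAndAvailability(next, moreAvailable, 0));
   }
   ```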
   
   It may seem complex to let unit tests handle such scenarios. If an ITCase
can handle this well, I am happy to keep only simple unit tests.
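   The three interleavings marked 1-3 in the snippet above can also be pinned
deterministically. The sketch below is a toy model under stated assumptions:
`StepToyChannel`, its `Step` hook and the string "buffers" are hypothetical, and
`toUpperCase()` stands in for `next.getSize()`. It shows that the same release
yields an `IllegalStateException`, an NPE, or a normal return depending on where
it lands.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical toy that mimics the shape of RemoteInputChannel#getNextBuffer and
// lets a test choose where the canceler's releaseAllResources() is executed.
final class StepToyChannel {
    enum Step { BEFORE_CHECKS, BEFORE_POLL, AFTER_POLL }

    private final ArrayDeque<String> receivedBuffers = new ArrayDeque<>();
    private final AtomicBoolean isReleased = new AtomicBoolean();
    private int recycled;

    void releaseAllResources() {
        if (isReleased.compareAndSet(false, true)) {
            synchronized (receivedBuffers) {
                recycled += receivedBuffers.size(); // "recycle" all queued buffers
                receivedBuffers.clear();
            }
        }
    }

    String getNextBuffer(Step releaseAt) {
        if (releaseAt == Step.BEFORE_CHECKS) {
            releaseAllResources();
        }
        if (isReleased.get()) {
            throw new IllegalStateException("Queried for a buffer after channel has been closed.");
        }
        if (releaseAt == Step.BEFORE_POLL) {
            releaseAllResources();
        }
        final String next;
        synchronized (receivedBuffers) {
            next = receivedBuffers.poll();
        }
        if (releaseAt == Step.AFTER_POLL) {
            releaseAllResources();
        }
        return next.toUpperCase(); // stands in for next.getSize(): NPE when next is null
    }

    /** Runs one scenario on a fresh channel holding two buffers and names the outcome. */
    static String outcome(Step releaseAt) {
        StepToyChannel channel = new StepToyChannel();
        channel.receivedBuffers.add("b1");
        channel.receivedBuffers.add("b2");
        try {
            String next = channel.getNextBuffer(releaseAt);
            return "returned " + next + ", recycled " + channel.recycled;
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }
}
```

Here `outcome(Step.BEFORE_CHECKS)` gives `"IllegalStateException"`,
`outcome(Step.BEFORE_POLL)` gives `"NullPointerException"`, and
`outcome(Step.AFTER_POLL)` gives `"returned B1, recycled 1"`.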
   









[GitHub] [flink] zhijiangW commented on a change in pull request #12261: [FLINK-17823][network] Resolve the race condition while releasing RemoteInputChannel

2020-05-20 Thread GitBox


zhijiangW commented on a change in pull request #12261:
URL: https://github.com/apache/flink/pull/12261#discussion_r428431678



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -181,6 +181,14 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
moreAvailable = !receivedBuffers.isEmpty();
}
 
+   if (next == null) {

Review comment:
   FYI: before my changes, the unit tests I introduced below could stably
reproduce both potential issues. One case sometimes throws an NPE, and the other
throws an exception from the netty stack when the same buffer is recycled twice.
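   The double-recycle symptom can be sketched with a toy reference-counted
buffer. This assumes only that network buffers are reference counted in netty
style, where releasing an already released buffer fails loudly (netty throws an
`IllegalReferenceCountException` in that case); `ToyRefCountedBuffer` and its
`IllegalStateException` are hypothetical stand-ins.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reference-counted network buffer: the memory goes
// back to the pool exactly once, and a second recycle fails instead of silently
// corrupting the pool.
final class ToyRefCountedBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1);
    private final AtomicInteger returnedToPool;

    ToyRefCountedBuffer(AtomicInteger returnedToPool) {
        this.returnedToPool = returnedToPool;
    }

    void recycleBuffer() {
        while (true) {
            int current = refCnt.get();
            if (current <= 0) {
                throw new IllegalStateException("buffer already recycled (refCnt = " + current + ")");
            }
            if (refCnt.compareAndSet(current, current - 1)) {
                if (current == 1) {
                    returnedToPool.incrementAndGet(); // last reference released
                }
                return;
            }
        }
    }
}
```

If two threads both obtain the same buffer from `receivedBuffers`, the second
`recycleBuffer()` hits the failure branch, which is the kind of exception
described above.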









[GitHub] [flink] zhijiangW commented on a change in pull request #12261: [FLINK-17823][network] Resolve the race condition while releasing RemoteInputChannel

2020-05-20 Thread GitBox


zhijiangW commented on a change in pull request #12261:
URL: https://github.com/apache/flink/pull/12261#discussion_r428430651



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -181,6 +181,14 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
moreAvailable = !receivedBuffers.isEmpty();
}
 
+   if (next == null) {

Review comment:
   Let me explain it further.
   
   The canceler thread closes the `InputGate` in advance so that the task thread
can notice the released state and exit early. Once the canceler thread has
called `RemoteInputChannel#releaseAllResources`, all the buffers in
`receivedBuffers` have been drained and recycled.
   
   The task thread may not be aware of this yet, so it may still call
`getNextBuffer` and get a `null` buffer here. We only expect a `null` buffer
for a released channel, so in that case we throw the expected
`CancelTaskException` to make the task thread exit. If the channel is not
released, there must be a logic bug, e.g. this channel notified the gate of
available data by mistake. So we throw `IllegalStateException` in that case, to
avoid a misleading `NullPointerException` when the buffer is dereferenced
below.
   
   My fix in `#releaseAllResources` only prevents the task thread and the
canceler thread from polling `receivedBuffers` concurrently, which could
recycle the same buffer twice and cause a misleading exception from the netty
stack.
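   The idea behind the `#releaseAllResources` fix can be sketched as follows,
assuming a simplified channel: the canceler thread drains `receivedBuffers`
under the same lock the task thread uses for `poll()`, so every buffer leaves
the queue through exactly one thread. `DrainToyChannel` and its integer
"buffers" are hypothetical; this is not the actual patch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model (hypothetical): poll() and the release drain share one lock, so no
// buffer can be handed to both the task thread and the canceler thread.
final class DrainToyChannel {
    private final ArrayDeque<Integer> receivedBuffers = new ArrayDeque<>();
    private final AtomicBoolean isReleased = new AtomicBoolean();
    private final List<Integer> consumedByTask = new ArrayList<>();    // task thread only
    private final List<Integer> recycledOnRelease = new ArrayList<>(); // canceler thread only

    private DrainToyChannel(int numBuffers) {
        for (int i = 0; i < numBuffers; i++) {
            receivedBuffers.add(i);
        }
    }

    /** Task thread: returns null once the queue is empty or has been drained. */
    private Integer pollNext() {
        synchronized (receivedBuffers) {
            return receivedBuffers.poll();
        }
    }

    /** Canceler thread: drains under the lock, then "recycles" outside of it. */
    private void releaseAllResources() {
        if (!isReleased.compareAndSet(false, true)) {
            return; // already released
        }
        List<Integer> drained = new ArrayList<>();
        synchronized (receivedBuffers) {
            Integer buffer;
            while ((buffer = receivedBuffers.poll()) != null) {
                drained.add(buffer);
            }
        }
        recycledOnRelease.addAll(drained);
    }

    /** Races the two threads once; true if every buffer was handled exactly once. */
    static boolean everyBufferHandledExactlyOnce(int numBuffers) {
        DrainToyChannel channel = new DrainToyChannel(numBuffers);
        Thread task = new Thread(() -> {
            Integer buffer;
            while ((buffer = channel.pollNext()) != null) {
                channel.consumedByTask.add(buffer);
            }
        });
        Thread canceler = new Thread(channel::releaseAllResources);
        task.start();
        canceler.start();
        try {
            task.join(); // join() also makes both lists visible to this thread
            canceler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        Set<Integer> handled = new HashSet<>(channel.consumedByTask);
        handled.addAll(channel.recycledOnRelease);
        int total = channel.consumedByTask.size() + channel.recycledOnRelease.size();
        return handled.size() == numBuffers && total == numBuffers;
    }
}
```

Recycling happens outside the lock in this sketch so that the potentially
slower recycle path does not block the task thread's `poll()`.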
   
   Another option is to modify the logic in `#getNextBuffer` as below:
   
   ```
   synchronized (receivedBuffers) {
       if (isReleased.get()) {
           throw new CancelTaskException("Queried for a buffer after channel has been released.");
       }
       next = receivedBuffers.poll();
       moreAvailable = !receivedBuffers.isEmpty();
   }
   ```
   
   But it might still make sense to also check whether the buffer is `null`
outside the `synchronized` block. That check is not about the race condition;
it only prevents potential bugs in the data notification logic from surfacing
as a misleading NPE.
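   Combining the in-lock release check with the out-of-lock `null` check could
look roughly like this. It is a sketch under assumptions: `CheckedToyChannel` is
a toy, its exception message is made up, and `ToyCancelTaskException` stands in
for Flink's `CancelTaskException` so the example compiles on its own.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for Flink's CancelTaskException (hypothetical, for a self-contained sketch).
class ToyCancelTaskException extends RuntimeException {
    ToyCancelTaskException(String message) {
        super(message);
    }
}

final class CheckedToyChannel {
    private final ArrayDeque<String> receivedBuffers = new ArrayDeque<>();
    private final AtomicBoolean isReleased = new AtomicBoolean();

    void releaseAllResources() {
        if (isReleased.compareAndSet(false, true)) {
            synchronized (receivedBuffers) {
                receivedBuffers.clear(); // real code would recycle the drained buffers
            }
        }
    }

    String getNextBuffer() {
        final String next;
        synchronized (receivedBuffers) {
            // Race guard: a release cannot drain the queue between this check and
            // poll(), because the drain needs the same lock.
            if (isReleased.get()) {
                throw new ToyCancelTaskException("Queried for a buffer after channel has been released.");
            }
            next = receivedBuffers.poll();
        }
        // Logic-bug guard: not released, yet nothing queued means the gate was
        // notified of data by mistake; fail clearly instead of with an NPE later.
        if (next == null) {
            throw new IllegalStateException("No queued buffer for an unreleased channel.");
        }
        return next;
    }

    /** Builds a channel in the given state and names what getNextBuffer() does. */
    static String outcome(boolean released, boolean hasQueuedBuffer) {
        CheckedToyChannel channel = new CheckedToyChannel();
        if (hasQueuedBuffer) {
            channel.receivedBuffers.add("b1");
        }
        if (released) {
            channel.releaseAllResources();
        }
        try {
            return channel.getNextBuffer();
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }
}
```

A released channel yields the cancel exception, an unreleased channel with an
empty queue yields `IllegalStateException`, and otherwise the queued buffer is
returned.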









[GitHub] [flink] zhijiangW commented on a change in pull request #12261: [FLINK-17823][network] Resolve the race condition while releasing RemoteInputChannel

2020-05-20 Thread GitBox


zhijiangW commented on a change in pull request #12261:
URL: https://github.com/apache/flink/pull/12261#discussion_r428139107



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -181,6 +181,14 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
moreAvailable = !receivedBuffers.isEmpty();
}
 
+   if (next == null) {

Review comment:
   I guess it can happen in practice: after the canceler thread has released
the respective input channel, the task thread might still call `getNextBuffer`
on the already released (drained) `receivedBuffers` and get a `null` buffer.




