dawidwys commented on a change in pull request #17663:
URL: https://github.com/apache/flink/pull/17663#discussion_r742701577
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -338,10 +338,13 @@ void releaseAllResources() throws IOException {
@Override
void announceBufferSize(int newBufferSize) {
- checkState(!isReleased, "Channel released.");
-
- ResultSubpartitionView subpartitionView = checkNotNull(this.subpartitionView);
- subpartitionView.notifyNewBufferSize(newBufferSize);
+ ResultSubpartitionView view = this.subpartitionView;
+ // if releaseAllResources would be called from the mailbox thread it is possible that
Review comment:
I am not entirely sure the description here is correct, or rather, whether
it is precise.
I don't believe that calling any of these methods from the mailbox thread is
the cause of the problem. Correct me if I am wrong, but I believe the problem
is exactly the same as the one described in `LocalInputChannel:198-204`.
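A minimal sketch of the general pattern under discussion, assuming the issue is that the view field can be cleared by a release on another thread between a check and its use. `SnapshotFieldSketch`, `View`, and `Channel` are hypothetical stand-ins for illustration only; they are not the Flink classes and do not reproduce what `LocalInputChannel:198-204` says.

```java
public class SnapshotFieldSketch {

    /** Hypothetical stand-in for a subpartition view (not Flink's ResultSubpartitionView). */
    public interface View {
        void notifyNewBufferSize(int newBufferSize);
    }

    /** Hypothetical channel whose view can be cleared by a concurrent release. */
    public static class Channel {
        // volatile so that a release on another thread becomes visible to announcers
        private volatile View view;

        public Channel(View view) {
            this.view = view;
        }

        public void releaseAllResources() {
            view = null;
        }

        /** Returns true if the size was forwarded, false if the channel was already released. */
        public boolean announceBufferSize(int newBufferSize) {
            View snapshot = this.view; // read the field exactly once
            if (snapshot == null) {
                return false; // released in the meantime: skip instead of throwing
            }
            snapshot.notifyNewBufferSize(newBufferSize);
            return true;
        }
    }

    public static void main(String[] args) {
        int[] lastSize = {-1};
        Channel channel = new Channel(size -> lastSize[0] = size);
        System.out.println(channel.announceBufferSize(1024) + " " + lastSize[0]);
        channel.releaseAllResources();
        System.out.println(channel.announceBufferSize(2048) + " " + lastSize[0]);
    }
}
```

Reading the field into a local once means the null check and the call operate on the same reference, so a release that happens in between cannot cause a `NullPointerException`; whether silently skipping is the right behaviour for the real channel is a separate question.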
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -338,10 +338,13 @@ void releaseAllResources() throws IOException {
@Override
void announceBufferSize(int newBufferSize) {
- checkState(!isReleased, "Channel released.");
-
- ResultSubpartitionView subpartitionView = checkNotNull(this.subpartitionView);
- subpartitionView.notifyNewBufferSize(newBufferSize);
+ ResultSubpartitionView view = this.subpartitionView;
+ // if releaseAllResources would be called from the mailbox thread it is possible that
+ // announceBufferSize would be called after it.
Review comment:
```suggestion
// if releaseAllResources was called from the mailbox thread it is possible that
// announceBufferSize would be called after it.
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -338,10 +338,13 @@ void releaseAllResources() throws IOException {
@Override
void announceBufferSize(int newBufferSize) {
- checkState(!isReleased, "Channel released.");
-
- ResultSubpartitionView subpartitionView = checkNotNull(this.subpartitionView);
- subpartitionView.notifyNewBufferSize(newBufferSize);
+ ResultSubpartitionView view = this.subpartitionView;
Review comment:
I guess you can move that inside of the `if` block.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -338,10 +338,13 @@ void releaseAllResources() throws IOException {
@Override
void announceBufferSize(int newBufferSize) {
- checkState(!isReleased, "Channel released.");
-
- ResultSubpartitionView subpartitionView = checkNotNull(this.subpartitionView);
- subpartitionView.notifyNewBufferSize(newBufferSize);
+ ResultSubpartitionView view = this.subpartitionView;
+ // if releaseAllResources would be called from the mailbox thread it is possible that
Review comment:
BTW, don't we have a bigger problem here: we try to perform debloating even
if a `SingleInputGate` might have finished already. What happens if we have
two input gates and one of them finishes early (receives the end of
partition) while the other keeps processing records continuously? Could you
try that out?