[jira] [Updated] (FLINK-13754) Decouple OperatorChain with StreamStatusMaintainer
[ https://issues.apache.org/jira/browse/FLINK-13754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-13754:
-----------------------------
    Summary: Decouple OperatorChain with StreamStatusMaintainer  (was: Decouple OperatorChain from StreamStatusMaintainer)

> Decouple OperatorChain with StreamStatusMaintainer
> --------------------------------------------------
>
>                 Key: FLINK-13754
>                 URL: https://issues.apache.org/jira/browse/FLINK-13754
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The current OperatorChain is heavyweight because it takes on unrelated roles such as StreamStatusMaintainer. If another component relies only on the StreamStatusMaintainer, we still have to pass it the whole OperatorChain. Following the single-responsibility principle, we need to decouple them.
> The solution is to refactor the creation of StreamStatusMaintainer and RecordWriterOutput to the StreamTask level, and then break the cyclic dependency between their implementations. The array of RecordWriters, which is closely related to RecordWriterOutput, is already created in StreamTask, so it is reasonable to create them together. The StreamStatusMaintainer created in StreamTask can then be referenced directly by subclasses such as OneInputStreamTask/TwoInputStreamTask.
[jira] [Updated] (FLINK-13754) Decouple OperatorChain from StreamStatusMaintainer
[ https://issues.apache.org/jira/browse/FLINK-13754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-13754:
-----------------------------
    Description: 
The current OperatorChain is heavyweight because it takes on unrelated roles such as StreamStatusMaintainer. If another component relies only on the StreamStatusMaintainer, we still have to pass it the whole OperatorChain. Following the single-responsibility principle, we need to decouple them.

The solution is to refactor the creation of StreamStatusMaintainer and RecordWriterOutput to the StreamTask level, and then break the cyclic dependency between their implementations. The array of RecordWriters, which is closely related to RecordWriterOutput, is already created in StreamTask, so it is reasonable to create them together. The StreamStatusMaintainer created in StreamTask can then be referenced directly by subclasses such as OneInputStreamTask/TwoInputStreamTask.

  was:
There are two motivations for this refactoring:
 * It is a precondition for the follow-up work of decoupling the dependency between the two inputs' statuses in ForwardingValveOutputHandler.
 * From the aspect of design rules, the current OperatorChain takes on many unrelated roles, such as StreamStatusMaintainer, which makes it hard to maintain. The root cause is the cyclic dependency between RecordWriterOutput (created by OperatorChain) and StreamStatusMaintainer.

The solution is to refactor the creation of StreamStatusMaintainer and RecordWriterOutput to the StreamTask level, and then break the cyclic dependency between their implementations. The array of RecordWriters, which is closely related to RecordWriterOutput, is already created in StreamTask, so it is reasonable to create them together. The StreamStatusMaintainer created in StreamTask can then be referenced directly by subclasses such as OneInputStreamTask/TwoInputStreamTask.
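For illustration, here is a minimal sketch of the ownership direction described above. All types below are simplified stand-ins for this digest, not Flink's actual classes or signatures:

{code:java}
// Simplified stand-ins: StreamTask creates both the StreamStatusMaintainer
// and the RecordWriterOutputs, so components needing only the status
// maintainer no longer receive the whole OperatorChain.

enum StreamStatus { ACTIVE, IDLE }

interface StreamStatusMaintainer {
    StreamStatus getStreamStatus();
    void toggleStreamStatus(StreamStatus status);
}

class SimpleStatusMaintainer implements StreamStatusMaintainer {
    private StreamStatus status = StreamStatus.ACTIVE;
    public StreamStatus getStreamStatus() { return status; }
    public void toggleStreamStatus(StreamStatus s) { status = s; }
}

class RecordWriterOutput {
    private final StreamStatusMaintainer statusMaintainer;

    // The output receives only the maintainer it needs, not the whole
    // OperatorChain, which breaks the previous cyclic dependency.
    RecordWriterOutput(StreamStatusMaintainer statusMaintainer) {
        this.statusMaintainer = statusMaintainer;
    }
}

abstract class StreamTask {
    protected final StreamStatusMaintainer statusMaintainer;
    protected final RecordWriterOutput[] streamOutputs;

    StreamTask(int numOutputs) {
        // Both objects are created together at the StreamTask level.
        this.statusMaintainer = new SimpleStatusMaintainer();
        this.streamOutputs = new RecordWriterOutput[numOutputs];
        for (int i = 0; i < numOutputs; i++) {
            streamOutputs[i] = new RecordWriterOutput(statusMaintainer);
        }
    }
}

class OneInputStreamTask extends StreamTask {
    OneInputStreamTask() {
        super(1);
    }

    void markActive() {
        // Subclasses reference the maintainer directly instead of asking
        // the OperatorChain for it.
        statusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
    }
}
{code}

The point of the sketch is only the dependency direction: the task owns both objects, and the output depends on the maintainer rather than on the chain.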
[jira] [Commented] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source
[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16911537#comment-16911537 ]

zhijiang commented on FLINK-13798:
----------------------------------
Thanks for the notification, [~StephanEwen]. I hope the new source operator will make the watermark logic easier to handle. But for now we have to solve this issue by refactoring TimestampsAndPeriodicWatermarksOperator. :(

> Refactor the process of checking stream status while emitting watermark in source
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-13798
>                 URL: https://issues.apache.org/jira/browse/FLINK-13798
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are two scenarios for the source case:
> * In the source WatermarkContext, the status is toggled to active while collecting/emitting, and the status is then checked in RecordWriterOutput. If the watermark is triggered by a timer (AutomaticWatermarkContext), the timer task checks the status before emitting the watermark.
> * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a timer, but it still relies on RecordWriterOutput to check the status before emitting.
> So the check in RecordWriterOutput only makes sense for the second scenario and is redundant for the first one. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.
> To solve the above issues, the basic idea is to move this check into an upper layer instead of the low-level RecordWriterOutput. The solution is to migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator; we can then also remove the active-toggling logic in WatermarkContext.
[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source
[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-13798:
-----------------------------
    Description: 
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are two scenarios for the source case:
 * In the source WatermarkContext, the status is toggled to active while collecting/emitting, and the status is then checked in RecordWriterOutput. If the watermark is triggered by a timer (AutomaticWatermarkContext), the timer task checks the status before emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a timer, but it still relies on RecordWriterOutput to check the status before emitting.

So the check in RecordWriterOutput only makes sense for the second scenario and is redundant for the first one. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

To solve the above issues, the basic idea is to move this check into an upper layer instead of the low-level RecordWriterOutput. The solution is to migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator; we can then also remove the active-toggling logic in WatermarkContext.

  was:
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are two scenarios for the source case:
 * In the source WatermarkContext, the status is toggled to active while collecting/emitting, and the status is then checked in RecordWriterOutput. If the watermark is triggered by a timer (AutomaticWatermarkContext), the timer task checks the status before emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a timer, but it still relies on RecordWriterOutput to check the status before emitting.

So the check in RecordWriterOutput only makes sense for the second scenario and is redundant for the first one. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

To solve the above issues, the basic idea is to move this check into an upper layer instead of the low-level RecordWriterOutput. The solution is to migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator.
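To make the proposed migration concrete, here is a before/after sketch. All types below are simplified stand-ins, not Flink's actual classes:

{code:java}
// Simplified stand-ins: the status check moves out of the low-level
// RecordWriterOutput and into the periodic-watermark operator.

enum StreamStatus { ACTIVE, IDLE }

interface StatusProvider {
    StreamStatus getStreamStatus();
}

class Watermark {
    final long timestamp;
    Watermark(long timestamp) { this.timestamp = timestamp; }
}

// Before: the low-level output gates every watermark on the stream status,
// which forces it to depend on the StreamStatusMaintainer.
class OldRecordWriterOutput {
    private final StatusProvider statusProvider;

    OldRecordWriterOutput(StatusProvider statusProvider) {
        this.statusProvider = statusProvider;
    }

    void emitWatermark(Watermark mark) {
        if (statusProvider.getStreamStatus() == StreamStatus.ACTIVE) {
            write(mark);
        }
    }

    void write(Watermark mark) {
        // serialize and send downstream
    }
}

// After: the output emits unconditionally; the operator that owns the
// periodic timer performs the check itself before calling the output.
class NewRecordWriterOutput {
    void emitWatermark(Watermark mark) {
        write(mark);
    }

    void write(Watermark mark) {
        // serialize and send downstream
    }
}

class PeriodicWatermarksOperatorSketch {
    private final StatusProvider statusProvider;
    private final NewRecordWriterOutput output;

    PeriodicWatermarksOperatorSketch(StatusProvider statusProvider, NewRecordWriterOutput output) {
        this.statusProvider = statusProvider;
        this.output = output;
    }

    // Invoked by the processing-time timer.
    void onProcessingTime(long timestamp) {
        if (statusProvider.getStreamStatus() == StreamStatus.ACTIVE) {
            output.emitWatermark(new Watermark(timestamp));
        }
    }
}
{code}

With the check in the operator, the output no longer needs a reference to the status maintainer, which is exactly the cyclic dependency the issue wants to break.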
[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source
[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-13798:
-----------------------------
    Description: 
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are two scenarios for the source case:
 * In the source WatermarkContext, the status is toggled to active while collecting/emitting, and the status is then checked in RecordWriterOutput. If the watermark is triggered by a timer (AutomaticWatermarkContext), the timer task checks the status before emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a timer, but it still relies on RecordWriterOutput to check the status before emitting.

So the check in RecordWriterOutput only makes sense for the second scenario and is redundant for the first one. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

To solve the above issues, the basic idea is to move this check into an upper layer instead of the low-level RecordWriterOutput. The solution is to migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator.

  was:
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are two scenarios for the source case:
 * In the source WatermarkContext, the status is toggled to active while collecting/emitting, and the status is then checked in RecordWriterOutput. If the watermark is triggered by a timer (AutomaticWatermarkContext), the timer task checks the status before emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a timer, but it still relies on RecordWriterOutput to check the status before emitting.

So the check in RecordWriterOutput only makes sense for the second scenario and is redundant for the first one. Even worse, this logic is RecordWriterOutput would bring a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

To solve the above issues, the basic idea is to move this check into an upper layer instead of the low-level RecordWriterOutput. The solution is to migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator.
[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source
[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-13798:
-----------------------------
    Description: 
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are two scenarios for the source case:
 * In the source WatermarkContext, the status is toggled to active while collecting/emitting, and the status is then checked in RecordWriterOutput. If the watermark is triggered by a timer (AutomaticWatermarkContext), the timer task checks the status before emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a timer, but it still relies on RecordWriterOutput to check the status before emitting.

So the check in RecordWriterOutput only makes sense for the second scenario and is redundant for the first one. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

To solve the above issues, the basic idea is to move this check into an upper layer instead of the low-level RecordWriterOutput. The solution is to migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator.

  was:
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are two scenarios for the source case:
 * In the source WatermarkContext, the status is toggled to active while collecting/emitting, and the status is checked in the RecordWriterOutput. If the watermark is triggered by a timer (AutomaticWatermarkContext), the timer task checks the status before emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a timer, but it relies on RecordWriterOutput to check the status before emitting.

So we can see that the check in RecordWriterOutput only makes sense for the second scenario and seems redundant for the first one. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

To solve the above issues, the basic idea is to perform this check in an upper layer instead of the low-level RecordWriterOutput. The solution is that we could migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator, and the active-toggling action could be removed from AutomaticWatermarkContext while emitting records.
[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source
[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-13798:
-----------------------------
    Description: 
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are two scenarios for the source case:
 * In the source WatermarkContext, the status is toggled to active while collecting/emitting, and the status is checked in the RecordWriterOutput. If the watermark is triggered by a timer (AutomaticWatermarkContext), the timer task checks the status before emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a timer, but it relies on RecordWriterOutput to check the status before emitting.

So we can see that the check in RecordWriterOutput only makes sense for the second scenario and seems redundant for the first one. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

To solve the above issues, the basic idea is to perform this check in an upper layer instead of the low-level RecordWriterOutput. The solution is that we could migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator, and the active-toggling action could be removed from AutomaticWatermarkContext while emitting records.

  was:
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are three scenarios for the source case:
 * In the AutomaticWatermarkContext, the status is toggled to active while collecting a record, and the status is checked in the RecordWriterOutput. If the watermark is triggered by a timer, the timer task checks the status before emitting the watermark.
 * In the ManualWatermarkContext, the status is also checked in RecordWriterOutput before emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is scheduled periodically by a timer. When it fires, the operator emits the watermark via the output, and RecordWriterOutput checks the status before emitting.

So we can see that the check in RecordWriterOutput only makes sense for the last two scenarios and seems redundant for the first one. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

To solve the above issues, the basic idea is to perform this check in an upper layer instead of the low-level RecordWriterOutput. The solution is that we could migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator, and the active-toggling action could be removed from AutomaticWatermarkContext while emitting records.
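The first scenario above is the one where the output-level check is argued to be redundant. Here is a sketch of that path, with simplified stand-in types rather than Flink's actual source contexts:

{code:java}
// Simplified stand-ins: the context toggles the status to ACTIVE right
// before collecting, so an ACTIVE check inside the output can never fail
// on this path, which is why it is redundant for this scenario.

enum StreamStatus { ACTIVE, IDLE }

interface StreamStatusMaintainer {
    StreamStatus getStreamStatus();
    void toggleStreamStatus(StreamStatus status);
}

interface Output {
    void collect(Object record);
    void emitWatermark(long watermarkTimestamp);
}

class WatermarkContextSketch {
    private final StreamStatusMaintainer maintainer;
    private final Output output;

    WatermarkContextSketch(StreamStatusMaintainer maintainer, Output output) {
        this.maintainer = maintainer;
        this.output = output;
    }

    void collect(Object record) {
        // Toggled to ACTIVE on every element, so an ACTIVE check inside
        // the output immediately afterwards is always true.
        maintainer.toggleStreamStatus(StreamStatus.ACTIVE);
        output.collect(record);
    }

    // Timer-triggered path of the automatic context: the timer task itself
    // checks the status, again making an output-level check redundant.
    void onPeriodicEmit(long watermarkTimestamp) {
        if (maintainer.getStreamStatus() == StreamStatus.ACTIVE) {
            output.emitWatermark(watermarkTimestamp);
        }
    }
}
{code}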
[jira] [Issue Comment Deleted] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source
[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-13798:
-----------------------------
    Comment: was deleted

(was: Thanks for the replies, [~StephanEwen]. I hope the new source operator will make the logic easier to understand, as you mentioned. But for now we have to solve this issue during the refactoring. :(

I just found another issue that was not mentioned in our previous discussion. If we remove this check from RecordWriterOutput and add the logic to TimestampsAndPeriodicWatermarksOperator, the AutomaticWatermarkContext case seems fine. But for the second case above, ManualWatermarkContext, I am not quite sure whether it still needs the check. Could you help double-check it? [~tzulitai] )
[jira] [Commented] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source
[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16911455#comment-16911455 ]

zhijiang commented on FLINK-13798:
----------------------------------
Thanks for the replies, [~StephanEwen]. I hope the new source operator will make the logic easier to understand, as you mentioned. But for now we have to solve this issue during the refactoring. :(

I just found another issue that was not mentioned in our previous discussion. If we remove this check from RecordWriterOutput and add the logic to TimestampsAndPeriodicWatermarksOperator, the AutomaticWatermarkContext case seems fine. But for the second case above, ManualWatermarkContext, I am not quite sure whether it still needs the check. Could you help double-check it? [~tzulitai]
[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source
[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-13798:
-----------------------------
    Description: 
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are three scenarios for the source case:
 * In the AutomaticWatermarkContext, the status is toggled to active while collecting a record, and the status is checked in the RecordWriterOutput. If the watermark is triggered by a timer, the timer task checks the status before emitting the watermark.
 * In the ManualWatermarkContext, the status is also checked in RecordWriterOutput before emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is scheduled periodically by a timer. When it fires, the operator emits the watermark via the output, and RecordWriterOutput checks the status before emitting.

So we can see that the check in RecordWriterOutput only makes sense for the last two scenarios and seems redundant for the first one. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

To solve the above issues, the basic idea is to perform this check in an upper layer instead of the low-level RecordWriterOutput. The solution is that we could migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator, and the active-toggling action could be removed from AutomaticWatermarkContext while emitting records.

  was:
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are three scenarios for the source case:
 * In the AutomaticWatermarkContext, the status is toggled to active while collecting a record, and the status is checked in the RecordWriterOutput. If the watermark is triggered by a timer, the timer task checks the status before emitting the watermark.
 * In the ManualWatermarkContext, the status is also checked in RecordWriterOutput before emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is scheduled periodically by a timer. When it fires, the operator emits the watermark via the output, and RecordWriterOutput checks the status before emitting.

So we can see that the check in RecordWriterOutput only makes sense for the last two scenarios and seems redundant for the first one. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

To solve the above issues, the basic idea is to perform this check in an upper layer instead of the low-level RecordWriterOutput. The solution is that we could migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator.
[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source
[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-13798:
-----------------------------
    Description: 
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are three scenarios for the source case:
 * In the AutomaticWatermarkContext, the status is toggled to active while collecting a record, and the status is checked in the RecordWriterOutput. If the watermark is triggered by a timer, the timer task checks the status before emitting the watermark.
 * In the ManualWatermarkContext, the status is also checked in RecordWriterOutput before emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is scheduled periodically by a timer. When it fires, the operator emits the watermark via the output, and RecordWriterOutput checks the status before emitting.

So we can see that the check in RecordWriterOutput only makes sense for the last two scenarios and seems redundant for the first one. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

To solve the above issues, the basic idea is to perform this check in an upper layer instead of the low-level RecordWriterOutput. The solution is that we could migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator.

  was:
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are three scenarios for the source case:
 * In the AutomaticWatermarkContext, the status is toggled to active while collecting a record, and the status is checked in the RecordWriterOutput. If the watermark is triggered by a timer, the timer task checks the status before calling emit watermark.
 * In the ManualWatermarkContext, the status is also checked in RecordWriterOutput before emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is scheduled periodically by a timer. When it fires, the operator emits the watermark via the output, and RecordWriterOutput checks the status before emitting.

So we can see that the check in RecordWriterOutput only makes sense for the last two scenarios and seems redundant for the first one. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

To solve the above issues, the basic idea is to perform this check in an upper layer instead of the low-level RecordWriterOutput. The solution is that we could migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator.
[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source
[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-13798:
-----------------------------
    Description: 
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are three scenarios for the source case:
 * In the AutomaticWatermarkContext, the status is toggled to active while collecting a record, and the status is checked in the RecordWriterOutput. If the watermark is triggered by a timer, the timer task checks the status before calling emit watermark.
 * In the ManualWatermarkContext, the status is also checked in RecordWriterOutput before emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is scheduled periodically by a timer. When it fires, the operator emits the watermark via the output, and RecordWriterOutput checks the status before emitting.

So we can see that the check in RecordWriterOutput only makes sense for the last two scenarios and seems redundant for the first one. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

To solve the above issues, the basic idea is to perform this check in an upper layer instead of the low-level RecordWriterOutput. The solution is that we could migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator.

  was:
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are two scenarios for the source case:
 * Emit watermark via the source context: the specific WatermarkContext toggles the stream status to active before collecting/emitting records/watermarks. The RecordWriterOutput implementation then checks that the status is active before actually emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered periodically by a timer. When it fires, the operator calls the output stack to emit the watermark, and RecordWriterOutput takes the role of checking the status before actually emitting it.

So we can see that the status check in RecordWriterOutput only works for the second scenario, and it seems redundant for the first one because WatermarkContext always toggles the status to active before emitting. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

The solution is that we could migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator.
[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source
[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-13798:
-----------------------------
    Description: 
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are two scenarios for the source case:
 * Emit watermark via the source context: the specific WatermarkContext toggles the stream status to active before collecting/emitting records/watermarks. The RecordWriterOutput implementation then checks that the status is active before actually emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered periodically by a timer. When it fires, the operator calls the output stack to emit the watermark, and RecordWriterOutput takes the role of checking the status before actually emitting it.

So we can see that the status check in RecordWriterOutput only works for the second scenario, and it seems redundant for the first one because WatermarkContext always toggles the status to active before emitting. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

The solution is that we could migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator.

  was:
As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are two scenarios for the source case:
 * Emit watermark via the source context: the specific WatermarkContext toggles the stream status to active before collecting/emitting records/watermarks. The RecordWriterOutput implementation then checks that the status is active before actually emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered periodically by a timer. When it fires, the operator calls the output stack to emit the watermark, and RecordWriterOutput takes the role of checking the status before actually emitting it.

So we can see that the status check in RecordWriterOutput only works for the second scenario, and it seems redundant for the first one because WatermarkContext always toggles the status to active before emitting. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

The solution is that we could migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator. And we could also remove the active-toggling logic in the existing WatermarkContext.
[jira] [Commented] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source
[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16911275#comment-16911275 ]

zhijiang commented on FLINK-13798:
----------------------------------
Thanks for the offline discussion and suggestions, [~pnowojski] [~tzulitai]. Could you double-check that my understanding above is right?
[jira] [Created] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source
zhijiang created FLINK-13798:
---------------------------------

             Summary: Refactor the process of checking stream status while emitting watermark in source
                 Key: FLINK-13798
                 URL: https://issues.apache.org/jira/browse/FLINK-13798
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Task
            Reporter: zhijiang
            Assignee: zhijiang


As we know, a watermark may be emitted downstream only while the stream status is active. For a downstream task we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for a source task the current implementation is a bit tricky. There are two scenarios for the source case:
 * Emit watermark via the source context: the specific WatermarkContext toggles the stream status to active before collecting/emitting records/watermarks. The RecordWriterOutput implementation then checks that the status is active before actually emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered periodically by a timer. When it fires, the operator calls the output stack to emit the watermark, and RecordWriterOutput takes the role of checking the status before actually emitting it.

So we can see that the status check in RecordWriterOutput only works for the second scenario, and it seems redundant for the first one because WatermarkContext always toggles the status to active before emitting. Even worse, this logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side.

The solution is that we could migrate the check from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator. And we could also remove the active-toggling logic in the existing WatermarkContext.
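For context, the StatusWatermarkValve mentioned above is the downstream-side mechanism that already gates watermarks on stream status. Here is a simplified sketch of that idea; it is a stand-in illustration, not the actual Flink class:

{code:java}
import java.util.Arrays;

// Simplified sketch of a status-aware watermark valve: a watermark is
// forwarded downstream only if its channel is ACTIVE and it advances the
// minimum watermark across all active channels.
class StatusWatermarkValveSketch {
    enum Status { ACTIVE, IDLE }

    private final Status[] channelStatus;
    private final long[] channelWatermark;
    private long lastEmitted = Long.MIN_VALUE;

    StatusWatermarkValveSketch(int numChannels) {
        channelStatus = new Status[numChannels];
        channelWatermark = new long[numChannels];
        Arrays.fill(channelStatus, Status.ACTIVE);
        Arrays.fill(channelWatermark, Long.MIN_VALUE);
    }

    void inputStreamStatus(Status status, int channel) {
        channelStatus[channel] = status;
    }

    // Returns the new overall watermark to forward downstream,
    // or null if nothing should be emitted.
    Long inputWatermark(long watermark, int channel) {
        if (channelStatus[channel] != Status.ACTIVE) {
            return null; // watermarks from idle channels are ignored
        }
        channelWatermark[channel] = Math.max(channelWatermark[channel], watermark);

        // The overall watermark is the minimum over all active channels.
        long min = Long.MAX_VALUE;
        for (int i = 0; i < channelStatus.length; i++) {
            if (channelStatus[i] == Status.ACTIVE) {
                min = Math.min(min, channelWatermark[i]);
            }
        }
        if (min != Long.MAX_VALUE && min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return null;
    }
}
{code}

The issue is essentially asking for the source-side status handling to be as localized and explicit as this downstream-side mechanism already is.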
[jira] [Created] (FLINK-13767) Migrate isFinished method from AvailabilityListener to AsyncDataInput
zhijiang created FLINK-13767:
---------------------------------

             Summary: Migrate isFinished method from AvailabilityListener to AsyncDataInput
                 Key: FLINK-13767
                 URL: https://issues.apache.org/jira/browse/FLINK-13767
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Network, Runtime / Task
            Reporter: zhijiang
            Assignee: zhijiang


AvailabilityListener is used in both AsyncDataInput and StreamTaskInput. We already introduced InputStatus for StreamTaskInput#emitNext, and InputStatus#END_OF_INPUT has the same semantics as AvailabilityListener#isFinished. But for AsyncDataInput, which is mainly used by the InputGate layer, the isFinished() method is still needed at the moment.

So we migrate this method from AvailabilityListener to AsyncDataInput, and refactor the StreamInputProcessor implementations to use InputStatus to determine the finished state.
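A sketch of the intended interface shape, using simplified stand-in interfaces rather than the exact Flink signatures:

{code:java}
// Simplified stand-ins: the finished state of a StreamTaskInput is conveyed
// through the InputStatus returned by emitNext, so isFinished() lives only
// on the lower-level async input (the InputGate-style layer).

enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

interface Output {
    void collect(Object record);
}

interface AvailabilityListener {
    // Only availability remains here; no isFinished() any more.
    java.util.concurrent.CompletableFuture<?> isAvailable();
}

interface AsyncDataInput extends AvailabilityListener {
    boolean isFinished(); // migrated down to the gate-level input
}

interface StreamTaskInput extends AvailabilityListener {
    InputStatus emitNext(Output output);
}

class StreamInputProcessorSketch {
    private final StreamTaskInput input;
    private final Output output;
    private boolean finished;

    StreamInputProcessorSketch(StreamTaskInput input, Output output) {
        this.input = input;
        this.output = output;
    }

    void processInput() {
        // The finished state is judged via InputStatus, not isFinished().
        InputStatus status = input.emitNext(output);
        if (status == InputStatus.END_OF_INPUT) {
            finished = true;
        }
    }

    boolean isFinished() {
        return finished;
    }
}
{code}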
[jira] [Created] (FLINK-13766) Refactor the implementation of StreamInputProcessor based on StreamTaskInput#emitNext
zhijiang created FLINK-13766:
---------------------------------

             Summary: Refactor the implementation of StreamInputProcessor based on StreamTaskInput#emitNext
                 Key: FLINK-13766
                 URL: https://issues.apache.org/jira/browse/FLINK-13766
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Task
            Reporter: zhijiang
            Assignee: zhijiang


The current processing in the task input processor is based on pollNext. In order to unify processing with the new source operator, we introduce the new StreamTaskInput#emitNext(Output) instead of the current pollNext, and adjust the existing StreamOneInputProcessor/StreamTwoInputSelectableProcessor implementations to the new emit style. This lets us integrate all task inputs, whether from the network or from sources, into unified processing on the runtime side.
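To contrast the two styles, here is a sketch with simplified stand-in interfaces, not the exact Flink ones:

{code:java}
// Simplified stand-ins: with pollNext the processor pulls an element and
// must route it itself; with emitNext the input pushes the element into
// the given Output, so network and source inputs share one code path.

interface Output<T> {
    void emit(T element);
}

// Old pull style: the caller gets the element back and handles it.
interface PollingInput<T> {
    T pollNext(); // null if nothing is currently available
}

// New push style: the input drives the element into the output.
enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

interface StreamTaskInput<T> {
    InputStatus emitNext(Output<T> output);
}

class StreamOneInputProcessorSketch<T> {
    private final StreamTaskInput<T> input;
    private final Output<T> output;

    StreamOneInputProcessorSketch(StreamTaskInput<T> input, Output<T> output) {
        this.input = input;
        this.output = output;
    }

    // One loop iteration: the input decides what to emit, so the same
    // processor works for a network input or a source input.
    InputStatus processInput() {
        return input.emitNext(output);
    }
}
{code}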
[jira] [Created] (FLINK-13765) Introduce the InputSelectionHandler for selecting next input in StreamTwoInputSelectableProcessor
zhijiang created FLINK-13765: Summary: Introduce the InputSelectionHandler for selecting next input in StreamTwoInputSelectableProcessor Key: FLINK-13765 URL: https://issues.apache.org/jira/browse/FLINK-13765 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: zhijiang Assignee: zhijiang In StreamTwoInputSelectableProcessor there are three fields {InputSelectable, InputSelection, availableInputsMask} that are used together to select the next available input index. This brings two problems: * From the design aspect, these fields should be abstracted into a separate component and passed into StreamTwoInputSelectableProcessor. * inputSelector.nextSelection() is called while processing elements in StreamTwoInputSelectableProcessor, so it is a blocker for integrating the task input/output for both StreamOneInputProcessor/StreamTwoInputSelectableProcessor later. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
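A rough sketch of the proposed component, with the three fields bundled behind one object. The bitmask representation and the method names are assumptions made for illustration; the committed InputSelectionHandler may differ.

{code:java}
// The operator tells the processor which inputs it wants to read next.
interface InputSelectable {
    long nextSelection(); // bitmask of selected inputs (bit i = input i)
}

class InputSelectionHandler {
    private final InputSelectable inputSelectable; // may be null: read any input
    private long selectedInputsMask = -1L;         // all inputs selected initially
    private long availableInputsMask = 0b11L;      // two inputs, both available

    InputSelectionHandler(InputSelectable inputSelectable) {
        this.inputSelectable = inputSelectable;
    }

    // Called after each processed element, as described in the issue.
    void nextSelection() {
        if (inputSelectable != null) {
            selectedInputsMask = inputSelectable.nextSelection();
        }
    }

    void setUnavailable(int inputIndex) {
        availableInputsMask &= ~(1L << inputIndex);
    }

    void setAvailable(int inputIndex) {
        availableInputsMask |= (1L << inputIndex);
    }

    // Next input index that is both selected and available, or -1 if none.
    int selectNextInputIndex() {
        long candidates = selectedInputsMask & availableInputsMask;
        return candidates == 0 ? -1 : Long.numberOfTrailingZeros(candidates);
    }
}
{code}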
[jira] [Created] (FLINK-13764) Pass the counter of numRecordsIn into the constructors of StreamOne/TwoInputProcessor
zhijiang created FLINK-13764: Summary: Pass the counter of numRecordsIn into the constructors of StreamOne/TwoInputProcessor Key: FLINK-13764 URL: https://issues.apache.org/jira/browse/FLINK-13764 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: zhijiang Assignee: zhijiang Currently the numRecordsIn counter is set up while processing input in the processor. In order to later integrate the processing logic based on StreamTaskInput#emitNext(Output), we need to pass the counter into the output functions. So this refactoring is a precondition of the following work, and it brings additional benefits: one is that we can make the counter a final field in StreamInputProcessor; another is that we can reuse the counter setup logic for both StreamOne/TwoInputProcessors. There should be no side effects from setting up the counter a bit earlier than before. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-13764) Pass the counter of numRecordsIn into the constructors of StreamOne/TwoInputProcessor
[ https://issues.apache.org/jira/browse/FLINK-13764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13764: - Component/s: Runtime / Metrics > Pass the counter of numRecordsIn into the constructors of > StreamOne/TwoInputProcessor > - > > Key: FLINK-13764 > URL: https://issues.apache.org/jira/browse/FLINK-13764 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics, Runtime / Task >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > Currently the numRecordsIn counter is set up while processing input in the > processor. In order to later integrate the processing logic based on > StreamTaskInput#emitNext(Output), we need to pass the counter into the > output functions. > So this refactoring is a precondition of the following work, and it brings > additional benefits: one is that we can make the counter a final field in > StreamInputProcessor; another is that we can reuse the counter setup logic > for both StreamOne/TwoInputProcessors. > There should be no side effects from setting up the counter a bit earlier > than before. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
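The refactoring amounts to constructing the counter in the task and handing it to the processor, so the counting can happen inside the output wrapper instead of the processing loop. A minimal sketch, with SimpleCounter standing in for Flink's metrics Counter:

{code:java}
class SimpleCounter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

interface DataOutput<T> {
    void emitRecord(T record);
}

// The processor receives the counter through its constructor, keeps it as a
// final field, and wraps its output so every record is counted on emit.
class CountingOutput<T> implements DataOutput<T> {
    private final DataOutput<T> inner;
    private final SimpleCounter numRecordsIn;

    CountingOutput(DataOutput<T> inner, SimpleCounter numRecordsIn) {
        this.inner = inner;
        this.numRecordsIn = numRecordsIn;
    }

    @Override
    public void emitRecord(T record) {
        numRecordsIn.inc(); // counted here, not in the processing loop
        inner.emitRecord(record);
    }
}
{code}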
[jira] [Updated] (FLINK-13762) Integrate the implementation of ForwardingValveOutputHandler for StreamOne/TwoInputProcessor
[ https://issues.apache.org/jira/browse/FLINK-13762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13762: - Description: Currently StreamOneInputProcessor and StreamTwoInputSelectableProcessor have separate implementations of ForwardingValveOutputHandler. In particular, the implementation in StreamTwoInputSelectableProcessor couples in the internal input-index logic, which would be a blocker for the following unification of StreamTaskInput/Output. We could realize a unified ForwardingValveOutputHandler for both StreamOneInputProcessor/StreamTwoInputSelectableProcessor that does not distinguish between the different inputs and always consumes the StreamStatus. Then we refactor the implementation of StreamStatusMaintainer to judge the status of the different inputs internally before actually emitting the StreamStatus. was: Currently StreamOneInputProcessor and StreamTwoInputSelectableProcessor have separate implementations of ForwardingValveOutputHandler. In particular, the implementation in StreamTwoInputSelectableProcessor couples in the internal input-index logic, which would be a blocker for the following unification of StreamTaskInput/Output. We could realize a unified ForwardingValveOutputHandler for both StreamOneInputProcessor/StreamTwoInputSelectableProcessor that does not distinguish between the different inputs and always consumes the StreamStatus. Then we refactor the implementation of StreamStatusMaintainer to judge the status of the different inputs internally before actually emitting the StreamStatus. > Integrate the implementation of ForwardingValveOutputHandler for > StreamOne/TwoInputProcessor > > > Key: FLINK-13762 > URL: https://issues.apache.org/jira/browse/FLINK-13762 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > Currently StreamOneInputProcessor and StreamTwoInputSelectableProcessor have > separate implementations of ForwardingValveOutputHandler. In particular, the > implementation in StreamTwoInputSelectableProcessor couples in the internal > input-index logic, which would be a blocker for the following unification of > StreamTaskInput/Output. > We could realize a unified ForwardingValveOutputHandler for both > StreamOneInputProcessor/StreamTwoInputSelectableProcessor that does not > distinguish between the different inputs and always consumes the StreamStatus. > Then we refactor the implementation of StreamStatusMaintainer to judge the > status of the different inputs internally before actually emitting the > StreamStatus. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-13762) Implement a unified StatusWatermarkOutputHandler for StreamOne/TwoInputProcessor
[ https://issues.apache.org/jira/browse/FLINK-13762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13762: - Summary: Implement a unified StatusWatermarkOutputHandler for StreamOne/TwoInputProcessor (was: Integrate the implementation of ForwardingValveOutputHandler for StreamOne/TwoInputProcessor) > Implement a unified StatusWatermarkOutputHandler for > StreamOne/TwoInputProcessor > > > Key: FLINK-13762 > URL: https://issues.apache.org/jira/browse/FLINK-13762 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > Currently StreamOneInputProcessor and StreamTwoInputSelectableProcessor have > separate implementations of ForwardingValveOutputHandler. In particular, the > implementation in StreamTwoInputSelectableProcessor couples in the internal > input-index logic, which would be a blocker for the following unification of > StreamTaskInput/Output. > We could realize a unified ForwardingValveOutputHandler for both > StreamOneInputProcessor/StreamTwoInputSelectableProcessor that does not > distinguish between the different inputs and always consumes the StreamStatus. > Then we refactor the implementation of StreamStatusMaintainer to judge the > status of the different inputs internally before actually emitting the > StreamStatus. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13762) Integrate the implementation of ForwardingValveOutputHandler for StreamOne/TwoInputProcessor
zhijiang created FLINK-13762: Summary: Integrate the implementation of ForwardingValveOutputHandler for StreamOne/TwoInputProcessor Key: FLINK-13762 URL: https://issues.apache.org/jira/browse/FLINK-13762 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: zhijiang Assignee: zhijiang Currently StreamOneInputProcessor and StreamTwoInputSelectableProcessor have separate implementations of ForwardingValveOutputHandler. In particular, the implementation in StreamTwoInputSelectableProcessor couples in the internal input-index logic, which would be a blocker for the following unification of StreamTaskInput/Output. We could realize a unified ForwardingValveOutputHandler for both StreamOneInputProcessor/StreamTwoInputSelectableProcessor that does not distinguish between the different inputs and always consumes the StreamStatus. Then we refactor the implementation of StreamStatusMaintainer to judge the status of the different inputs internally before actually emitting the StreamStatus. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
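The unification idea can be sketched as follows: the output handler becomes input-agnostic and always forwards status toggles to the maintainer, and the maintainer combines the per-input statuses before emitting anything downstream. This is an illustrative sketch with simplified types, not the shipped implementation.

{code:java}
enum StreamStatus { ACTIVE, IDLE }

class TwoInputStreamStatusMaintainer {
    private final StreamStatus[] inputStatus = {StreamStatus.ACTIVE, StreamStatus.ACTIVE};
    private StreamStatus emitted = StreamStatus.ACTIVE;

    // Called by the (now input-agnostic) valve output handler for any input.
    void toggleStreamStatus(int inputIndex, StreamStatus newStatus) {
        inputStatus[inputIndex] = newStatus;
        // The overall stream is idle only if every input is idle.
        StreamStatus combined =
                (inputStatus[0] == StreamStatus.IDLE && inputStatus[1] == StreamStatus.IDLE)
                        ? StreamStatus.IDLE
                        : StreamStatus.ACTIVE;
        if (combined != emitted) {
            emitted = combined;
            emitStreamStatus(combined); // only really emit on an overall change
        }
    }

    private void emitStreamStatus(StreamStatus status) {
        // Forwarded to the record-writer outputs in the real implementation.
    }
}
{code}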
[jira] [Created] (FLINK-13754) Decouple OperatorChain from StreamStatusMaintainer
zhijiang created FLINK-13754: Summary: Decouple OperatorChain from StreamStatusMaintainer Key: FLINK-13754 URL: https://issues.apache.org/jira/browse/FLINK-13754 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: zhijiang Assignee: zhijiang There are two motivations for this refactoring: * It is the precondition for the following work of decoupling the dependency between the two inputs' statuses in ForwardingValveOutputHandler. * From the design-rule aspect, the current OperatorChain takes on many unrelated roles, such as StreamStatusMaintainer, which makes it unmaintainable. The root cause is the cyclic dependency between RecordWriterOutput (created by OperatorChain) and StreamStatusMaintainer. The solution is to refactor the creation of StreamStatusMaintainer and RecordWriterOutput to the StreamTask level, and thereby break the cyclic implementation dependency between them. The array of RecordWriters, which is closely related to RecordWriterOutput, is created in StreamTask, so it is reasonable to create them together. The StreamStatusMaintainer created in StreamTask can then be directly referenced by subclasses like OneInputStreamTask/TwoInputStreamTask. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
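The intended construction order can be sketched like this; all classes below are simplified stand-ins for the Flink ones named in the issue. The task creates the status maintainer and the record-writer outputs itself, so the operator chain never has to own the maintainer and the cycle disappears.

{code:java}
class StatusMaintainer { /* tracks the task-wide ACTIVE/IDLE status */ }

class WriterOutput {
    private final StatusMaintainer statusMaintainer;

    // The output only sees the maintainer, never the operator chain.
    WriterOutput(StatusMaintainer statusMaintainer) {
        this.statusMaintainer = statusMaintainer;
    }
}

class Task {
    final StatusMaintainer statusMaintainer;
    final WriterOutput[] outputs;

    Task(int numberOfOutputs) {
        // Both are created at the task level, together with the record
        // writers that the outputs wrap in the real code.
        this.statusMaintainer = new StatusMaintainer();
        this.outputs = new WriterOutput[numberOfOutputs];
        for (int i = 0; i < numberOfOutputs; i++) {
            outputs[i] = new WriterOutput(statusMaintainer);
        }
        // Subclasses (the one-input/two-input task variants) can reference
        // statusMaintainer directly instead of asking the operator chain.
    }
}
{code}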
[jira] [Updated] (FLINK-13753) Integrate new Source Operator with Mailbox Model in StreamTask
[ https://issues.apache.org/jira/browse/FLINK-13753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13753: - Description: This is the umbrella issue for integrating the new source operator with the mailbox model in StreamTask. The motivation is based on [FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface], which proposes to refactor the whole source API and to integrate task-level actions (including checkpoint, timer, async operator) with the unified mailbox model on the runtime side. * The benefit is simpler, unified processing logic, because a single thread handles all the actions without concurrency issues; it further gets rid of the lock dependency that causes the unfair-lock concern in the checkpoint process. * We still need to support the current legacy source for some releases, since it will probably be used for a while, especially where performance is a concern. The design doc is [https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit] was: This is the umbrella issue for integrating the new source operator with the mailbox model in StreamTask. The motivation is based on [FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface], which proposes to refactor the whole source API and to integrate task-level actions (including checkpoint, timer, async operator) with the unified mailbox model on the runtime side. * The benefit is simpler, unified processing logic, because a single thread handles all the actions without concurrency issues; it further gets rid of the lock dependency that causes the unfair-lock concern in the checkpoint process. * We still need to support the current legacy source for some releases, since it will probably be used for a while, especially where performance is a concern. The design doc is [https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit] > Integrate new Source Operator with Mailbox Model in StreamTask > -- > > Key: FLINK-13753 > URL: https://issues.apache.org/jira/browse/FLINK-13753 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > This is the umbrella issue for integrating the new source operator with the > mailbox model in StreamTask. > The motivation is based on > [FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface], > which proposes to refactor the whole source API and to integrate task-level > actions (including checkpoint, timer, async operator) with the unified > mailbox model on the runtime side. > * The benefit is simpler, unified processing logic, because a single thread > handles all the actions without concurrency issues; it further gets rid of > the lock dependency that causes the unfair-lock concern in the checkpoint > process. > * We still need to support the current legacy source for some releases, > since it will probably be used for a while, especially where performance is > a concern. > The design doc is > [https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit] -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-13753) Integrate new Source Operator with Mailbox Model in StreamTask
[ https://issues.apache.org/jira/browse/FLINK-13753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13753: - Description: This is the umbrella issue for integrating the new source operator with the mailbox model in StreamTask. The motivation is based on [FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface], which proposes to refactor the whole source API and to integrate task-level actions (including checkpoint, timer, async operator) with the unified mailbox model on the runtime side. * The benefit is simpler, unified processing logic, because a single thread handles all the actions without concurrency issues; it further gets rid of the lock dependency that causes the unfair-lock concern in the checkpoint process. * We still need to support the current legacy source for some releases, since it will probably be used for a while, especially where performance is a concern. The design doc is [https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit] was: This is the umbrella issue for integrating the new source operator with the mailbox model in StreamTask. The motivation is based on [FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface], which proposes to refactor the whole source API and to integrate task-level actions (including checkpoint, timer, async operator) with the unified [mailbox model|https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g] on the runtime side. * The benefit is simpler, unified processing logic, because a single thread handles all the actions without concurrency issues; it further gets rid of the lock dependency that causes the unfair-lock concern in the checkpoint process. * We still need to support the current legacy source for some releases, since it will probably be used for a while, especially where performance is a concern. The design doc is [https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit] > Integrate new Source Operator with Mailbox Model in StreamTask > -- > > Key: FLINK-13753 > URL: https://issues.apache.org/jira/browse/FLINK-13753 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > This is the umbrella issue for integrating the new source operator with the > mailbox model in StreamTask. > The motivation is based on > [FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface], > which proposes to refactor the whole source API and to integrate task-level > actions (including checkpoint, timer, async operator) with the unified > mailbox model on the runtime side. > * The benefit is simpler, unified processing logic, because a single thread > handles all the actions without concurrency issues; it further gets rid of > the lock dependency that causes the unfair-lock concern in the checkpoint > process. > * We still need to support the current legacy source for some releases, > since it will probably be used for a while, especially where performance is > a concern. > The design doc is > [https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit] -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13753) Integrate new Source Operator with Mailbox Model in StreamTask
zhijiang created FLINK-13753: Summary: Integrate new Source Operator with Mailbox Model in StreamTask Key: FLINK-13753 URL: https://issues.apache.org/jira/browse/FLINK-13753 Project: Flink Issue Type: Improvement Components: Runtime / Task Reporter: zhijiang Assignee: zhijiang This is the umbrella issue for integrating the new source operator with the mailbox model in StreamTask. The motivation is based on [FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface], which proposes to refactor the whole source API and to integrate task-level actions (including checkpoint, timer, async operator) with the unified [mailbox model|https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g] on the runtime side. * The benefit is simpler, unified processing logic, because a single thread handles all the actions without concurrency issues; it further gets rid of the lock dependency that causes the unfair-lock concern in the checkpoint process. * We still need to support the current legacy source for some releases, since it will probably be used for a while, especially where performance is a concern. The design doc is [https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit] -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-13753) Integrate new Source Operator with Mailbox Model in StreamTask
[ https://issues.apache.org/jira/browse/FLINK-13753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13753: - Description: This is the umbrella issue for integrating the new source operator with the mailbox model in StreamTask. The motivation is based on [FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface], which proposes to refactor the whole source API and to integrate task-level actions (including checkpoint, timer, async operator) with the unified [mailbox model|https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g] on the runtime side. * The benefit is simpler, unified processing logic, because a single thread handles all the actions without concurrency issues; it further gets rid of the lock dependency that causes the unfair-lock concern in the checkpoint process. * We still need to support the current legacy source for some releases, since it will probably be used for a while, especially where performance is a concern. The design doc is [https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit] was: This is the umbrella issue for integrating the new source operator with the mailbox model in StreamTask. The motivation is based on [FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface], which proposes to refactor the whole source API and to integrate task-level actions (including checkpoint, timer, async operator) with the unified [mailbox model|https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g] on the runtime side. * The benefit is simpler, unified processing logic, because a single thread handles all the actions without concurrency issues; it further gets rid of the lock dependency that causes the unfair-lock concern in the checkpoint process. * We still need to support the current legacy source for some releases, since it will probably be used for a while, especially where performance is a concern. The design doc is [https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit] > Integrate new Source Operator with Mailbox Model in StreamTask > -- > > Key: FLINK-13753 > URL: https://issues.apache.org/jira/browse/FLINK-13753 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > This is the umbrella issue for integrating the new source operator with the > mailbox model in StreamTask. > The motivation is based on > [FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface], > which proposes to refactor the whole source API and to integrate task-level > actions (including checkpoint, timer, async operator) with the unified > [mailbox model|https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g] > on the runtime side. > * The benefit is simpler, unified processing logic, because a single thread > handles all the actions without concurrency issues; it further gets rid of > the lock dependency that causes the unfair-lock concern in the checkpoint > process. > * We still need to support the current legacy source for some releases, > since it will probably be used for a while, especially where performance is > a concern. > The design doc is > [https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit] -- This message was sent by Atlassian JIRA (v7.6.14#76016)
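For readers unfamiliar with the mailbox model referenced in FLINK-13753, here is a toy, self-contained sketch of the single-threaded loop it describes. It is deliberately simplified (a real implementation blocks when idle instead of spinning) and is not Flink's MailboxProcessor API.

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BooleanSupplier;

class MailboxLoop {
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    // Other threads (checkpoint coordinator, timers) enqueue actions here
    // instead of locking shared state with the task thread.
    void putMail(Runnable mail) {
        mailbox.add(mail);
    }

    void stop() {
        putMail(() -> running = false);
    }

    // defaultAction advances record processing and returns false when done.
    void run(BooleanSupplier defaultAction) {
        while (running) {
            // 1) Drain all pending mail in the single task thread: no locks.
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();
            }
            // 2) Then make progress on the default (record-processing) action.
            if (!defaultAction.getAsBoolean()) {
                running = false;
            }
        }
    }
}
{code}

Because checkpoints, timers, and record processing all run on the one thread draining this queue, there is no checkpoint lock to acquire, which is the unfair-lock concern the issue mentions.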
[jira] [Closed] (FLINK-12630) Refactor abstract InputGate to general interface
[ https://issues.apache.org/jira/browse/FLINK-12630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang closed FLINK-12630. Resolution: Won't Fix > Refactor abstract InputGate to general interface > > > Key: FLINK-12630 > URL: https://issues.apache.org/jira/browse/FLINK-12630 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > `InputGate` is currently defined as an abstract class which extracts the > common code for checking data availability for the subclasses `SingleInputGate` > and `UnionInputGate`, but this may limit further extension of `InputGate` > implementations in the shuffle service architecture. `SingleInputGate` is > created by the shuffle service, so it belongs to the scope of the shuffle > service, while `UnionInputGate` is a wrapper around several `SingleInputGate`s, > so it should live in the task/processor stack. > In order to allow a new `InputGate` implementation from another shuffle > service to be plugged in directly, we should define a cleaner `InputGate` > interface that decouples the implementation of the data-availability checking > logic. In detail, we could define the `isAvailable` method in the `InputGate` > interface and extract the current implementation into a separate class > `FutureBasedAvailability`, which could still be extended and reused for both > `SingleInputGate` and `UnionInputGate`. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
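Although the ticket was closed as Won't Fix, the proposal is easy to picture. The sketch below approximates it with an interface plus a reusable future-based availability helper; the names follow the issue text, but the signatures are assumptions made for illustration.

{code:java}
import java.util.concurrent.CompletableFuture;

interface InputGate {
    CompletableFuture<?> isAvailable();
    int getNumberOfInputChannels();
}

// The extracted availability logic, reusable by SingleInputGate,
// UnionInputGate, or an InputGate from an external shuffle service.
class FutureBasedAvailability {
    private CompletableFuture<Void> available = new CompletableFuture<>();

    CompletableFuture<?> isAvailable() {
        return available;
    }

    // Completing the future wakes up whoever is waiting on isAvailable().
    void notifyAvailable() {
        available.complete(null);
    }

    // Swap in a fresh future once the data has been drained again.
    void resetUnavailable() {
        if (available.isDone()) {
            available = new CompletableFuture<>();
        }
    }
}
{code}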
[jira] [Closed] (FLINK-12576) inputQueueLength metric does not work for LocalInputChannels
[ https://issues.apache.org/jira/browse/FLINK-12576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang closed FLINK-12576. Resolution: Fixed Fix in master : 302fc1d5de38bb6db99ddc45470efcc9ebd782bc Fix in release-1.9 : 2a6fb9af7bca5a55d1dc9c55b779eea38b43e1a2 > inputQueueLength metric does not work for LocalInputChannels > > > Key: FLINK-12576 > URL: https://issues.apache.org/jira/browse/FLINK-12576 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics, Runtime / Network >Affects Versions: 1.6.4, 1.7.2, 1.8.0 >Reporter: Piotr Nowojski >Assignee: Aitozi >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Currently {{inputQueueLength}} ignores LocalInputChannels > ({{SingleInputGate#getNumberOfQueuedBuffers}}). This can cause mistakes > when looking for the causes of backpressure (if a task is back-pressuring the > whole Flink job, but there is data skew and only local input channels are > being used). -- This message was sent by Atlassian JIRA (v7.6.14#76016)
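A toy illustration of the bug and its fix: if the gauge skips local channels, a data-skewed job that reads mostly through local channels reports an input queue length of zero even while back-pressured. The types below are simplified stand-ins, not Flink's metrics classes.

{code:java}
import java.util.List;

interface InputChannel {
    int queuedBufferCount(); // buffers received but not yet consumed
}

class InputQueueLengthGauge {
    private final List<InputChannel> channels; // local and remote alike

    InputQueueLengthGauge(List<InputChannel> channels) {
        this.channels = channels;
    }

    int getValue() {
        int total = 0;
        for (InputChannel channel : channels) {
            // Before the fix, local channels were effectively skipped here,
            // so their queued buffers never showed up in the metric.
            total += channel.queuedBufferCount();
        }
        return total;
    }
}
{code}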
[jira] [Commented] (FLINK-13435) Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per partition type
[ https://issues.apache.org/jira/browse/FLINK-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16898697#comment-16898697 ] zhijiang commented on FLINK-13435: -- Thanks for helping merge it into 1.9, I forgot that. [~Zentol] > Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per > partition type > > > Key: FLINK-13435 > URL: https://issues.apache.org/jira/browse/FLINK-13435 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination, Runtime / Network >Affects Versions: 1.9.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0, 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > In the long term we do not need auto-release semantics for blocking > (persistent) partitions. We expect them always to be released externally by the > JM and assume they can be consumed multiple times. > Pipelined partitions always have only one consumer and one consumption > attempt; afterwards they can always be released automatically. > ShuffleDescriptor.ReleaseType was introduced to make the release semantics more > flexible, but it is not needed in the long term. > FORCE_PARTITION_RELEASE_ON_CONSUMPTION was introduced as a safety net to be > able to fall back to the 1.8 behaviour without the partition tracker and the JM > taking care of blocking partition release. We can make this option specific to > NettyShuffleEnvironment, which was the only existing shuffle service before. > If it is activated, the blocking partition is also auto-released on a > consumption attempt, as it was before. The fine-grained recovery will just not > find the partition after the job restart in this case and will restart the > producer. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (FLINK-13493) BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the readers are empty
[ https://issues.apache.org/jira/browse/FLINK-13493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16897998#comment-16897998 ] zhijiang edited comment on FLINK-13493 at 8/1/19 12:10 PM: --- Thanks for the clarification [~azagrebin]! The set structure of readers in `BoundedBlockingSubpartition` made me think of this concern. I agree it should be no problem now because of two things: * We do not have multiple readers for one subpartition concurrently. The current semantic of consuming one subpartition multiple times only considers the failover case, which means the next consumption attempt only comes after the previous attempt was canceled. * The current blocking subpartition release would check whether the reader set is empty. As I proposed in FLINK-13478, it might not need to check that if the ResultPartition has already decided to release the whole partition, and this decision might also come from the JM RPC. So the whole logic seems a bit weird: the subpartition notifies the result partition of consumption even though there are other unconsumed readers, and then if the result partition triggers the whole release, that subpartition with unconsumed readers would actually not be released. was (Author: zjwang): Thanks for the clarification [~azagrebin]! The set structure of readers in `BoundedBlockingSubpartition` made me think of this concern. I agree it should be no problem now because of two things: * We do not have multiple readers for one subpartition concurrently. The current semantic of consuming one subpartition multiple times only considers the failover case, which means the next consumption attempt only comes after the previous attempt was canceled. * The current blocking subpartition release would check whether the reader set is empty. As I proposed in FLINK-13478, it might not need to check that if the ResultPartition has already decided to release the whole partition, and this decision might also come from the JM RPC. So the whole logic seems a bit weird: the subpartition would notify the result partition of consumption even though there are other unconsumed readers, and then if the result partition triggers the whole release, the subpartition with unconsumed readers would actually not be released. > BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the > readers are empty > --- > > Key: FLINK-13493 > URL: https://issues.apache.org/jira/browse/FLINK-13493 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > In the previous implementation, it would always notify the {{ResultPartition}} of > a consumed subpartition if any reader view was released. Based on the > reference-counter release strategy it might cause problems if one blocking > subpartition has multiple readers. That means the whole result partition > might be released while there are still alive readers in some subpartitions. > The default release strategy for blocking partitions is based on > JM/scheduler notification atm, but if we switch the option to > consumption-based notification it would cause problems. And from the subpartition > side it should have the right behavior no matter what the specific release > strategy in the upper layer is. > In order to fix this bug, the {{BoundedBlockingSubpartition}} would only > notify {{onConsumedSubpartition}} when all the readers are empty. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (FLINK-13493) BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the readers are empty
[ https://issues.apache.org/jira/browse/FLINK-13493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16897998#comment-16897998 ] zhijiang edited comment on FLINK-13493 at 8/1/19 12:09 PM: --- Thanks for the clarification [~azagrebin]! The set structure of readers in `BoundedBlockingSubpartition` made me think of this concern. I agree it should be no problem now because of two things: * We do not have multiple readers for one subpartition concurrently. The current semantic of consuming one subpartition multiple times only considers the failover case, which means the next consumption attempt only comes after the previous attempt was canceled. * The current blocking subpartition release would check whether the reader set is empty. As I proposed in FLINK-13478, it might not need to check that if the ResultPartition has already decided to release the whole partition, and this decision might also come from the JM RPC. So the whole logic seems a bit weird: the subpartition would notify the result partition of consumption even though there are other unconsumed readers, and then if the result partition triggers the whole release, the subpartition with unconsumed readers would actually not be released. was (Author: zjwang): Thanks for the clarification [~azagrebin]! The set structure of readers in `BoundedBlockingSubpartition` made me think of this concern. I agree it should be no problem now because of two things: * We do not have multiple readers for one subpartition concurrently. The current semantic of consuming one subpartition multiple times only considers the failover case, which means the next consumption attempt only comes after the previous attempt was canceled. * The current blocking subpartition release would check whether the reader set is empty. As I proposed in FLINK-13478, it might not need to check that if the ResultPartition has already decided to release the whole partition, and this decision might also come from the JM RPC. So the whole logic seems a bit weird: the subpartition would notify the result partition of consumption even though there are other unconsumed readers, and then if the result partition triggers the whole release, the subpartition with unconsumed readers would actually not be released. > BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the > readers are empty > --- > > Key: FLINK-13493 > URL: https://issues.apache.org/jira/browse/FLINK-13493 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > In the previous implementation, it would always notify the {{ResultPartition}} of > a consumed subpartition if any reader view was released. Based on the > reference-counter release strategy it might cause problems if one blocking > subpartition has multiple readers. That means the whole result partition > might be released while there are still alive readers in some subpartitions. > The default release strategy for blocking partitions is based on > JM/scheduler notification atm, but if we switch the option to > consumption-based notification it would cause problems. And from the subpartition > side it should have the right behavior no matter what the specific release > strategy in the upper layer is. > In order to fix this bug, the {{BoundedBlockingSubpartition}} would only > notify {{onConsumedSubpartition}} when all the readers are empty. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (FLINK-13493) BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the readers are empty
[ https://issues.apache.org/jira/browse/FLINK-13493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16897998#comment-16897998 ] zhijiang edited comment on FLINK-13493 at 8/1/19 12:07 PM: --- Thanks for the clarification [~azagrebin]! The set structure of readers in `BoundedBlockingSubpartition` made me think of this concern. I agree it should be no problem now because of two things: * We do not have multiple readers for one subpartition concurrently. The current semantic of consuming one subpartition multiple times only considers the failover case, which means the next consumption attempt only comes after the previous attempt was canceled. * The current blocking subpartition release would check whether the reader set is empty. As I proposed in FLINK-13478, it might not need to check that if the ResultPartition has already decided to release the whole partition, and this decision might also come from the JM RPC. So the whole logic seems a bit weird: the subpartition would notify the result partition of consumption even though there are other unconsumed readers, and then if the result partition triggers the whole release, the subpartition with unconsumed readers would actually not be released. was (Author: zjwang): Thanks for the clarification [~azagrebin]! I agree it should be no problem now because we do not have multiple readers for one subpartition concurrently. The current semantic of consuming one subpartition multiple times only considers the failover case, which means the next consumption attempt only comes after the previous attempt was canceled. The set structure of readers in `BoundedBlockingSubpartition` made me think of this unnecessary concern. > BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the > readers are empty > --- > > Key: FLINK-13493 > URL: https://issues.apache.org/jira/browse/FLINK-13493 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > In the previous implementation, it would always notify the {{ResultPartition}} of > a consumed subpartition if any reader view was released. Based on the > reference-counter release strategy it might cause problems if one blocking > subpartition has multiple readers. That means the whole result partition > might be released while there are still alive readers in some subpartitions. > The default release strategy for blocking partitions is based on > JM/scheduler notification atm, but if we switch the option to > consumption-based notification it would cause problems. And from the subpartition > side it should have the right behavior no matter what the specific release > strategy in the upper layer is. > In order to fix this bug, the {{BoundedBlockingSubpartition}} would only > notify {{onConsumedSubpartition}} when all the readers are empty. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Closed] (FLINK-13435) Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per partition type
[ https://issues.apache.org/jira/browse/FLINK-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang closed FLINK-13435. Resolution: Fixed Fixed in master : 6ce4e2b7ec958570068838491bd8f56f880c > Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per > partition type > > > Key: FLINK-13435 > URL: https://issues.apache.org/jira/browse/FLINK-13435 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination, Runtime / Network >Affects Versions: 1.9.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > In the long term we do not need auto-release semantics for blocking > (persistent) partitions. We expect them always to be released externally by the > JM and assume they can be consumed multiple times. > Pipelined partitions always have only one consumer and one consumption > attempt; afterwards they can always be released automatically. > ShuffleDescriptor.ReleaseType was introduced to make the release semantics more > flexible, but it is not needed in the long term. > FORCE_PARTITION_RELEASE_ON_CONSUMPTION was introduced as a safety net to be > able to fall back to the 1.8 behaviour without the partition tracker and the JM > taking care of blocking partition release. We can make this option specific to > NettyShuffleEnvironment, which was the only existing shuffle service before. > If it is activated, the blocking partition is also auto-released on a > consumption attempt, as it was before. The fine-grained recovery will just not > find the partition after the job restart in this case and will restart the > producer. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13493) BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the readers are empty
[ https://issues.apache.org/jira/browse/FLINK-13493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16897998#comment-16897998 ] zhijiang commented on FLINK-13493: -- Thanks for the clarification [~azagrebin]! I agree it should be no problem now because we do not have multiple readers for one subpartition concurrently. The current semantic of consuming one subpartition multiple times only considers the failover case, which means the next consumption attempt only comes after the previous attempt was canceled. The set structure of readers in `BoundedBlockingSubpartition` made me think of this unnecessary concern. > BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the > readers are empty > --- > > Key: FLINK-13493 > URL: https://issues.apache.org/jira/browse/FLINK-13493 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > In the previous implementation, it would always notify the {{ResultPartition}} of > a consumed subpartition if any reader view was released. Based on the > reference-counter release strategy it might cause problems if one blocking > subpartition has multiple readers. That means the whole result partition > might be released while there are still alive readers in some subpartitions. > The default release strategy for blocking partitions is based on > JM/scheduler notification atm, but if we switch the option to > consumption-based notification it would cause problems. And from the subpartition > side it should have the right behavior no matter what the specific release > strategy in the upper layer is. > In order to fix this bug, the {{BoundedBlockingSubpartition}} would only > notify {{onConsumedSubpartition}} when all the readers are empty. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-10672) Task stuck while writing output to flink
[ https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16897491#comment-16897491 ] zhijiang commented on FLINK-10672: -- Could you also test release-1.8.1 or the current master branch to see whether it still happens? We previously fixed some deadlock issues for both pipelined and blocking partitions, which could cause problems similar to the ones you mentioned. > Task stuck while writing output to flink > > > Key: FLINK-10672 > URL: https://issues.apache.org/jira/browse/FLINK-10672 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.5.4 > Environment: OS: Debuan rodente 4.17 > Flink version: 1.5.4 > ||Key||Value|| > |jobmanager.heap.mb|1024| > |jobmanager.rpc.address|localhost| > |jobmanager.rpc.port|6123| > |metrics.reporter.jmx.class|org.apache.flink.metrics.jmx.JMXReporter| > |metrics.reporter.jmx.port|9250-9260| > |metrics.reporters|jmx| > |parallelism.default|1| > |rest.port|8081| > |taskmanager.heap.mb|1024| > |taskmanager.numberOfTaskSlots|1| > |web.tmpdir|/tmp/flink-web-bdb73d6c-5b9e-47b5-9ebf-eed0a7c82c26| > > h1. Overview > ||Data Port||All Slots||Free Slots||CPU Cores||Physical Memory||JVM Heap > Size||Flink Managed Memory|| > |43501|1|0|12|62.9 GB|922 MB|642 MB| > h1. Memory > h2. JVM (Heap/Non-Heap) > ||Type||Committed||Used||Maximum|| > |Heap|922 MB|575 MB|922 MB| > |Non-Heap|68.8 MB|64.3 MB|-1 B| > |Total|991 MB|639 MB|922 MB| > h2. Outside JVM > ||Type||Count||Used||Capacity|| > |Direct|3,292|105 MB|105 MB| > |Mapped|0|0 B|0 B| > h1. Network > h2. Memory Segments > ||Type||Count|| > |Available|3,194| > |Total|3,278| > h1. Garbage Collection > ||Collector||Count||Time|| > |G1_Young_Generation|13|336| > |G1_Old_Generation|1|21| >Reporter: Ankur Goenka >Priority: Major > Labels: beam > Attachments: 1uruvakHxBu.png, 3aDKQ24WvKk.png, Po89UGDn58V.png, > jmx_dump.json, jmx_dump_detailed.json, jstack_129827.log, jstack_163822.log, > jstack_66985.log > > > I am running a fairly complex pipeline with 200+ tasks. > The pipeline works fine with small data (order of 10kb input) but gets stuck > with slightly larger data (300kb input). > > The task gets stuck while writing the output to Flink, more specifically it > gets stuck while requesting a memory segment in the local buffer pool. The Task > manager UI shows that it has enough memory and memory segments to work with. 
> The relevant stack trace is > {quote}"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 > tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9] > java.lang.Thread.State: TIMED_WAITING (on object monitor) > at (C/C++) 0x7fef201c7dae (Unknown Source) > at (C/C++) 0x7fef1f2aea07 (Unknown Source) > at (C/C++) 0x7fef1f241cd3 (Unknown Source) > at java.lang.Object.wait(Native Method) > - waiting on <0xf6d56450> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247) > - locked <0xf6d56450> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) > at > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42) > at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26) > at > org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230) > - locked <0xf6a60bd0> (a java.lang.Object) > at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81) > at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32) > at >
[jira] [Commented] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition
[ https://issues.apache.org/jira/browse/FLINK-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16897482#comment-16897482 ] zhijiang commented on FLINK-13478: -- [~azagrebin], thanks for the consideration. I think it would have no implicit consequences for the consumer side. Just like the current behavior of `PipelinedSubpartition#release`, it only releases the view and does not trigger the connection close. The connection close is normally initiated by the consumer/client side. > Decouple two different release strategies in BoundedBlockingSubpartition > > > Key: FLINK-13478 > URL: https://issues.apache.org/jira/browse/FLINK-13478 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination, Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > We have two basic release strategies atm. One is based on consumption, via > network notification from the consumer. The other is based on notification via > RPC from the JM/scheduler. > But in the current implementation of {{BoundedBlockingSubpartition}}, these two > ways are a bit coupled with each other. If the JM decides to release the > partition and sends the release RPC call, it has to wait until all the reader > views are really released before finally closing the data file. So the > JM-RPC-based release strategy still relies on the consumption confirmation via > the network to some extent. > In order to make these two release strategies independent, if the release > call comes from the JM/scheduler RPC, we could immediately release all the view > readers and then close the data file as well. If the release is based on > consumption confirmation, after all the view readers for one subpartition are > released, the subpartition could further notify the parent > {{ResultPartition}}, which decides whether to release the whole partition or > not. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
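The decoupling proposed in FLINK-13478 can be pictured with a small sketch: a JM/scheduler-initiated release tears everything down immediately, while consumption-based release only bubbles up to the parent once the last reader of the subpartition is gone. The types below are simplified stand-ins, not the BoundedBlockingSubpartition API.

{code:java}
import java.util.HashSet;
import java.util.Set;

class BlockingSubpartitionSketch {
    private final Set<Object> readers = new HashSet<>();
    private final Runnable notifyParentConsumed; // parent decides on full release

    BlockingSubpartitionSketch(Runnable notifyParentConsumed) {
        this.notifyParentConsumed = notifyParentConsumed;
    }

    // Path 1: release RPC from the JM/scheduler. No waiting on consumers:
    // drop all reader views and close the data file right away.
    void releaseFromRpc() {
        readers.clear();
        closeDataFile();
    }

    // Path 2: consumption-based release. Only when the last reader view of
    // this subpartition is released do we notify the parent, which may then
    // decide to release the whole result partition.
    void onReaderReleased(Object readerView) {
        readers.remove(readerView);
        if (readers.isEmpty()) {
            notifyParentConsumed.run();
        }
    }

    private void closeDataFile() {
        // Close the underlying (e.g. memory-mapped) data file here.
    }
}
{code}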
[jira] [Assigned] (FLINK-13489) Heavy deployment end-to-end test fails on Travis with TM heartbeat timeout
[ https://issues.apache.org/jira/browse/FLINK-13489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang reassigned FLINK-13489: Assignee: Yingjie Cao (was: zhijiang) > Heavy deployment end-to-end test fails on Travis with TM heartbeat timeout > -- > > Key: FLINK-13489 > URL: https://issues.apache.org/jira/browse/FLINK-13489 > Project: Flink > Issue Type: Bug > Components: Test Infrastructure >Reporter: Tzu-Li (Gordon) Tai >Assignee: Yingjie Cao >Priority: Blocker > Fix For: 1.9.0 > > > https://api.travis-ci.org/v3/job/564925128/log.txt > {code} > > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: Job failed. > (JobID: 1b4f1807cc749628cfc1bdf04647527a) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:250) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) > at > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507) > at > org.apache.flink.deployment.HeavyDeploymentStressTestProgram.main(HeavyDeploymentStressTestProgram.java:70) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:247) > ... 21 more > Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager > with id ea456d6a590eca7598c19c4d35e56db9 timed out. 
> at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1149) > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:318) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > at
[jira] [Assigned] (FLINK-13487) TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-13487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang reassigned FLINK-13487: Assignee: Yun Gao (was: Yun Gao) > TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall > failed on Travis > > > Key: FLINK-13487 > URL: https://issues.apache.org/jira/browse/FLINK-13487 > Project: Flink > Issue Type: Bug > Components: Runtime / Task, Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Yun Gao >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: error_log.png > > Time Spent: 20m > Remaining Estimate: 0h > > https://api.travis-ci.org/v3/job/564925114/log.txt > {code} > 21:14:47.090 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time > elapsed: 5.754 s <<< FAILURE! - in > org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest > 21:14:47.090 [ERROR] > testPartitionReleaseAfterReleaseCall(org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest) > Time elapsed: 0.136 s <<< ERROR! > java.util.concurrent.ExecutionException: > org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: > Could not submit task because there is no JobManager associated for the job > 2a0ab40cb53241799b71ff6fd2f53d3d. > at > org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionRelease(TaskExecutorPartitionLifecycleTest.java:331) > at > org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall(TaskExecutorPartitionLifecycleTest.java:201) > Caused by: > org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: > Could not submit task because there is no JobManager associated for the job > 2a0ab40cb53241799b71ff6fd2f53d3d. > {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition
[ https://issues.apache.org/jira/browse/FLINK-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13478: - Description: We have two basic release strategies atm. One is based on consumption, via network notification from the consumer. The other is based on notification via RPC from the JM/scheduler. But in the current implementation of {{BoundedBlockingSubpartition}}, these two ways are a bit coupled with each other. If the JM decides to release the partition and sends the release RPC call, it has to wait until all the reader views are really released before finally closing the data file. So the JM-RPC-based release strategy still relies on the consumption confirmation via the network to some extent. In order to make these two release strategies independent, if the release call comes from the JM/scheduler RPC, we could immediately release all the view readers and then close the data file as well. If the release is based on consumption notification, after all the view readers for one subpartition are released, the subpartition could further notify the parent {{ResultPartition}}, which decides whether to release the whole partition or not. was: We have two basic release strategies atm. One is based on consumption, via network notification from the consumer. The other is based on notification via RPC from the JM/scheduler. But in the current implementation of {{BoundedBlockingSubpartition}}, these two ways are coupled with each other. In detail, the network consumption notification could only close the data file after the release RPC was triggered from the JM/scheduler. Also, the release RPC has to wait until all the reader views are really released before closing the data file. So the release RPC still relies on the network notification to some extent. In order to make these two release strategies independent, if the release call comes from the JM/scheduler RPC, we could immediately release all the view readers and then close the data file as well. If the release is based on consumption notification, after all the view readers for one subpartition are released, the subpartition could further notify the parent {{ResultPartition}}, which decides whether to release the whole partition or not. > Decouple two different release strategies in BoundedBlockingSubpartition > > > Key: FLINK-13478 > URL: https://issues.apache.org/jira/browse/FLINK-13478 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination, Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > We have two basic release strategies atm. One is based on consumption, via > network notification from the consumer. The other is based on notification via > RPC from the JM/scheduler. > But in the current implementation of {{BoundedBlockingSubpartition}}, these two > ways are a bit coupled with each other. If the JM decides to release the > partition and sends the release RPC call, it has to wait until all the reader > views are really released before finally closing the data file. So the > JM-RPC-based release strategy still relies on the consumption confirmation via > the network to some extent. > In order to make these two release strategies independent, if the release > call comes from the JM/scheduler RPC, we could immediately release all the view > readers and then close the data file as well. If the release is based on > consumption notification, after all the view readers for one subpartition are > released, the subpartition could further notify the parent > {{ResultPartition}}, which decides whether to release the whole partition or > not. 
-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition
[ https://issues.apache.org/jira/browse/FLINK-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13478: - Description: We have two basic release strategies at the moment. One is based on consumption, via a network notification from the consumer. The other is based on a notification via RPC from the JM/scheduler. But in the current implementation of {{BoundedBlockingSubpartition}}, these two ways are somewhat coupled with each other. If the JM decides to release the partition and sends the release RPC call, it has to wait until all the reader views are actually released before finally closing the data file. So the JM-RPC-based release strategy still relies on the consumption confirmation via the network to some extent. In order to make these two release strategies independent, if the release call comes from the JM/scheduler RPC, we could immediately release all the view readers and then close the data file as well. If the release is based on the consumption confirmation, then after all the view readers for one subpartition are released, the subpartition could further notify the parent {{ResultPartition}}, which decides whether to release the whole partition or not. was: We have two basic release strategies atm. One is based on consumption via network notification from consumer. The other is based on notification via RPC from JM/scheduler. But in current implementation of {{BoundedBlockingSubpartition}}, these two ways are a bit coupled with each other. If the JM decides to release partition and send the release RPC call, it has to wait all the reader views really released before finally closing the data file. So the JM-RPC-based release strategy still relies on the consumption confirmation via network to some extent. In order to make these two release strategies independent, if the release call is from JM/scheduler RPC, we could immediately release all the view readers and then close the data file as well. If the release is based on consumption notification, after all the view readers for one subpartition are released, the subpartition could further notify the parent {{ResultPartition}} which decides whether to release the whole partition or not. > Decouple two different release strategies in BoundedBlockingSubpartition > > > Key: FLINK-13478 > URL: https://issues.apache.org/jira/browse/FLINK-13478 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination, Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > We have two basic release strategies atm. One is based on consumption via > network notification from consumer. The other is based on notification via > RPC from JM/scheduler. > But in current implementation of {{BoundedBlockingSubpartition}}, these two > ways are a bit coupled with each other. If the JM decides to release > partition and send the release RPC call, it has to wait all the reader views > really released before finally closing the data file. So the JM-RPC-based > release strategy still relies on the consumption confirmation via network to > some extent. > In order to make these two release strategies independent, if the release > call is from JM/scheduler RPC, we could immediately release all the view > readers and then close the data file as well. If the release is based on > consumption confirmation, after all the view readers for one subpartition are > released, the subpartition could further notify the parent > {{ResultPartition}} which decides whether to release the whole partition or > not. 
-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition
[ https://issues.apache.org/jira/browse/FLINK-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896163#comment-16896163 ] zhijiang commented on FLINK-13478: -- I think this is a pure refactoring. From the behavior aspect, this fix would only speed up the release of partition resources when the release call from the JM has already reached the TM side, with no need to wait for the consumers' network confirmation. And it does not matter much to release partitions with a bit of delay, based on the assumption that a canceled/consumed partition will eventually be confirmed via the network once the connection is established. The main issue is that the current behavior makes the partition release strategy confusing to understand: if the partition release strategy is based on the JM's decision, which is always reliable, then it should not be blocked/delayed by the network notification. We could focus on this, if necessary, in the 1.10 release. > Decouple two different release strategies in BoundedBlockingSubpartition > > > Key: FLINK-13478 > URL: https://issues.apache.org/jira/browse/FLINK-13478 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination, Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > We have two basic release strategies atm. One is based on consumption via > network notification from consumer. The other is based on notification via > RPC from JM/scheduler. > But in current implementation of {{BoundedBlockingSubpartition}}, these two > ways are coupled with each other. In detail, the network consumption > notification could only close data file after the release RPC was triggered > from JM/scheduler. Also for the release RPC it has to wait all the reader > views really released before closing the data file. So the release RPC still > relies on network notification to some extent. > In order to make these two release strategies independent, if the release > call is from JM/scheduler RPC, we could immediately release all the view > readers and then close the data file as well. If the release is based on > consumption notification, after all the view readers for one subpartition are > released, the subpartition could further notify the parent > {{ResultPartition}} which decides whether to release the whole partition or > not. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Issue Comment Deleted] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition
[ https://issues.apache.org/jira/browse/FLINK-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13478: - Comment: was deleted (was: I think this is a pure refactoring. From the behavior aspect, this fix would only speed up the release of partition resources when the release call from the JM has already reached the TM side, with no need to wait for the consumers' network confirmation. And it does not matter much to release partitions with a bit of delay, based on the assumption that a canceled/consumed partition will eventually be confirmed via the network once the connection is established. The main issue is that the current behavior makes the partition release strategy confusing to understand: if the partition release strategy is based on the JM's decision, which is always reliable, then it should not be blocked/delayed by the network notification. We could focus on this, if necessary, in the 1.10 release. ) > Decouple two different release strategies in BoundedBlockingSubpartition > > > Key: FLINK-13478 > URL: https://issues.apache.org/jira/browse/FLINK-13478 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination, Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > We have two basic release strategies atm. One is based on consumption via > network notification from consumer. The other is based on notification via > RPC from JM/scheduler. > But in current implementation of {{BoundedBlockingSubpartition}}, these two > ways are coupled with each other. In detail, the network consumption > notification could only close data file after the release RPC was triggered > from JM/scheduler. Also for the release RPC it has to wait all the reader > views really released before closing the data file. So the release RPC still > relies on network notification to some extent. > In order to make these two release strategies independent, if the release > call is from JM/scheduler RPC, we could immediately release all the view > readers and then close the data file as well. If the release is based on > consumption notification, after all the view readers for one subpartition are > released, the subpartition could further notify the parent > {{ResultPartition}} which decides whether to release the whole partition or > not. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition
[ https://issues.apache.org/jira/browse/FLINK-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896149#comment-16896149 ] zhijiang commented on FLINK-13478: -- I think this is a pure refactoring. From the behavior aspect, this fix would only speed up the release of partition resources when the release call from the JM has already reached the TM side, with no need to wait for the consumers' network confirmation. And it does not matter much to release partitions with a bit of delay, based on the assumption that a canceled/consumed partition will eventually be confirmed via the network once the connection is established. The main issue is that the current behavior makes the partition release strategy confusing to understand: if the partition release strategy is based on the JM's decision, which is always reliable, then it should not be blocked/delayed by the network notification. We could focus on this, if necessary, in the 1.10 release. > Decouple two different release strategies in BoundedBlockingSubpartition > > > Key: FLINK-13478 > URL: https://issues.apache.org/jira/browse/FLINK-13478 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination, Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > We have two basic release strategies atm. One is based on consumption via > network notification from consumer. The other is based on notification via > RPC from JM/scheduler. > But in current implementation of {{BoundedBlockingSubpartition}}, these two > ways are coupled with each other. In detail, the network consumption > notification could only close data file after the release RPC was triggered > from JM/scheduler. Also for the release RPC it has to wait all the reader > views really released before closing the data file. So the release RPC still > relies on network notification to some extent. > In order to make these two release strategies independent, if the release > call is from JM/scheduler RPC, we could immediately release all the view > readers and then close the data file as well. If the release is based on > consumption notification, after all the view readers for one subpartition are > released, the subpartition could further notify the parent > {{ResultPartition}} which decides whether to release the whole partition or > not. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13487) TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-13487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896083#comment-16896083 ] zhijiang commented on FLINK-13487: -- Thanks to [~SleePy] and [~gaoyunhaii] for their cooperation in tracing the root cause. It is mainly caused by the registration still being unfinished while tasks are submitted, and we could reproduce it by adjusting the code to sleep for a while during registration. We should make task submission happen only after the registration is done. [~gaoyunhaii] will fix it. > TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall > failed on Travis > > > Key: FLINK-13487 > URL: https://issues.apache.org/jira/browse/FLINK-13487 > Project: Flink > Issue Type: Bug > Components: Runtime / Task, Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > https://api.travis-ci.org/v3/job/564925114/log.txt > {code} > 21:14:47.090 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time > elapsed: 5.754 s <<< FAILURE! - in > org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest > 21:14:47.090 [ERROR] > testPartitionReleaseAfterReleaseCall(org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest) > Time elapsed: 0.136 s <<< ERROR! > java.util.concurrent.ExecutionException: > org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: > Could not submit task because there is no JobManager associated for the job > 2a0ab40cb53241799b71ff6fd2f53d3d. > at > org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionRelease(TaskExecutorPartitionLifecycleTest.java:331) > at > org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall(TaskExecutorPartitionLifecycleTest.java:201) > Caused by: > org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: > Could not submit task because there is no JobManager associated for the job > 2a0ab40cb53241799b71ff6fd2f53d3d. > {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
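As a rough illustration of the fix direction (a minimal sketch with hypothetical helper names, not the actual test or TaskExecutor code), the task submission can be chained onto the completion of the JobManager registration so the two no longer race:

{code}
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: submit the task only after the JobManager
// registration has completed, instead of racing against it.
class SubmitAfterRegistrationSketch {
    CompletableFuture<Void> registerAtJobManager() {
        return CompletableFuture.completedFuture(null); // placeholder
    }

    CompletableFuture<Void> submitTask() {
        return CompletableFuture.completedFuture(null); // placeholder
    }

    CompletableFuture<Void> run() {
        // Chaining ensures submission starts only after registration has
        // finished, so "no JobManager associated" can no longer occur here.
        return registerAtJobManager().thenCompose(ignored -> submitTask());
    }
}
{code}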
[jira] [Assigned] (FLINK-13487) TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-13487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang reassigned FLINK-13487: Assignee: Yun Gao (was: zhijiang) > TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall > failed on Travis > > > Key: FLINK-13487 > URL: https://issues.apache.org/jira/browse/FLINK-13487 > Project: Flink > Issue Type: Bug > Components: Runtime / Task, Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Yun Gao >Priority: Blocker > Fix For: 1.9.0 > > > https://api.travis-ci.org/v3/job/564925114/log.txt > {code} > 21:14:47.090 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time > elapsed: 5.754 s <<< FAILURE! - in > org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest > 21:14:47.090 [ERROR] > testPartitionReleaseAfterReleaseCall(org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest) > Time elapsed: 0.136 s <<< ERROR! > java.util.concurrent.ExecutionException: > org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: > Could not submit task because there is no JobManager associated for the job > 2a0ab40cb53241799b71ff6fd2f53d3d. > at > org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionRelease(TaskExecutorPartitionLifecycleTest.java:331) > at > org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall(TaskExecutorPartitionLifecycleTest.java:201) > Caused by: > org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: > Could not submit task because there is no JobManager associated for the job > 2a0ab40cb53241799b71ff6fd2f53d3d. > {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader
[ https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16895932#comment-16895932 ] zhijiang commented on FLINK-13442: -- I changed the priority of this issue to non-blocker and created another ticket, FLINK-13493, for tracking the possible issue with blocking partition release. I know the default release strategy for blocking partitions is via notification from the JM/scheduler, but if we switch the option to consumption-based notification, it would cause problems. > Remove unnecessary notifySubpartitionConsumed method from view reader > -- > > Key: FLINK-13442 > URL: https://issues.apache.org/jira/browse/FLINK-13442 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Fix For: 1.10 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently the methods of > `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` > NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in > netty stack during releasing resources. > As confirmed in FLINK-13245, in order to make this release logic simple and > clean, we could remove the redundant `notifySubpartitionConsumed` from > `NetworkSequenceViewReader` side, and also remove it from > `ResultSubpartitionView` side. In the implementation of > `ResultSubpartitionView#releaseAllResources` it would further notify the > parent subpartition of consumed state via > `ResultSubpartition#notifySubpartitionConsumed` which further feedback to > parent `ResultPartition` layer via `onConsumedSubpartition`. Finally > `ResultPartition` could decide whether to release itself or not. > E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly > used for pipelined partition, it would release partition after reference > counter decreased to 0. For the case of `ResultPartition` which would be > generated for blocking partition by default, it would never be released after > notifying consumed. And the JM/scheduler would decide when to release > partition properly. > In addition, `InputChannel#notifySubpartitionConsumed` could also be removed > as a result of above. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13493) BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the readers are empty
zhijiang created FLINK-13493: Summary: BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the readers are empty Key: FLINK-13493 URL: https://issues.apache.org/jira/browse/FLINK-13493 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang In the previous implementation, the subpartition would always notify the {{ResultPartition}} of being consumed whenever any reader view was released. With the reference-counter release strategy, this can cause problems if one blocking subpartition has multiple readers: the whole result partition might be released while there are still live readers in some subpartitions. Although the default release strategy for blocking partitions is currently based on JM/scheduler notification, switching the option to consumption-based notification would cause problems. And from the subpartition side, the behavior should be correct no matter which specific release strategy the upper layer uses. In order to fix this bug, the {{BoundedBlockingSubpartition}} should only notify {{onConsumedSubpartition}} once all of its readers have been released, i.e. the reader set is empty. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
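A minimal before/after sketch of this guard (illustrative names only, not the exact Flink code):

{code}
import java.util.HashSet;
import java.util.Set;

class SubpartitionReaderTrackingSketch {
    private final Set<Object> readers = new HashSet<>();

    // Buggy behavior: notify the parent for every released reader, so a
    // reference-counting parent may reach zero while other readers of the
    // same subpartition are still alive.
    void onReaderReleasedOld(Object reader) {
        readers.remove(reader);
        notifyParentConsumed();
    }

    // Fixed behavior: notify the parent only once, after the last reader
    // of this subpartition has been released.
    void onReaderReleasedNew(Object reader) {
        readers.remove(reader);
        if (readers.isEmpty()) {
            notifyParentConsumed();
        }
    }

    private void notifyParentConsumed() {
        // would call ResultPartition#onConsumedSubpartition in real code
    }
}
{code}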
[jira] [Updated] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader
[ https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13442: - Fix Version/s: (was: 1.9.0) 1.10 > Remove unnecessary notifySubpartitionConsumed method from view reader > -- > > Key: FLINK-13442 > URL: https://issues.apache.org/jira/browse/FLINK-13442 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Fix For: 1.10 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently the methods of > `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` > NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in > netty stack during releasing resources. > As confirmed in FLINK-13245, in order to make this release logic simple and > clean, we could remove the redundant `notifySubpartitionConsumed` from > `NetworkSequenceViewReader` side, and also remove it from > `ResultSubpartitionView` side. In the implementation of > `ResultSubpartitionView#releaseAllResources` it would further notify the > parent subpartition of consumed state via > `ResultSubpartition#notifySubpartitionConsumed` which further feedback to > parent `ResultPartition` layer via `onConsumedSubpartition`. Finally > `ResultPartition` could decide whether to release itself or not. > E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly > used for pipelined partition, it would release partition after reference > counter decreased to 0. For the case of `ResultPartition` which would be > generated for blocking partition by default, it would never be released after > notifying consumed. And the JM/scheduler would decide when to release > partition properly. > In addition, `InputChannel#notifySubpartitionConsumed` could also be removed > as a result of above. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader
[ https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13442: - Issue Type: Improvement (was: Bug) > Remove unnecessary notifySubpartitionConsumed method from view reader > -- > > Key: FLINK-13442 > URL: https://issues.apache.org/jira/browse/FLINK-13442 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently the methods of > `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` > NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in > netty stack during releasing resources. > As confirmed in FLINK-13245, in order to make this release logic simple and > clean, we could remove the redundant `notifySubpartitionConsumed` from > `NetworkSequenceViewReader` side, and also remove it from > `ResultSubpartitionView` side. In the implementation of > `ResultSubpartitionView#releaseAllResources` it would further notify the > parent subpartition of consumed state via > `ResultSubpartition#notifySubpartitionConsumed` which further feedback to > parent `ResultPartition` layer via `onConsumedSubpartition`. Finally > `ResultPartition` could decide whether to release itself or not. > E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly > used for pipelined partition, it would release partition after reference > counter decreased to 0. For the case of `ResultPartition` which would be > generated for blocking partition by default, it would never be released after > notifying consumed. And the JM/scheduler would decide when to release > partition properly. > In addition, `InputChannel#notifySubpartitionConsumed` could also be removed > as a result of above. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader
[ https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13442: - Priority: Minor (was: Blocker) > Remove unnecessary notifySubpartitionConsumed method from view reader > -- > > Key: FLINK-13442 > URL: https://issues.apache.org/jira/browse/FLINK-13442 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently the methods of > `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` > NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in > netty stack during releasing resources. > As confirmed in FLINK-13245, in order to make this release logic simple and > clean, we could remove the redundant `notifySubpartitionConsumed` from > `NetworkSequenceViewReader` side, and also remove it from > `ResultSubpartitionView` side. In the implementation of > `ResultSubpartitionView#releaseAllResources` it would further notify the > parent subpartition of consumed state via > `ResultSubpartition#notifySubpartitionConsumed` which further feedback to > parent `ResultPartition` layer via `onConsumedSubpartition`. Finally > `ResultPartition` could decide whether to release itself or not. > E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly > used for pipelined partition, it would release partition after reference > counter decreased to 0. For the case of `ResultPartition` which would be > generated for blocking partition by default, it would never be released after > notifying consumed. And the JM/scheduler would decide when to release > partition properly. > In addition, `InputChannel#notifySubpartitionConsumed` could also be removed > as a result of above. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader
[ https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13442: - Issue Type: Bug (was: Improvement) > Remove unnecessary notifySubpartitionConsumed method from view reader > -- > > Key: FLINK-13442 > URL: https://issues.apache.org/jira/browse/FLINK-13442 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently the methods of > `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` > NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in > netty stack during releasing resources. > As confirmed in FLINK-13245, in order to make this release logic simple and > clean, we could remove the redundant `notifySubpartitionConsumed` from > `NetworkSequenceViewReader` side, and also remove it from > `ResultSubpartitionView` side. In the implementation of > `ResultSubpartitionView#releaseAllResources` it would further notify the > parent subpartition of consumed state via > `ResultSubpartition#notifySubpartitionConsumed` which further feedback to > parent `ResultPartition` layer via `onConsumedSubpartition`. Finally > `ResultPartition` could decide whether to release itself or not. > E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly > used for pipelined partition, it would release partition after reference > counter decreased to 0. For the case of `ResultPartition` which would be > generated for blocking partition by default, it would never be released after > notifying consumed. And the JM/scheduler would decide when to release > partition properly. > In addition, `InputChannel#notifySubpartitionConsumed` could also be removed > as a result of above. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader
[ https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13442: - Priority: Blocker (was: Minor) > Remove unnecessary notifySubpartitionConsumed method from view reader > -- > > Key: FLINK-13442 > URL: https://issues.apache.org/jira/browse/FLINK-13442 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently the methods of > `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` > NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in > netty stack during releasing resources. > As confirmed in FLINK-13245, in order to make this release logic simple and > clean, we could remove the redundant `notifySubpartitionConsumed` from > `NetworkSequenceViewReader` side, and also remove it from > `ResultSubpartitionView` side. In the implementation of > `ResultSubpartitionView#releaseAllResources` it would further notify the > parent subpartition of consumed state via > `ResultSubpartition#notifySubpartitionConsumed` which further feedback to > parent `ResultPartition` layer via `onConsumedSubpartition`. Finally > `ResultPartition` could decide whether to release itself or not. > E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly > used for pipelined partition, it would release partition after reference > counter decreased to 0. For the case of `ResultPartition` which would be > generated for blocking partition by default, it would never be released after > notifying consumed. And the JM/scheduler would decide when to release > partition properly. > In addition, `InputChannel#notifySubpartitionConsumed` could also be removed > as a result of above. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition
[ https://issues.apache.org/jira/browse/FLINK-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16895832#comment-16895832 ] zhijiang commented on FLINK-13478: -- [~StephanEwen] [~pnowojski] [~azagrebin] What do you think of this issue? > Decouple two different release strategies in BoundedBlockingSubpartition > > > Key: FLINK-13478 > URL: https://issues.apache.org/jira/browse/FLINK-13478 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination, Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > We have two basic release strategies atm. One is based on consumption via > network notification from consumer. The other is based on notification via > RPC from JM/scheduler. > But in current implementation of {{BoundedBlockingSubpartition}}, these two > ways are coupled with each other. In detail, the network consumption > notification could only close data file after the release RPC was triggered > from JM/scheduler. Also for the release RPC it has to wait all the reader > views really released before closing the data file. So the release RPC still > relies on network notification to some extent. > In order to make these two release strategies independent, if the release > call is from JM/scheduler RPC, we could immediately release all the view > readers and then close the data file as well. If the release is based on > consumption notification, after all the view readers for one subpartition are > released, the subpartition could further notify the parent > {{ResultPartition}} which decides whether to release the whole partition or > not. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader
[ https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13442: - Fix Version/s: 1.9.0 > Remove unnecessary notifySubpartitionConsumed method from view reader > -- > > Key: FLINK-13442 > URL: https://issues.apache.org/jira/browse/FLINK-13442 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently the methods of > `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` > NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in > netty stack during releasing resources. > As confirmed in FLINK-13245, in order to make this release logic simple and > clean, we could remove the redundant `notifySubpartitionConsumed` from > `NetworkSequenceViewReader` side, and also remove it from > `ResultSubpartitionView` side. In the implementation of > `ResultSubpartitionView#releaseAllResources` it would further notify the > parent subpartition of consumed state via > `ResultSubpartition#notifySubpartitionConsumed` which further feedback to > parent `ResultPartition` layer via `onConsumedSubpartition`. Finally > `ResultPartition` could decide whether to release itself or not. > E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly > used for pipelined partition, it would release partition after reference > counter decreased to 0. For the case of `ResultPartition` which would be > generated for blocking partition by default, it would never be released after > notifying consumed. And the JM/scheduler would decide when to release > partition properly. > In addition, `InputChannel#notifySubpartitionConsumed` could also be removed > as a result of above. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition
zhijiang created FLINK-13478: Summary: Decouple two different release strategies in BoundedBlockingSubpartition Key: FLINK-13478 URL: https://issues.apache.org/jira/browse/FLINK-13478 Project: Flink Issue Type: Improvement Components: Runtime / Coordination, Runtime / Network Reporter: zhijiang Assignee: zhijiang We have two basic release strategies at the moment. One is based on consumption, via a network notification from the consumer. The other is based on a notification via RPC from the JM/scheduler. But in the current implementation of {{BoundedBlockingSubpartition}}, these two ways are coupled with each other. In detail, the network consumption notification can only close the data file after the release RPC has been triggered by the JM/scheduler. Also, the release RPC has to wait until all the reader views are actually released before closing the data file. So the release RPC still relies on the network notification to some extent. In order to make these two release strategies independent, if the release call comes from the JM/scheduler RPC, we could immediately release all the view readers and then close the data file as well. If the release is based on the consumption notification, then after all the view readers for one subpartition are released, the subpartition could further notify the parent {{ResultPartition}}, which decides whether to release the whole partition or not. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader
[ https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13442: - Description: Currently, the methods `NetworkSequenceViewReader#notifySubpartitionConsumed` and `NetworkSequenceViewReader#releaseAllResources` are both called in the netty stack while releasing resources. As confirmed in FLINK-13245, in order to make this release logic simple and clean, we could remove the redundant `notifySubpartitionConsumed` from the `NetworkSequenceViewReader` side, and also remove it from the `ResultSubpartitionView` side. The implementation of `ResultSubpartitionView#releaseAllResources` would then notify the parent subpartition of the consumed state via `ResultSubpartition#notifySubpartitionConsumed`, which feeds back to the parent `ResultPartition` layer via `onConsumedSubpartition`. Finally, the `ResultPartition` can decide whether to release itself or not. E.g., in the case of `ReleaseOnConsumptionResultPartition`, which is mainly used for pipelined partitions, the partition is released after the reference counter decreases to 0. In the case of `ResultPartition`, which is generated for blocking partitions by default, the partition is never released after the consumed notification; the JM/scheduler decides when to release it properly. In addition, `InputChannel#notifySubpartitionConsumed` could also be removed as a result of the above. was: Currently the methods of `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in netty stack during releasing resources. To make this release logic simple and clean, we could remove the redundant `notifySubpartitionConsumed` from `NetworkSequenceViewReader` side, and also remove it from `ResultSubpartitionView` side. In the implementation of `ResultSubpartitionView#releaseAllResources` it would further notify the parent subpartition of consumed state via `ResultSubpartition#notifySubpartitionConsumed` which further feedback to parent `ResultPartition` layer via `onConsumedSubpartition`. Finally `ResultPartition` could decide whether to release itself or not. E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly used for pipelined partition, it would release partition after reference counter decreased to 0. For the case of `ResultPartition` which would be generated for blocking partition by default, it would never be released after notifying consumed. And the JM/scheduler would decide when to release partition properly. In addition, `InputChannel#notifySubpartitionConsumed` could also be removed as a result of above. > Remove unnecessary notifySubpartitionConsumed method from view reader > -- > > Key: FLINK-13442 > URL: https://issues.apache.org/jira/browse/FLINK-13442 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > Currently the methods of > `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` > NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in > netty stack during releasing resources. > As confirmed in FLINK-13245, in order to make this release logic simple and > clean, we could remove the redundant `notifySubpartitionConsumed` from > `NetworkSequenceViewReader` side, and also remove it from > `ResultSubpartitionView` side. In the implementation of > `ResultSubpartitionView#releaseAllResources` it would further notify the > parent subpartition of consumed state via > `ResultSubpartition#notifySubpartitionConsumed` which further feedback to > parent `ResultPartition` layer via `onConsumedSubpartition`. Finally > `ResultPartition` could decide whether to release itself or not. > E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly > used for pipelined partition, it would release partition after reference > counter decreased to 0. For the case of `ResultPartition` which would be > generated for blocking partition by default, it would never be released after > notifying consumed. And the JM/scheduler would decide when to release > partition properly. > In addition, `InputChannel#notifySubpartitionConsumed` could also be removed > as a result of above. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
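A hedged sketch of the simplified release chain after this removal, with the interfaces reduced to their essentials (these are not the actual Flink signatures):

{code}
// After the refactoring, releaseAllResources is the single entry point;
// the consumed notification is folded into the release path instead of
// being a separate call in the netty stack.
interface ViewReaderSketch {
    void releaseAllResources(); // notifySubpartitionConsumed is gone
}

class SubpartitionSketch {
    void onConsumedView() {
        // forwards to the parent partition, e.g. via onConsumedSubpartition,
        // which decides whether to release the whole partition
    }
}

class SubpartitionViewSketch implements ViewReaderSketch {
    private final SubpartitionSketch parent;

    SubpartitionViewSketch(SubpartitionSketch parent) {
        this.parent = parent;
    }

    @Override
    public void releaseAllResources() {
        // release buffers etc., then feed the consumed state upwards
        parent.onConsumedView();
    }
}
{code}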
[jira] [Created] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader
zhijiang created FLINK-13442: Summary: Remove unnecessary notifySubpartitionConsumed method from view reader Key: FLINK-13442 URL: https://issues.apache.org/jira/browse/FLINK-13442 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Currently, the methods `NetworkSequenceViewReader#notifySubpartitionConsumed` and `NetworkSequenceViewReader#releaseAllResources` are both called in the netty stack while releasing resources. To make this release logic simple and clean, we could remove the redundant `notifySubpartitionConsumed` from the `NetworkSequenceViewReader` side, and also remove it from the `ResultSubpartitionView` side. The implementation of `ResultSubpartitionView#releaseAllResources` would then notify the parent subpartition of the consumed state via `ResultSubpartition#notifySubpartitionConsumed`, which feeds back to the parent `ResultPartition` layer via `onConsumedSubpartition`. Finally, the `ResultPartition` can decide whether to release itself or not. E.g., in the case of `ReleaseOnConsumptionResultPartition`, which is mainly used for pipelined partitions, the partition is released after the reference counter decreases to 0. In the case of `ResultPartition`, which is generated for blocking partitions by default, the partition is never released after the consumed notification; the JM/scheduler decides when to release it properly. In addition, `InputChannel#notifySubpartitionConsumed` could also be removed as a result of the above. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13325) Add test case for FLINK-13249
[ https://issues.apache.org/jira/browse/FLINK-13325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16893462#comment-16893462 ] zhijiang commented on FLINK-13325: -- Yes, I already reviewed this PR several days ago. Waiting for further feedback from [~srichter]. > Add test case for FLINK-13249 > - > > Key: FLINK-13325 > URL: https://issues.apache.org/jira/browse/FLINK-13325 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.9.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16893427#comment-16893427 ] zhijiang commented on FLINK-13245: -- Thanks for joining this discussion and giving helpful suggestions, [~StephanEwen]. I agree with further refactoring the relevant partition release processes above in release-1.10. Considering the inconsistent situation mentioned above, it actually works correctly by default now, because the reference counter in `ReleaseOnConsumptionResultPartition` is only used for pipelined partitions by default. For blocking partitions, which may be consumed multiple times by creating multiple readers/views, the release should be controlled by the scheduler. But I agree that the above logic is fragile and can easily become inconsistent if not set up correctly. We could further refactor this later. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
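For reference, a compact sketch of the reference-counting scheme discussed here, simplified under the assumption of exactly one consumption notification per subpartition (this is not the actual ReleaseOnConsumptionResultPartition code):

{code}
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a release-on-consumption partition: the counter
// starts at the number of subpartitions and the partition is released
// once every subpartition has reported consumption exactly once.
class ReleaseOnConsumptionSketch {
    private final AtomicInteger pendingConsumptions;

    ReleaseOnConsumptionSketch(int numberOfSubpartitions) {
        this.pendingConsumptions = new AtomicInteger(numberOfSubpartitions);
    }

    void onConsumedSubpartition(int subpartitionIndex) {
        // If one subpartition reports consumption more than once (e.g. a
        // blocking subpartition with multiple readers), the counter reaches
        // zero too early -- exactly the fragility described above.
        if (pendingConsumptions.decrementAndGet() == 0) {
            release();
        }
    }

    private void release() {
        // free buffers / delete the backing file in a real implementation
    }
}
{code}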
[jira] [Commented] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16890630#comment-16890630 ] zhijiang commented on FLINK-13245: -- [~azagrebin] I totally agree with your above point. The first priority should be making the current fine-grained recovery work in release-1.9. As we confirmed before, the consumption notification via the network is just best-effort at the moment and not always reliable, especially when the network connection has not been established by the time the consumer fails. I remember that the JM previously always released partitions while restarting producer tasks; maybe I missed some parts while reviewing the relevant PRs of the partition lifecycle feature. I am sorry for not raising this potential issue in the network stack earlier. We could solve the current issue in FLINK-13771 now, and further address the semantics of partition release and refactor the network behavior, if necessary, in release-1.10. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889022#comment-16889022 ] zhijiang commented on FLINK-13245: -- After confirming the comments from [~Zentol] in the PR, I found that `SlotCountExceedingParallelismTest` would not generate a `ReleaseOnConsumptionResultPartition`, because the partition is of blocking type. So the reference counter is not used in `ResultPartition`, and the files for the bounded blocking partition are eventually released by calling `TaskExecutorGateway#releasePartitions` based on the `RegionPartitionReleaseStrategy`. The description of this JIRA ticket might therefore not be accurate. Running this test locally on macOS, I saw no file leaks after it finished. I am not sure why it leaks files on Windows; I guess it might be related to the internal mmap mechanism of the different operating systems. I will verify this test again on Windows. My PR modifications seem to only cover the case of pipelined partitions using `ReleaseOnConsumptionResultPartition`, where the call to `notifySubpartitionConsumed` eventually makes the reference counter reach 0 and triggers the release. But for pipelined partitions there are no issues with persistent files. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888788#comment-16888788 ] zhijiang commented on FLINK-13245: -- Thanks for these further thoughts; I think the discussion makes things clearer. [~azagrebin] I understand your concerns above. I agree that the current semantics of `CancelPartitionRequest`/`CloseRequest` are not very accurate, because they can indicate either a successful consumption on the consumer side or a consumer task failure/any exception during consumption. Regarding the concern of when to call `notifySubpartitionConsumed`, I think the current implementation is based on whether the producer receives a confirmable notification (Cancel/CloseRequest) from the consumer side. If it receives either message, it calls `notifySubpartitionConsumed`, no matter whether the consumer finished or failed. In the case of handling a channel exception, this happens only locally on the consumer, so `notifySubpartitionConsumed` is not called. Also, in the case of an inactive channel, the producer cannot distinguish whether it was caused by the consumer actively closing the connection or by a TM being lost exceptionally, so it does not call `notifySubpartitionConsumed` either. For the first concern, we could provide more definite messages with clear semantics for successful/failed consumption instead of the current `Cancel/ClosePartition`. For the second concern, we could also consider the proper way of handling messages/exceptions/inactive channels. But one precondition is that we should confirm the specific semantics of releasing a partition based on consumption in the partition management feature. Currently there are three strategies that can release a partition: * partition release based on consumer confirmation via the network * partition release based on JM notification * partition release upon disconnection between TM and JM For the first strategy (partition release based on consumer confirmation): * We could define the network message as `ReleasePartition` instead of the current `Cancel/ClosePartition`. Then it would not need to care whether the consumer finished or failed during consumption. The precondition for this approach is reliable network notification, but we actually have no ack mechanism for such messages in the application layer. Even if the consumer task fails before establishing the connection with the producer, we still need to rely on the JM notification to release the producer's partitions. * We could keep not providing specific semantics, as now; the current strategy is only coupled to the existing mechanism in the network stack. * The semantics could actually be defined clearly as partition release based on one successful consumption. Considering the implementation, this could be done by both consumer notification and JM notification. In general, I think we should consider how to define different release strategies with specific semantics, without worrying about implementations while thinking about the strategy. Any strategy could actually be implemented in multiple ways. E.g., the semantics might be divided into at-least-once consumption, exactly-once consumption, and at-most-once consumption (see the sketch after this message). After we confirm the specific semantics, we will know how to refactor the current network stack to implement a certain strategy. It might be worth further re-architecting the partition release strategy in release-1.10, because it is difficult to extend the current architecture with another strategy for the interactive queries feature. In order not to block the current release, I think the existing modifications solve the file leak issue. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count
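To make the semantics discussion above more concrete, here is a purely exploratory sketch of how such strategies could be captured in an explicit abstraction. This interface does not exist in Flink; it only illustrates the idea of decoupling the semantics from the network-stack implementation:

{code}
// Exploratory sketch: make the release semantics explicit instead of
// coupling them to whatever messages the network stack happens to send.
enum ConsumptionGuarantee {
    AT_MOST_ONCE,   // release on the first consumption attempt
    EXACTLY_ONCE,   // release after one confirmed successful consumption
    AT_LEAST_ONCE   // keep the partition until the scheduler says otherwise
}

interface PartitionReleaseStrategySketch {
    ConsumptionGuarantee guarantee();

    // Called on consumer confirmation via the network.
    void onConsumptionConfirmed(int subpartitionIndex);

    // Called on an explicit release decision from the JM/scheduler.
    void onSchedulerRelease();
}
{code}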
[jira] [Commented] (FLINK-10672) Task stuck while writing output to flink
[ https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16887642#comment-16887642 ] zhijiang commented on FLINK-10672: -- [~ibzib] I think this is a case of back pressure. The producer is blocked in `requestMemorySegment` because the partition data it produces cannot be consumed by the downstream tasks. In order to trace the root issue, the key point is to find the downstream task which triggers the back pressure. E.g., for the topology A --> B --> C --> D, if we find that vertex A is blocked in `requestMemorySegment` for a long time, we can check the next vertex downstream in the topology. If vertex B is also blocked, we continue tracing downstream until we find a vertex which is not blocked any more, say vertex C in this case. Then we further check which specific parallel task in vertex C causes the serious blockage. Such a task is characterized by a very high input queue length and a low or even empty output queue length, and the relevant metrics can help here. Once such a task is found, we can check its stack to confirm where it is stuck. Maybe you can get to the root cause then, or you can provide further findings for us. > Task stuck while writing output to flink > > > Key: FLINK-10672 > URL: https://issues.apache.org/jira/browse/FLINK-10672 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.5.4 > Environment: OS: Debuan rodente 4.17 > Flink version: 1.5.4 > ||Key||Value|| > |jobmanager.heap.mb|1024| > |jobmanager.rpc.address|localhost| > |jobmanager.rpc.port|6123| > |metrics.reporter.jmx.class|org.apache.flink.metrics.jmx.JMXReporter| > |metrics.reporter.jmx.port|9250-9260| > |metrics.reporters|jmx| > |parallelism.default|1| > |rest.port|8081| > |taskmanager.heap.mb|1024| > |taskmanager.numberOfTaskSlots|1| > |web.tmpdir|/tmp/flink-web-bdb73d6c-5b9e-47b5-9ebf-eed0a7c82c26| > > h1. Overview > ||Data Port||All Slots||Free Slots||CPU Cores||Physical Memory||JVM Heap > Size||Flink Managed Memory|| > |43501|1|0|12|62.9 GB|922 MB|642 MB| > h1. Memory > h2. JVM (Heap/Non-Heap) > ||Type||Committed||Used||Maximum|| > |Heap|922 MB|575 MB|922 MB| > |Non-Heap|68.8 MB|64.3 MB|-1 B| > |Total|991 MB|639 MB|922 MB| > h2. Outside JVM > ||Type||Count||Used||Capacity|| > |Direct|3,292|105 MB|105 MB| > |Mapped|0|0 B|0 B| > h1. Network > h2. Memory Segments > ||Type||Count|| > |Available|3,194| > |Total|3,278| > h1. Garbage Collection > ||Collector||Count||Time|| > |G1_Young_Generation|13|336| > |G1_Old_Generation|1|21| >Reporter: Ankur Goenka >Priority: Major > Labels: beam > Attachments: 1uruvakHxBu.png, 3aDKQ24WvKk.png, Po89UGDn58V.png, > jmx_dump.json, jmx_dump_detailed.json, jstack_129827.log, jstack_163822.log, > jstack_66985.log > > > I am running a fairly complex pipleline with 200+ task. > The pipeline works fine with small data (order of 10kb input) but gets stuck > with a slightly larger data (300kb input). > > The task gets stuck while writing the output toFlink, more specifically it > gets stuck while requesting memory segment in local buffer pool. The Task > manager UI shows that it has enough memory and memory segments to work with. 
> The relevant stack trace is > {quote}"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 > tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9] > java.lang.Thread.State: TIMED_WAITING (on object monitor) > at (C/C++) 0x7fef201c7dae (Unknown Source) > at (C/C++) 0x7fef1f2aea07 (Unknown Source) > at (C/C++) 0x7fef1f241cd3 (Unknown Source) > at java.lang.Object.wait(Native Method) > - waiting on <0xf6d56450> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247) > - locked <0xf6d56450> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) > at > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42) > at >
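Since the reporter's configuration (quoted above) exposes metrics over JMX on ports 9250-9260, the in-queue/out-queue inspection suggested in the comment could be scripted along these lines. This is only a sketch: the service URL, the MBean object-name pattern, and the "Value" attribute are assumptions that depend on the Flink version and the configured metric scope formats:
{code:java}
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hedged sketch: dump a task manager's buffer metrics (inputQueueLength,
// outputQueueLength, inPoolUsage, outPoolUsage) from its JMX port to spot
// the subtask with a full input queue but an empty output queue.
public class BackPressureProbe {

    public static void main(String[] args) throws Exception {
        // Port taken from the metrics.reporter.jmx.port range above (assumption).
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9250/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            // Wildcard query for the buffer metrics; the exact domain layout
            // is version dependent (assumption).
            Set<ObjectName> names = mbsc.queryNames(
                    new ObjectName("org.apache.flink.taskmanager.job.task.buffers.*:*"),
                    null);
            for (ObjectName name : names) {
                // Flink's JMXReporter exposes gauges via a "Value" attribute.
                Object value = mbsc.getAttribute(name, "Value");
                System.out.println(name + " = " + value);
            }
        }
    }
}
{code}
A subtask that shows a persistently full input queue while its output queue stays empty is the back-pressure root-cause candidate described in the comment above.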
[jira] [Commented] (FLINK-10672) Task stuck while writing output to flink
[ https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16887201#comment-16887201 ] zhijiang commented on FLINK-10672: -- [~ibzib] Which fix do you mean? In addition, which Flink version were you running when the above issue occurred? I have not looked through this issue yet; I will check the jstack further to confirm whether it has already been solved. > Task stuck while writing output to flink > > > Key: FLINK-10672 > URL: https://issues.apache.org/jira/browse/FLINK-10672 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.5.4 > Environment: OS: Debian rodente 4.17 > Flink version: 1.5.4 > ||Key||Value|| > |jobmanager.heap.mb|1024| > |jobmanager.rpc.address|localhost| > |jobmanager.rpc.port|6123| > |metrics.reporter.jmx.class|org.apache.flink.metrics.jmx.JMXReporter| > |metrics.reporter.jmx.port|9250-9260| > |metrics.reporters|jmx| > |parallelism.default|1| > |rest.port|8081| > |taskmanager.heap.mb|1024| > |taskmanager.numberOfTaskSlots|1| > |web.tmpdir|/tmp/flink-web-bdb73d6c-5b9e-47b5-9ebf-eed0a7c82c26| > > h1. Overview > ||Data Port||All Slots||Free Slots||CPU Cores||Physical Memory||JVM Heap > Size||Flink Managed Memory|| > |43501|1|0|12|62.9 GB|922 MB|642 MB| > h1. Memory > h2. JVM (Heap/Non-Heap) > ||Type||Committed||Used||Maximum|| > |Heap|922 MB|575 MB|922 MB| > |Non-Heap|68.8 MB|64.3 MB|-1 B| > |Total|991 MB|639 MB|922 MB| > h2. Outside JVM > ||Type||Count||Used||Capacity|| > |Direct|3,292|105 MB|105 MB| > |Mapped|0|0 B|0 B| > h1. Network > h2. Memory Segments > ||Type||Count|| > |Available|3,194| > |Total|3,278| > h1. Garbage Collection > ||Collector||Count||Time|| > |G1_Young_Generation|13|336| > |G1_Old_Generation|1|21| >Reporter: Ankur Goenka >Priority: Major > Labels: beam > Attachments: 1uruvakHxBu.png, 3aDKQ24WvKk.png, Po89UGDn58V.png, > jmx_dump.json, jmx_dump_detailed.json, jstack_129827.log, jstack_163822.log, > jstack_66985.log > > > I am running a fairly complex pipeline with 200+ tasks. > The pipeline works fine with small data (order of 10kb input) but gets stuck > with a slightly larger data (300kb input). > > The task gets stuck while writing the output to Flink, more specifically it > gets stuck while requesting memory segment in local buffer pool. The Task > manager UI shows that it has enough memory and memory segments to work with. 
> The relevant stack trace is > {quote}"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 > tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9] > java.lang.Thread.State: TIMED_WAITING (on object monitor) > at (C/C++) 0x7fef201c7dae (Unknown Source) > at (C/C++) 0x7fef1f2aea07 (Unknown Source) > at (C/C++) 0x7fef1f241cd3 (Unknown Source) > at java.lang.Object.wait(Native Method) > - waiting on <0xf6d56450> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247) > - locked <0xf6d56450> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) > at > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42) > at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26) > at > org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230) > - locked <0xf6a60bd0> (a java.lang.Object) > at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81) > at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32) > at > org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139) > at >
[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885832#comment-16885832 ] zhijiang edited comment on FLINK-13245 at 7/16/19 4:16 AM: --- We had some previous assumptions that a ResultSubpartitionView could be released individually, but in fact all the subpartitions are released together via `ResultPartition/ResultPartitionManager`. After thinking it through, it might be reasonable to keep both methods, {{ResultSubpartitionView#notifySubpartitionConsumed}} and {{ResultSubpartitionView#releaseAllResources}}, because they describe different semantics. * `releaseAllResources` is used for releasing resources on the ResultSubpartitionView side. The view is created by the netty stack, which is also responsible for triggering the release. In detail there are two scenarios that trigger the release: one case is the netty channel becoming inactive or hitting an exception, as {{PartitionRequestQueue#channelInactive}} and {{PartitionRequestQueue#exceptionCaught}} currently do. The other case is when the ResultSubpartitionView has actually been consumed, signalled via {{NettyMessage#CancelPartitionRequest}}. * `notifySubpartitionConsumed` only indicates that the ResultSubpartition/View was consumed via {{CancelPartitionRequest}}, so we should call this method while handling the cancel message. A channel exception/inactive event does not always imply consumption, so we should not call this method there, as is currently done in {{PartitionRequestQueue}}. It is up to the {{JobMaster}} whether to release the partition in the case of a channel inactive/exception. For a streaming job, if the consumer fails, the {{JobMaster}} would also cancel the producer task and thereby release the whole {{ResultPartition}}. For a batch job with a blocking partition, if the consumer TM exits and thereby causes the channel to become inactive, the {{ResultPartition}} might not need to be released. Overall, these two methods decouple the release of the {{ResultPartition}} from that of the {{ResultSubpartitionView}}. So it makes sense to keep them as they are, as long as we handle the {{CancelPartitionRequest}} message correctly based on the above modifications. [~azagrebin] was (Author: zjwang): We have some previous assumptions that ResultSubpartitionView could be released individually, but all the subpartitions are released together via `ResultPartition/ResultPartitionManager`. After thinking it through, it might be reasonable to have both methods as {{ResultSubpartitionView#notifySubpartitionConsumed}} and {{ResultSubpartitionView#releaseAllResources}}, because they describe the different semantics. * `releaseAllResources` is used for releasing resources from ResultSubpartitionView aspect. The view is created by netty stack which is also responsible for triggering the release. In detail it has two scenarios to trigger release: One case is that netty channel inactive/exception as current {{PartitonRequestQueue#channelInactive}} and {{PartitionRequestQueue#exceptionCaught}} done. The other case is that when ResultSubpartitionView is actually consumed via {{NettyMessage#CannelPartitionRequest}}. * `notifySubpartitionConsumed` only indicates the ResultSubpartition/View consumed via {{CannelPartitionRequest}}, so we should call this method while handling the cancel message. For the case of channel exception/inactive, it does not always indicate the consumption semantic, so we should not call this method as current done in {{PartitionRequestQueue}}. It is up to {{JobMaster}} whether to release partition in the case of channel inactive/exception. 
For the streaming job if the consumer fails, the {{JobMaster}} would also cancel the producer task to release the whole {{ResultPartition}}. For the batch job of blocking partition, if the consumer TM exits to cause channel inactive, the {{ResultPartition}} might not need to be released. Overall, these two methods seem to decouple the release between {{ResultPartition}} and {{ResultSubpartitionView}}. So it makes sense to keep them as now, as long as we could handle the {{CannelPartitionRequest}} message correctly based on above modifications. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a >
[jira] [Commented] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885832#comment-16885832 ] zhijiang commented on FLINK-13245: -- We had some previous assumptions that a ResultSubpartitionView could be released individually, but in fact all the subpartitions are released together via `ResultPartition/ResultPartitionManager`. After thinking it through, it might be reasonable to keep both methods, {{ResultSubpartitionView#notifySubpartitionConsumed}} and {{ResultSubpartitionView#releaseAllResources}}, because they describe different semantics. * `releaseAllResources` is used for releasing resources on the ResultSubpartitionView side. The view is created by the netty stack, which is also responsible for triggering the release. In detail there are two scenarios that trigger the release: one case is the netty channel becoming inactive or hitting an exception, as {{PartitionRequestQueue#channelInactive}} and {{PartitionRequestQueue#exceptionCaught}} currently do. The other case is when the ResultSubpartitionView has actually been consumed, signalled via {{NettyMessage#CancelPartitionRequest}}. * `notifySubpartitionConsumed` only indicates that the ResultSubpartition/View was consumed via {{CancelPartitionRequest}}, so we should call this method while handling the cancel message. A channel exception/inactive event does not always imply consumption, so we should not call this method there, as is currently done in {{PartitionRequestQueue}}. It is up to the {{JobMaster}} whether to release the partition in the case of a channel inactive/exception. For a streaming job, if the consumer fails, the {{JobMaster}} would also cancel the producer task and thereby release the whole {{ResultPartition}}. For a batch job with a blocking partition, if the consumer TM exits and thereby causes the channel to become inactive, the {{ResultPartition}} might not need to be released. Overall, these two methods decouple the release of the {{ResultPartition}} from that of the {{ResultSubpartitionView}}. So it makes sense to keep them as they are, as long as we handle the {{CancelPartitionRequest}} message correctly based on the above modifications. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
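The two release paths distinguished in the comments above can be summarized in interface form. This is a simplified rendering for illustration only; the real {{ResultSubpartitionView}} interface in Flink carries more methods than shown here:
{code:java}
// Simplified rendering of the two release-related methods discussed above.
// The real ResultSubpartitionView interface has additional methods; this
// sketch only captures the semantic distinction between the two calls.
interface SubpartitionViewReleaseContract {

    /**
     * Resource cleanup on the view side. Triggered by the netty stack, either
     * when the channel becomes inactive / hits an exception, or when the
     * consumer cancels the partition request.
     */
    void releaseAllResources();

    /**
     * Consumption notification. Only meaningful when the consumer really
     * finished (or canceled) consuming, i.e. on CancelPartitionRequest; a
     * channel failure alone must not trigger it, because in that case the
     * JobMaster decides whether the partition may be released.
     */
    void notifySubpartitionConsumed();
}
{code}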
[jira] [Issue Comment Deleted] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13245: - Comment: was deleted (was: [~azagrebin] I have not seen your latest comments before submitting above. I agree with your above two points. It is not consistent to remove the reader from `allReaders` if it is not in `availableReaders` for canceled partition. If we do not remove it from `allReader`, it still could be released while handling {{CloseRequest}} from last {{RemoteInputChannel}}. I think the above modifications could solve this issue based on existing mechanism. For the second problem I also ever found it seems a bit strange to release partition via circle call/dependency. That means network notification --> ResultPartition --> ResultPartitionManager --> ResultPartition --> {{ResultSubpartition}}. And I also considered integrating these methods of `releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we should do this refactoring at this point in release-1.9. I think it might be safe and proper to do this refactoring in next version 1.10. At this point we could fix the potential file leak to reuse the previous way and make less work.) > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885396#comment-16885396 ] zhijiang edited comment on FLINK-13245 at 7/15/19 4:47 PM: --- [~azagrebin] I have not seen your latest comments before submitting above. I agree with your above two points. It is not consistent to remove the reader from `allReaders` if it is not in `availableReaders` for canceled partition. If we do not remove it from `allReader`, it still could be released while handling {{CloseRequest}} from last {{RemoteInputChannel}}. I think the above modifications could solve this issue based on existing mechanism. For the second problem I also ever found it seems a bit strange to release partition via circle call/dependency. That means network notification --> ResultPartition --> ResultPartitionManager --> ResultPartition --> {{ResultSubpartition}}. And I also considered integrating these methods of `releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we should do this refactoring at this point in release-1.9. I think it might be safe and proper to do this refactoring in next version 1.10. At this point we could fix the potential file leak to reuse the previous way and make less work. was (Author: zjwang): [~azagrebin] I have not seen your latest comments before submitting above. I agree with your above two points. It is not consistent to remove the reader from `allReaders` if it is not in `availableReaders` for canceled partition. If we do not remove it from `allReader`, it still could be released while handling {{CloseRequest}} from last {{RemoteInputChannel}}. I think the above modifications could solve this issue based on existing mechanism. For the second problem I also ever found it seems a bit strange to release partition via circle call/dependency. That means network notification -> ResultPartition -> {{ResultPartitionManager}} -> ResultPartition -> {{ResultSubpartition}}. And I also considered integrating these methods of `releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we should do this refactoring at this point in release-1.9. I think it might be safe and proper to do this refactoring in next version 1.10. At this point we could fix the potential file leak to reuse the previous way and make less work. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. 
> It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885396#comment-16885396 ] zhijiang edited comment on FLINK-13245 at 7/15/19 4:46 PM: --- [~azagrebin] I have not seen your latest comments before submitting above. I agree with your above two points. It is not consistent to remove the reader from `allReaders` if it is not in `availableReaders` for canceled partition. If we do not remove it from `allReader`, it still could be released while handling {{CloseRequest}} from last {{RemoteInputChannel}}. I think the above modifications could solve this issue based on existing mechanism. For the second problem I also ever found it seems a bit strange to release partition via circle call/dependency. That means network notification -> ResultPartition -> {{ResultPartitionManager}} -> ResultPartition -> {{ResultSubpartition}}. And I also considered integrating these methods of `releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we should do this refactoring at this point in release-1.9. I think it might be safe and proper to do this refactoring in next version 1.10. At this point we could fix the potential file leak to reuse the previous way and make less work. was (Author: zjwang): [~azagrebin] I have not seen your latest comments before submitting above. I agree with your above two points. It is not consistent to remove the reader from `allReaders` if it is not in `availableReaders` for canceled partition. If we do not remove it from `allReader`, it still could be released while handling {{CloseRequest}} from last {{RemoteInputChannel}}. I think the above modifications could solve this issue based on existing mechanism. For the second problem I also ever found it seems a bit strange to release partition via circle call/dependency. That means network notification -> {{ResultPartition}} -> {{ResultPartitionManager}} -> {{ResultPartition}} -> {{ResultSubpartition}}. And I also considered integrating these methods of `releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we should do this refactoring at this point in release-1.9. I think it might be safe and proper to do this refactoring in next version 1.10. At this point we could fix the potential file leak to reuse the previous way and make less work. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. 
> It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885396#comment-16885396 ] zhijiang edited comment on FLINK-13245 at 7/15/19 4:44 PM: --- [~azagrebin] I have not seen your latest comments before submitting above. I agree with your above two points. It is not consistent to remove the reader from `allReaders` if it is not in `availableReaders` for canceled partition. If we do not remove it from `allReader`, it still could be released while handling {{CloseRequest}} from last {{RemoteInputChannel}}. I think the above modifications could solve this issue based on existing mechanism. For the second problem I also ever found it seems a bit strange to release partition via circle call/dependency. That means network notification -> {{ResultPartition}} -> {{ResultPartitionManager}} -> {{ResultPartition}} -> {{ResultSubpartition}}. And I also considered integrating these methods of `releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we should do this refactoring at this point in release-1.9. I think it might be safe and proper to do this refactoring in next version 1.10. At this point we could fix the potential file leak to reuse the previous way and make less work. was (Author: zjwang): [~azagrebin] I have not seen your latest comments before submitting above. I agree with your above two points. It is not consistent to remove the reader from `allReaders` if it is not in `availableReaders` for canceled partition. If we do not remove it from `allReader`, it still could be released while handling `CloseRequest` from `RemoteInputChannel`. I think the above modifications could solve this issue based on existing mechanism. For the second problem I also ever found it seems a bit strange to release partition via circle call/dependency. That means network notification -> ResultPartition- > ResultPartitionManager -> ResultPartition -> ResultSubpartition. And I also considered integrating these methods of `releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we should do this refactoring at this point in release-1.9. I think it might be safe and proper to do this refactoring in next version 1.10. At this point we could fix the potential file leak to reuse the previous way and make less work. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. 
> It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885396#comment-16885396 ] zhijiang edited comment on FLINK-13245 at 7/15/19 4:43 PM: --- [~azagrebin] I have not seen your latest comments before submitting above. I agree with your above two points. It is not consistent to remove the reader from `allReaders` if it is not in `availableReaders` for canceled partition. If we do not remove it from `allReader`, it still could be released while handling `CloseRequest` from `RemoteInputChannel`. I think the above modifications could solve this issue based on existing mechanism. For the second problem I also ever found it seems a bit strange to release partition via circle call/dependency. That means network notification -> ResultPartition- > ResultPartitionManager -> ResultPartition -> ResultSubpartition. And I also considered integrating these methods of `releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we should do this refactoring at this point in release-1.9. I think it might be safe and proper to do this refactoring in next version 1.10. At this point we could fix the potential file leak to reuse the previous way and make less work. was (Author: zjwang): [~azagrebin] I have not seen your latest comments before I submitting above. I agree with your above two points. It is not consistent to remove the reader from `allReaders` if it is not in `availableReaders` for canceled partition. If we do not remove it from `allReader`, it still could be released while handling `CloseRequest` from `RemoteInputChannel`. I think the above modifications could solve this issue based on existing mechanism. For the second problem I also ever found it seems a bit strange to release partition via circle call/dependency. That means network notification -> ResultPartition -> ResultPartitionManager -> ResultPartition -> ResultSubpartition. And I also considered integrating these methods of `releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we should do this refactoring at this point in release-1.9. I think it might be safe and proper to do this refactoring in next version 1.10. At this point we could fix the potential file leak to reuse the previous way and make less work. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. 
> It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885396#comment-16885396 ] zhijiang commented on FLINK-13245: -- [~azagrebin] I had not seen your latest comments before submitting the above. I agree with your two points. It is not consistent to remove the reader from `allReaders` if it is not in `availableReaders` for a canceled partition. Even if we do not remove it from `allReaders`, it can still be released while handling the `CloseRequest` from the last `RemoteInputChannel`. I think the above modifications could solve this issue based on the existing mechanism. Regarding the second problem, I had also found it a bit strange to release the partition via a cyclic call/dependency, i.e. network notification -> ResultPartition -> ResultPartitionManager -> ResultPartition -> ResultSubpartition. And I also considered merging the `releaseAllResources` and `notifySubpartitionConsumed` methods. But I am not sure we should do this refactoring at this point in release-1.9; it might be safer and more proper to do it in the next version, 1.10. For now we could fix the potential file leak by reusing the previous mechanism, which keeps the change small. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885371#comment-16885371 ] zhijiang commented on FLINK-13245: -- Thanks for finding this potential issue and for the investigation! [~Zentol] [~azagrebin] I think the idea behind the above modifications makes sense: since `availableReaders` is not always equivalent to `allReaders`, it is proper to look up the canceled view reader in `allReaders` instead. This issue also existed in the previous {{SpillableSubpartition}}, which actually uses the in-memory path in {{SlotCountExceedingParallelismTest}}, so we could not find this potential bug back then. In detail, we should also call `toRelease.notifySubpartitionConsumed` before calling `toRelease.releaseAllResources` in the above modifications (see the sketch below). Otherwise the reference counter in {{ReleaseOnConsumptionResultPartition}} would never decrease to zero and the partition would never really be released via {{ResultPartitionManager}}. I will submit the PR and add some unit tests tomorrow. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
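Taken together, the proposed cancel handling would look roughly like the sketch below. The types are simplified stand-ins for Flink's reader classes (an assumption for illustration), so treat this as a sketch of the call ordering rather than the actual patch:
{code:java}
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposed CancelPartitionRequest handling: look the reader up
// in allReaders (not only availableReaders), and fire the consumption
// notification before releasing, so the partition-level reference counter in
// ReleaseOnConsumptionResultPartition can reach zero. Types are stand-ins.
class CancelHandlingSketch {

    interface ViewReader {
        String receiverId();
        void notifySubpartitionConsumed();
        void releaseAllResources();
    }

    // All registered readers, not only the currently available (credit-backed) ones.
    private final List<ViewReader> allReaders = new ArrayList<>();

    void onCancelPartitionRequest(String receiverId) {
        ViewReader toRelease = null;
        for (ViewReader reader : allReaders) {
            if (reader.receiverId().equals(receiverId)) {
                toRelease = reader;
                break;
            }
        }
        if (toRelease != null) {
            // Consumption first, so the ref-count can actually reach zero ...
            toRelease.notifySubpartitionConsumed();
            // ... then free the view-side resources.
            toRelease.releaseAllResources();
            allReaders.remove(toRelease);
        }
    }
}
{code}
With the notification in place, the last cancel message drives the counter to zero, and the partition (including any spilled {{.channel}} files) can really be freed.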
[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885371#comment-16885371 ] zhijiang edited comment on FLINK-13245 at 7/15/19 4:22 PM: --- Thanks for finding this potential issue and the investigation! [~Zentol] [~azagrebin] I think the idea of above modifications makes sense, because the `availableReader` is not always equivalent to `allReaders`, then it is proper to find the canceled view reader from `allReaders` instead. This issue also exists in previous {{SpillableSubpartition}} which actually uses memory type in {{SlotCountExceedingParallelismTest,}} so we could not find this potential bug then. In detail, we should also call `toRelease.notifySubpartitionConsumed` before calling `toRelease.releaseAllResources` in above modifications. Otherwise the reference counter in {{ReleaseOnConsumptionResultPartition}} would not decrease to zero and really release partition via {{ResultPartitionManager}}. I would submit the PR and add some unite tests later tomorrow. was (Author: zjwang): Thanks for finding this potential issue and the investigation! [~Zentol] [~azagrebin] I think the above idea of above modifications makes sense, because the `availableReader` is not always equivalent to `allReaders`, then it is proper to find the canceled view reader from `allReaders` instead. This issue also exists in previous {{SpillableSubpartition}} which actually uses memory type in {{SlotCountExceedingParallelismTest,}} so we could not find this potential bug then. In detail, we should also call `toRelease.notifySubpartitionConsumed` before calling `toRelease.releaseAllResources` in above modifications. Otherwise the reference counter in {{ReleaseOnConsumptionResultPartition}} would not decrease to zero and really release partition via {{ResultPartitionManager}}. I would submit the PR and add some unite tests later tomorrow. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Issue Comment Deleted] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13245: - Comment: was deleted (was: Thanks for finding this potential issue and the investigation! [~Zentol] [~azagrebin] After reviewing the relevant codes, it actually has two issues here: * Considering handling the {{CancelPartitionRequest}} issue, I guess we might have two assumptions before. One assumption is that `availableReaders` is always equivalent to `allReaders`, but now this assumption is not right because of credit. The other assumption is that we make the logic of {{CancelPartitionRequest}} as best-effort way, because the last {{RemoteInputChannel}} would send {{CloseRequest}} message, then the {{PartitionRequestQueue}} would always release all the view readers in `allReaders` in final. The details are in {{PartitionRequestQueue#close}}. If so, it seems no problem even though the previous {{CancelPartitionRequest}} does not work sometimes. But I think it would be more proper/strict if we handle the {{CancelPartitionRequest}} via `allReaders` instead. * Another root problem is that the {{ReleaseOnConsumptionResultPartition#onConsumedSubpartition}} is not working for the implementation of {{BoundedBlockingSubpartition}}. {{onConsumedSubpartition}} would be triggered from consumer notification via network, but it is only feasible for {{RemoteInputChannel}}. For the case of {{LocalInputChannel}}, it would call {{ResultSubpartitionView#releaseAllResources}} directly. So in the {{SlotCountExceedingParallelismTest}} there are some local channels which cause the reference counter in {{ReleaseOnConsumptionResultPartition}} would never decrease to 0, then it would never release the {{BoundedBlockingSubpartition}}. But in the process of {{BoundedBlockingSubpartition#releaseReaderReference}} it would check the `isReleased` tag before deleting the file. So it has the conflict here. In fact this issue already exists in previous {{SpillableSubpartition}} , but in the test the spillable subpartition would always use the memory way then it hides the potential problem.) > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Issue Comment Deleted] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13245: - Comment: was deleted (was: For the first issue, I think it is not very important currently, because we also has the {{PartitionRequestQueue#close}} to release all the view readers. For the second issue, it needs to fixed and I think we should still stick to the way of releasing partition in whole via {{ResultPartitionManager}}, that means we could not release one subpartition if the reference counter is not becoming zero in {{ReleaseOnConsumptionResultPartition}}. In detail: * In the process of {{BoundedBlockingSubpartitionReader#releaseAllResources}}, we should also call the {{parent.onConsumedSubpartition()}} as {{PipelinedSubpartitionView}} done in order to notify the {{ReleaseOnConsumptionResultPartition}} to decrease the reference counter. [~Zentol] [~azagrebin] [~StephanEwen]) > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885280#comment-16885280 ] zhijiang commented on FLINK-13245: -- For the first issue, I think it is not very important currently, because we also have {{PartitionRequestQueue#close}} to release all the view readers. The second issue needs to be fixed, and I think we should still stick to releasing the partition as a whole via {{ResultPartitionManager}}; that means we must not release a single subpartition as long as the reference counter in {{ReleaseOnConsumptionResultPartition}} has not reached zero. In detail: * In {{BoundedBlockingSubpartitionReader#releaseAllResources}}, we should also call {{parent.onConsumedSubpartition()}}, as {{PipelinedSubpartitionView}} does, in order to notify the {{ReleaseOnConsumptionResultPartition}} to decrease the reference counter (see the sketch below). [~Zentol] [~azagrebin] [~StephanEwen] > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
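The bullet point above pins down the fix location; a condensed sketch of it follows. The types are simplified stand-ins assumed for illustration, and the actual reader has more state to clean up:
{code:java}
// Sketch of the fix proposed above: the blocking reader's release path should
// notify the parent partition of consumption, mirroring what
// PipelinedSubpartitionView already does. Types are simplified stand-ins.
class BoundedBlockingReaderSketch {

    interface ParentPartition {
        void onConsumedSubpartition(int subpartitionIndex);
    }

    private final ParentPartition parent;
    private final int subpartitionIndex;
    private boolean released;

    BoundedBlockingReaderSketch(ParentPartition parent, int subpartitionIndex) {
        this.parent = parent;
        this.subpartitionIndex = subpartitionIndex;
    }

    void releaseAllResources() {
        if (released) {
            return;
        }
        released = true;
        // close this reader's buffers / file handle here ...

        // The missing piece: also report this reader as consumed, so the
        // ReleaseOnConsumptionResultPartition ref-count can reach zero even
        // when the consumer is a LocalInputChannel (no network notification).
        parent.onConsumedSubpartition(subpartitionIndex);
    }
}
{code}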
[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885280#comment-16885280 ] zhijiang edited comment on FLINK-13245 at 7/15/19 2:27 PM: --- For the first issue, I think it is not very important currently, because we also has the {{PartitionRequestQueue#close}} to release all the view readers. For the second issue, it needs to fixed and I think we should still stick to the way of releasing partition in whole via {{ResultPartitionManager}}, that means we could not release one subpartition if the reference counter is not becoming zero in {{ReleaseOnConsumptionResultPartition}}. In detail: * In the process of {{BoundedBlockingSubpartitionReader#releaseAllResources}}, we should also call the {{parent.onConsumedSubpartition()}} as {{PipelinedSubpartitionView}} done in order to notify the {{ReleaseOnConsumptionResultPartition}} to decrease the reference counter. [~Zentol] [~azagrebin] [~StephanEwen] was (Author: zjwang): For the first issue, I think it is not very important currently, because we also has the {{PartitionRequestQueue#close }}to release all the view readers. For the second issue, it needs to fixed and I think we should still stick to the way of releasing partition in whole via {{ResultPartitionManager}}, that means we could not release one subpartition if the reference counter is not becoming zero in {{ReleaseOnConsumptionResultPartition}}. In detail: * In the process of {{BoundedBlockingSubpartitionReader#releaseAllResources}}, we should also call the {{parent.onConsumedSubpartition()}} as {{PipelinedSubpartitionView}} done in order to notify the {{ReleaseOnConsumptionResultPartition}} to decrease the reference counter. [~Zentol] [~azagrebin] [~StephanEwen] > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885251#comment-16885251 ] zhijiang commented on FLINK-13245: -- Thanks for finding this potential issue and for the investigation! [~Zentol] [~azagrebin] After reviewing the relevant code, there are actually two issues here: * Concerning the handling of the {{CancelPartitionRequest}}, I guess we made two assumptions before. One assumption is that `availableReaders` is always equivalent to `allReaders`, but this no longer holds because of credit-based flow control. The other assumption is that the {{CancelPartitionRequest}} logic is best-effort, because the last {{RemoteInputChannel}} would send a {{CloseRequest}} message, upon which the {{PartitionRequestQueue}} would always release all the view readers in `allReaders` in the end (the details are in {{PartitionRequestQueue#close}}). If so, it seems harmless even though the previous {{CancelPartitionRequest}} handling sometimes does not work. Still, I think it would be more proper/strict to handle the {{CancelPartitionRequest}} via `allReaders` instead. * Another root problem is that {{ReleaseOnConsumptionResultPartition#onConsumedSubpartition}} does not work for the {{BoundedBlockingSubpartition}} implementation. {{onConsumedSubpartition}} is triggered by the consumer's notification via the network, but that only happens for a {{RemoteInputChannel}}. A {{LocalInputChannel}} calls {{ResultSubpartitionView#releaseAllResources}} directly. So in the {{SlotCountExceedingParallelismTest}} there are some local channels which cause the reference counter in {{ReleaseOnConsumptionResultPartition}} never to decrease to 0, and hence the {{BoundedBlockingSubpartition}} is never released. Yet {{BoundedBlockingSubpartition#releaseReaderReference}} checks the `isReleased` flag before deleting the file, so there is a conflict here (see the ref-count sketch below). In fact this issue already existed in the previous {{SpillableSubpartition}}, but the test always hits the in-memory path of the spillable subpartition, which hid the potential problem. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
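For reference, the partition-level consumption counting that, per the analysis above, never reaches zero works roughly as follows. This is a condensed sketch, not the actual {{ReleaseOnConsumptionResultPartition}} code:
{code:java}
import java.util.concurrent.atomic.AtomicInteger;

// Condensed sketch of the consumption counting described above: the partition
// starts with one pending consumption per subpartition and releases itself
// only once every subpartition has reported consumption. If LocalInputChannel
// consumers never report, the counter stays above zero and nothing is freed.
class ReleaseOnConsumptionSketch {

    private final AtomicInteger pendingConsumptions;

    ReleaseOnConsumptionSketch(int numberOfSubpartitions) {
        this.pendingConsumptions = new AtomicInteger(numberOfSubpartitions);
    }

    void onConsumedSubpartition(int subpartitionIndex) {
        if (pendingConsumptions.decrementAndGet() == 0) {
            releasePartition(); // delete spilled .channel files etc.
        }
    }

    private void releasePartition() {
        // in real code this hands the release over to the ResultPartitionManager
    }
}
{code}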
[jira] [Issue Comment Deleted] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13245: - Comment: was deleted (was: Thanks for finding this potential issue and the investigation! [~Zentol] [~azagrebin] After reviewing the relevant codes, it actually has two issues here: * Considering handling the {{CancelPartitionRequest}} issue, I guess we might have two assumptions before. One assumption is that `availableReaders` is always equivalent to `allReaders`, but now this assumption is not right because of credit. The other assumption is that we make the logic of {{CancelPartitionRequest}} as best-effort way, because the last {{RemoteInputChannel}} would send {{CloseRequest}} message, then the {{PartitionRequestQueue}} would always release all the view readers in `allReaders` in final. The details are in {{PartitionRequestQueue#close}}. If so, it seems no problem even though the previous {{CancelPartitionRequest}} does not work sometimes. But I think it would be more proper/strict if we handle the {{CancelPartitionRequest}} via `allReaders` instead. * Another root problem is that the {{ReleaseOnConsumptionResultPartition#onConsumedSubpartition}} is not working for the implementation of {{BoundedBlockingSubpartition}}. {{onConsumedSubpartition}} would be triggered from consumer notification via network, but it is only feasible for {{RemoteInputChannel}}. For the case of {{LocalInputChannel}}, it would call {{ResultSubpartitionView#releaseAllResources}} directly. So in the {{SlotCountExceedingParallelismTest}} there are some local channels which cause the reference counter in {{ReleaseOnConsumptionResultPartition}} would never decrease to 0, then it would never release the {{BoundedBlockingSubpartition}}. But in the process of {{BoundedBlockingSubpartition#releaseReaderReference}} it would check the `isReleased` tag before deleting the file. So it has the conflict here. In fact this issue already exists in previous {{SpillableSubpartition}} , but in the test the spillable subpartition would always use the memory way then it hides the potential problem.) > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885241#comment-16885241 ] zhijiang edited comment on FLINK-13245 at 7/15/19 1:46 PM: --- Thanks for finding this potential issue and the investigation! [~Zentol] [~azagrebin] After reviewing the relevant codes, it actually has two issues here: * Considering handling the {{CancelPartitionRequest}} issue, I guess we might have two assumptions before. One assumption is that `availableReaders` is always equivalent to `allReaders`, but now this assumption is not right because of credit. The other assumption is that we make the logic of {{CancelPartitionRequest}} as best-effort way, because the last {{RemoteInputChannel}} would send {{CloseRequest}} message, then the {{PartitionRequestQueue}} would always release all the view readers in `allReaders` in final. The details are in {{PartitionRequestQueue#close}}. If so, it seems no problem even though the previous {{CancelPartitionRequest}} does not work sometimes. But I think it would be more proper/strict if we handle the {{CancelPartitionRequest}} via `allReaders` instead. * Another root problem is that the {{ReleaseOnConsumptionResultPartition#onConsumedSubpartition}} is not working for the implementation of {{BoundedBlockingSubpartition}}. {{onConsumedSubpartition}} would be triggered from consumer notification via network, but it is only feasible for {{RemoteInputChannel}}. For the case of {{LocalInputChannel}}, it would call {{ResultSubpartitionView#releaseAllResources}} directly. So in the {{SlotCountExceedingParallelismTest}} there are some local channels which cause the reference counter in {{ReleaseOnConsumptionResultPartition}} would never decrease to 0, then it would never release the {{BoundedBlockingSubpartition}}. But in the process of {{BoundedBlockingSubpartition#releaseReaderReference}} it would check the `isReleased` tag before deleting the file. So it has the conflict here. In fact this issue already exists in previous {{SpillableSubpartition}} , but in the test the spillable subpartition would always use the memory way then it hides the potential problem. was (Author: zjwang): Thanks for finding this potential issue and the investigation! [~Zentol] [~azagrebin] After reviewing the relevant codes, it actually has two issues here: * Considering handling the {{CancelPartitionRequest}} issue, I guess we might have two assumptions before. One assumption is that `availableReaders` is always equivalent to `allReaders`, but now this assumption is not right because of credit. The other assumption is that we make the logic of {{CancelPartitionRequest}} as best-effort way, because the last {{RemoteInputChannel}} would send {{CloseRequest}} message, then the {{PartitionRequestQueue}} would always release all the view readers in `allReaders` in final. The details are in {{PartitionRequestQueue#close}}. If so, it seems no problem even though the previous {{CancelPartitionRequest}} does not work sometimes. But I think it would be more proper/strict if we handle the {{CancelPartitionRequest}} via `allReaders` instead. * Another root problem is that the {{ReleaseOnConsumptionResultPartition#onConsumedSubpartition}} is not working for the implementation of {{BoundedBlockingSubpartition}}. {{onConsumedSubpartition}} would be triggered from consumer notification via network, but it is only feasible for {{RemoteInputChannel}}. 
For the case of {{LocalInputChannel}}, it would call {{ResultSubpartitionView#releaseAllResources}} directly. So in the {{SlotCountExceedingParallelismTest}} there are some local channels which cause the reference counter in {{ReleaseOnConsumptionResultPartition}} would never decrease to 0, then it would never release the {{BoundedBlockingSubpartition}}. But in the process of {{BoundedBlockingSubpartition#releaseReaderReference}} it would check the `isReleased` tag before deleting the file. So it has the conflict here. In fact, this issue already exists in previous {{SpillableSubpartition}} , but in the test the spillable subpartition would always use the memory way then it hides the potential problem. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've
[jira] [Commented] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885241#comment-16885241 ] zhijiang commented on FLINK-13245: -- Thanks for finding this potential issue and the investigation! [~Zentol] [~azagrebin] After reviewing the relevant codes, it actually has two issues here: * Considering handling the {{CancelPartitionRequest}} issue, I guess we might have two assumptions before. One assumption is that `availableReaders` is always equivalent to `allReaders`, but now this assumption is not right because of credit. The other assumption is that we make the logic of {{CancelPartitionRequest}} as best-effort way, because the last {{RemoteInputChannel}} would send {{CloseRequest}} message, then the {{PartitionRequestQueue}} would always release all the view readers in `allReaders` in final. The details are in {{PartitionRequestQueue#close}}. If so, it seems no problem even though the previous {{CancelPartitionRequest}} does not work sometimes. But I think it would be more proper/strict if we handle the {{CancelPartitionRequest}} via `allReaders` instead. * Another root problem is that the {{ReleaseOnConsumptionResultPartition#onConsumedSubpartition}} is not working for the implementation of {{BoundedBlockingSubpartition}}. {{onConsumedSubpartition}} would be triggered from consumer notification via network, but it is only feasible for {{RemoteInputChannel}}. For the case of {{LocalInputChannel}}, it would call {{ResultSubpartitionView#releaseAllResources}} directly. So in the {{SlotCountExceedingParallelismTest}} there are some local channels which cause the reference counter in {{ReleaseOnConsumptionResultPartition}} would never decrease to 0, then it would never release the {{BoundedBlockingSubpartition}}. But in the process of {{BoundedBlockingSubpartition#releaseReaderReference}} it would check the `isReleased` tag before deleting the file. So it has the conflict here. In fact, this issue already exists in previous {{SpillableSubpartition}} , but in the test the spillable subpartition would always use the memory way then it hides the potential problem. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
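To make the second problem easier to follow, below is a compact sketch of the two release paths that fail to meet: the network-driven ref-count decrement and the reader-side file deletion guarded by the {{isReleased}} tag. The fields and logic are heavily simplified assumptions for illustration only.

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the producer-side release logic described above; names echo the
// real classes but fields and behavior are simplified assumptions.
class ReleaseOnConsumptionPartitionSketch {
    private final BoundedBlockingSubpartitionSketch[] subpartitions;
    private final AtomicInteger pendingConsumptions;

    ReleaseOnConsumptionPartitionSketch(BoundedBlockingSubpartitionSketch[] subpartitions) {
        this.subpartitions = subpartitions;
        this.pendingConsumptions = new AtomicInteger(subpartitions.length);
    }

    // Only remote channels reach this via a network notification; a local
    // channel calls releaseAllResources() on its view directly, so with
    // mixed local/remote consumers the counter never reaches zero.
    void onConsumedSubpartition(int subpartitionIndex) {
        if (pendingConsumptions.decrementAndGet() == 0) {
            for (BoundedBlockingSubpartitionSketch s : subpartitions) {
                s.release(); // sets the isReleased tag
            }
        }
    }
}

class BoundedBlockingSubpartitionSketch {
    private volatile boolean isReleased;
    private int readerCount = 1;

    void release() {
        isReleased = true;
    }

    // Reader-side cleanup: the backing file is only deleted if the producer
    // already released the partition, which is exactly the conflict above.
    synchronized void releaseReaderReference() {
        if (--readerCount == 0 && isReleased) {
            deleteBackingFile();
        }
    }

    private void deleteBackingFile() {
        // delete the .channel file on disk (omitted)
    }
}
{code}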
[jira] [Issue Comment Deleted] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13245: - Comment: was deleted (was: Thanks for finding this potential issue and the investigation! [~Zentol] [~azagrebin] After reviewing the relevant codes, it actually has two issues here: * Considering handling the `CancelPartitionRequest` issue, I guess we might have two assumptions before. One assumption is that `availableReaders` is always equivalent to `allReaders`, but now this assumption is not right because of credit. The other assumption is that we make the logic of `CancelPartitionRequest` as best-effort way, because the last `RemoteInputChannel` would send `CloseRequest` message, then the `PartitionRequestQueue` would always release all the view readers in `allReaders` in final. The details are in `PartitionRequestQueue#close`. If so, it seems no problem even though the previous `CancelPartitionRequest` does not work. But I think it would be more proper/strict if we handle the ` CancelPartitionRequest` via `allReaders` instead. * Another critical problem is that the notification from consumer side of releasing subpartition is not actually making sense for the implementation of `BoundedBlockingSubpartition`, because it would check `isReleased` tag during `releaseReaderReference`, and this tag could only be changed from producer call. In other words, for blocking case when to release partition is determined by JobMaster based on PartitionReleaseStrategy. So even though we solve the above first issue, the file for blocking partition still could not be deleted after release notification from network. In addition, this issue is actually hidden in previous `SpillableSubpartition` because in this test the partition is only using the memory type not spilled file.) > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885076#comment-16885076 ] zhijiang edited comment on FLINK-13245 at 7/15/19 10:46 AM: Thanks for finding this potential issue and the investigation! [~Zentol] [~azagrebin] After reviewing the relevant codes, it actually has two issues here: * Considering handling the `CancelPartitionRequest` issue, I guess we might have two assumptions before. One assumption is that `availableReaders` is always equivalent to `allReaders`, but now this assumption is not right because of credit. The other assumption is that we make the logic of `CancelPartitionRequest` as best-effort way, because the last `RemoteInputChannel` would send `CloseRequest` message, then the `PartitionRequestQueue` would always release all the view readers in `allReaders` in final. The details are in `PartitionRequestQueue#close`. If so, it seems no problem even though the previous `CancelPartitionRequest` does not work. But I think it would be more proper/strict if we handle the ` CancelPartitionRequest` via `allReaders` instead. * Another critical problem is that the notification from consumer side of releasing subpartition is not actually making sense for the implementation of `BoundedBlockingSubpartition`, because it would check `isReleased` tag during `releaseReaderReference`, and this tag could only be changed from producer call. In other words, for blocking case when to release partition is determined by JobMaster based on PartitionReleaseStrategy. So even though we solve the above first issue, the file for blocking partition still could not be deleted after release notification from network. In addition, this issue is actually hidden in previous `SpillableSubpartition` because in this test the partition is only using the memory type not spilled file. was (Author: zjwang): Thanks for finding this potential issue and the investigation! [~Zentol] [~azagrebin] After reviewing the relevant codes, it actually has two issues here: * Considering handling the `CancelPartitionRequest` issue, I guess we might have two assumptions before. One assumption is that `availableReaders` is always equivalent to `allReaders`, but now this assumption is not right because of credit. The other assumption is that we make the logic of `CancelPartitionRequest` as best-effort way, because the last `RemoteInputChannel` would send `CloseRequest` message, then the `PartitionRequestQueue` would also release all the view readers in `allReaders`. The details are in `PartitionRequestQueue#close`. If so, it seems no problem even though the previous `CancelPartitionRequest` does not work. But I think it would be more proper/strict if we handle the ` CancelPartitionRequest` via `allReaders` instead. * Another critical problem is that the notification from consumer side of releasing subpartition is not actually making sense for the implementation of `BoundedBlockingSubpartition`, because it would check `isReleased` tag during `releaseReaderReference`, and this tag could only be changed from producer call. In other words, for blocking case when to release partition is determined by JobMaster based on PartitionReleaseStrategy. So even though we solve the above first issue, the file for blocking partition still could not be deleted after release notification from network. In addition, this issue is actually hidden in previous `SpillableSubpartition` because in this test the partition is only using the memory type not spilled file. 
> Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently
[jira] [Commented] (FLINK-13245) Network stack is leaking files
[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885076#comment-16885076 ] zhijiang commented on FLINK-13245: -- Thanks for finding this potential issue and the investigation! [~Zentol] [~azagrebin] After reviewing the relevant codes, it actually has two issues here: * Considering handling the `CancelPartitionRequest` issue, I guess we might have two assumptions before. One assumption is that `availableReaders` is always equivalent to `allReaders`, but now this assumption is not right because of credit. The other assumption is that we make the logic of `CancelPartitionRequest` as best-effort way, because the last `RemoteInputChannel` would send `CloseRequest` message, then the `PartitionRequestQueue` would also release all the view readers in `allReaders`. The details are in `PartitionRequestQueue#close`. If so, it seems no problem even though the previous `CancelPartitionRequest` does not work. But I think it would be more proper/strict if we handle the ` CancelPartitionRequest` via `allReaders` instead. * Another critical problem is that the notification from consumer side of releasing subpartition is not actually making sense for the implementation of `BoundedBlockingSubpartition`, because it would check `isReleased` tag during `releaseReaderReference`, and this tag could only be changed from producer call. In other words, for blocking case when to release partition is determined by JobMaster based on PartitionReleaseStrategy. So even though we solve the above first issue, the file for blocking partition still could not be deleted after release notification from network. In addition, this issue is actually hidden in previous `SpillableSubpartition` because in this test the partition is only using the memory type not spilled file. > Network stack is leaking files > -- > > Key: FLINK-13245 > URL: https://issues.apache.org/jira/browse/FLINK-13245 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Blocker > Fix For: 1.9.0 > > > There's file leak in the network stack / shuffle service. > When running the {{SlotCountExceedingParallelismTest}} on Windows a large > number of {{.channel}} files continue to reside in a > {{flink-netty-shuffle-XXX}} directory. > From what I've gathered so far these files are still being used by a > {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses > ref-counting to ensure we don't release data while a reader is still present. > However, at the end of the job this count has not reached 0, and thus nothing > is being released. > The same issue is also present on the {{ResultPartition}} level; the > {{ReleaseOnConsumptionResultPartition}} also are being released while the > ref-count is greater than 0. > Overall it appears like there's some issue with the notifications for > partitions being consumed. > It is feasible that this issue has recently caused issues on Travis where the > build were failing due to a lack of disk space. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Closed] (FLINK-13100) Fix the bug of throwing IOException while FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang closed FLINK-13100. Resolution: Resolved Fixed in 2c8ee78f714af4995f618c768661f5b638cd3025 > Fix the bug of throwing IOException while FileBufferReader#nextBuffer > - > > Key: FLINK-13100 > URL: https://issues.apache.org/jira/browse/FLINK-13100 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Blocker > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > In the implementation of FileBufferReader#nextBuffer, we expect the next > memory segment always available based on the assumption that the nextBuffer > call could only happen when the previous buffer was recycled before. > Otherwise it would throw an IOException in current implementation. > In fact, the above assumption is not making sense based on the credit-based > and zero-copy features in network. The detail processes are as follows: > * The netty thread finishes calling the channel.writeAndFlush() in > PartitionRequestQueue and adds a listener to handle the ChannelFuture later. > Before future done, the corresponding buffer is not recycled because of > zero-copy improvement. > * Before the previous future done, the netty thread could trigger next > writeAndFlush via processing addCredit message, then > FileBufferReader#nextBuffer would throw exception because of previous buffer > not recycled. > We thought of several ways for solving this potential bug: > * It does not trigger the next writeAndFlush before the previous future > done. To do so it has to maintain the future state and check it in relevant > actions. I wonder it might bring performance regression in network throughput > and bring extra state management. > * Adjust the implementation of current FileBufferReader. We ever regarded > the blocking partition view as always available based on the next buffer read > ahead, so it would be always added into available queue in > PartitionRequestQueue. Actually this next buffer ahead only simplifies the > process of BoundedBlockingSubpartitionReader#notifyDataAvailable. The view > availability could be judged based on available buffers in FileBufferReader > instead of next buffer ahead. When the buffer is recycled into > FileBufferReader after writeAndFlush done, it could call notifyDataAvailable > to add this view into available queue in PartitionRequestQueue. > I prefer the second way because it would not bring any bad impacts. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
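As a rough illustration of the second (preferred) option described above, here is a sketch of a reader whose availability is derived from its pool of free buffers rather than from a read-ahead buffer, with recycling re-triggering the availability notification. Names and buffer handling are simplified assumptions, not the actual {{FileBufferReader}}:

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch: the view is available only while the reader holds a free buffer,
// and recycling a buffer after the flush completes re-announces the view.
class FileBufferReaderSketch {
    private final Queue<byte[]> freeBuffers = new ArrayDeque<>();
    private final Runnable notifyDataAvailable; // re-enqueues the view

    FileBufferReaderSketch(Runnable notifyDataAvailable) {
        this.notifyDataAvailable = notifyDataAvailable;
        // two buffers, mirroring the double-buffered file reader
        freeBuffers.add(new byte[32 * 1024]);
        freeBuffers.add(new byte[32 * 1024]);
    }

    // No IOException anymore: callers just see "nothing available" until
    // a previously written buffer comes back via recycle().
    synchronized byte[] nextBuffer() {
        return freeBuffers.poll();
    }

    synchronized boolean isAvailable() {
        return !freeBuffers.isEmpty();
    }

    // Invoked from the netty listener once the pending flush completed.
    synchronized void recycle(byte[] buffer) {
        boolean wasEmpty = freeBuffers.isEmpty();
        freeBuffers.add(buffer);
        if (wasEmpty) {
            notifyDataAvailable.run();
        }
    }
}
{code}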
[jira] [Created] (FLINK-13235) Change the Netty default transport mode to auto
zhijiang created FLINK-13235: Summary: Change the Netty default transport mode to auto Key: FLINK-13235 URL: https://issues.apache.org/jira/browse/FLINK-13235 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang The current default config for "taskmanager.net.transport" in NettyShuffleEnvironmentOptions is "NIO". In order to use the "EPOLL" mode, which has better performance and is recommended when available, we could change the default config to "AUTO". The "NIO" mode would then be used as a fallback. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
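For illustration, the "AUTO" selection boils down to roughly the following use of Netty's public availability check; the actual wiring inside Flink's netty server and client is more involved:

{code:java}
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

final class TransportSelection {
    // With "AUTO", prefer the native epoll transport when the platform
    // supports it and fall back to NIO otherwise.
    static EventLoopGroup newEventLoopGroup(int threads) {
        return Epoll.isAvailable()
                ? new EpollEventLoopGroup(threads)
                : new NioEventLoopGroup(threads);
    }

    static Class<? extends ServerChannel> serverChannelClass() {
        return Epoll.isAvailable()
                ? EpollServerSocketChannel.class
                : NioServerSocketChannel.class;
    }
}
{code}

Netty's epoll transport is only available on Linux, so the fallback keeps the default working on other platforms.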
[jira] [Closed] (FLINK-13141) Remove getBufferSize method from BufferPoolFactory
[ https://issues.apache.org/jira/browse/FLINK-13141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang closed FLINK-13141. Resolution: Resolved Fixed in b6563e385263beb556ce19855ca6dfdbb7f6c853 > Remove getBufferSize method from BufferPoolFactory > -- > > Key: FLINK-13141 > URL: https://issues.apache.org/jira/browse/FLINK-13141 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > This is just refactoring work to make the interface of BufferPoolFactory > simpler and cleaner. > BufferPoolFactory#getBufferSize is only used for creating subpartitions in > ResultPartitionFactory. We could pass the network buffer size from > NettyShuffleEnvironmentConfiguration while constructing the > ResultPartitionFactory, then the interface method getBufferSize could be > removed from BufferPoolFactory. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
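A before/after sketch of the described refactoring, with trimmed-down signatures (the real interface and factory carry more methods and constructor arguments):

{code:java}
// Before (sketch): the partition factory had to query the pool factory.
interface BufferPoolFactoryWithSize {
    Object createBufferPool(int numRequiredBuffers, int maxUsedBuffers);
    int getBufferSize(); // only ever called by ResultPartitionFactory
}

// After (sketch): the network buffer size is passed in once at construction,
// taken from the shuffle environment configuration, so the pool factory
// interface no longer needs a getBufferSize() method.
final class ResultPartitionFactorySketch {
    private final int networkBufferSize;

    ResultPartitionFactorySketch(int networkBufferSize) {
        this.networkBufferSize = networkBufferSize;
    }

    void createSubpartitions() {
        int bufferSize = networkBufferSize; // no factory round-trip needed
        // ... create subpartitions using 'bufferSize' (omitted) ...
    }
}
{code}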
[jira] [Updated] (FLINK-13100) Fix the bug of throwing IOException while FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13100: - Summary: Fix the bug of throwing IOException while FileBufferReader#nextBuffer (was: Fix the bug of throwing IOException during FileBufferReader#nextBuffer) > Fix the bug of throwing IOException while FileBufferReader#nextBuffer > - > > Key: FLINK-13100 > URL: https://issues.apache.org/jira/browse/FLINK-13100 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Blocker > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In the implementation of FileBufferReader#nextBuffer, we expect the next > memory segment always available based on the assumption that the nextBuffer > call could only happen when the previous buffer was recycled before. > Otherwise it would throw an IOException in current implementation. > In fact, the above assumption is not making sense based on the credit-based > and zero-copy features in network. The detail processes are as follows: > * The netty thread finishes calling the channel.writeAndFlush() in > PartitionRequestQueue and adds a listener to handle the ChannelFuture later. > Before future done, the corresponding buffer is not recycled because of > zero-copy improvement. > * Before the previous future done, the netty thread could trigger next > writeAndFlush via processing addCredit message, then > FileBufferReader#nextBuffer would throw exception because of previous buffer > not recycled. > We thought of several ways for solving this potential bug: > * It does not trigger the next writeAndFlush before the previous future > done. To do so it has to maintain the future state and check it in relevant > actions. I wonder it might bring performance regression in network throughput > and bring extra state management. > * Adjust the implementation of current FileBufferReader. We ever regarded > the blocking partition view as always available based on the next buffer read > ahead, so it would be always added into available queue in > PartitionRequestQueue. Actually this next buffer ahead only simplifies the > process of BoundedBlockingSubpartitionReader#notifyDataAvailable. The view > availability could be judged based on available buffers in FileBufferReader > instead of next buffer ahead. When the buffer is recycled into > FileBufferReader after writeAndFlush done, it could call notifyDataAvailable > to add this view into available queue in PartitionRequestQueue. > I prefer the second way because it would not bring any bad impacts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-13100) Fix the bug of throwing IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-13100: - Summary: Fix the bug of throwing IOException during FileBufferReader#nextBuffer (was: Fix the unexpected IOException during FileBufferReader#nextBuffer) > Fix the bug of throwing IOException during FileBufferReader#nextBuffer > -- > > Key: FLINK-13100 > URL: https://issues.apache.org/jira/browse/FLINK-13100 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Blocker > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In the implementation of FileBufferReader#nextBuffer, we expect the next > memory segment always available based on the assumption that the nextBuffer > call could only happen when the previous buffer was recycled before. > Otherwise it would throw an IOException in current implementation. > In fact, the above assumption is not making sense based on the credit-based > and zero-copy features in network. The detail processes are as follows: > * The netty thread finishes calling the channel.writeAndFlush() in > PartitionRequestQueue and adds a listener to handle the ChannelFuture later. > Before future done, the corresponding buffer is not recycled because of > zero-copy improvement. > * Before the previous future done, the netty thread could trigger next > writeAndFlush via processing addCredit message, then > FileBufferReader#nextBuffer would throw exception because of previous buffer > not recycled. > We thought of several ways for solving this potential bug: > * It does not trigger the next writeAndFlush before the previous future > done. To do so it has to maintain the future state and check it in relevant > actions. I wonder it might bring performance regression in network throughput > and bring extra state management. > * Adjust the implementation of current FileBufferReader. We ever regarded > the blocking partition view as always available based on the next buffer read > ahead, so it would be always added into available queue in > PartitionRequestQueue. Actually this next buffer ahead only simplifies the > process of BoundedBlockingSubpartitionReader#notifyDataAvailable. The view > availability could be judged based on available buffers in FileBufferReader > instead of next buffer ahead. When the buffer is recycled into > FileBufferReader after writeAndFlush done, it could call notifyDataAvailable > to add this view into available queue in PartitionRequestQueue. > I prefer the second way because it would not bring any bad impacts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang reassigned FLINK-13100: Assignee: zhijiang > Fix the unexpected IOException during FileBufferReader#nextBuffer > - > > Key: FLINK-13100 > URL: https://issues.apache.org/jira/browse/FLINK-13100 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Blocker > > In the implementation of FileBufferReader#nextBuffer, we expect the next > memory segment always available based on the assumption that the nextBuffer > call could only happen when the previous buffer was recycled before. > Otherwise it would throw an IOException in current implementation. > In fact, the above assumption is not making sense based on the credit-based > and zero-copy features in network. The detail processes are as follows: > * The netty thread finishes calling the channel.writeAndFlush() in > PartitionRequestQueue and adds a listener to handle the ChannelFuture later. > Before future done, the corresponding buffer is not recycled because of > zero-copy improvement. > * Before the previous future done, the netty thread could trigger next > writeAndFlush via processing addCredit message, then > FileBufferReader#nextBuffer would throw exception because of previous buffer > not recycled. > We thought of several ways for solving this potential bug: > * It does not trigger the next writeAndFlush before the previous future > done. To do so it has to maintain the future state and check it in relevant > actions. I wonder it might bring performance regression in network throughput > and bring extra state management. > * Adjust the implementation of current FileBufferReader. We ever regarded > the blocking partition view as always available based on the next buffer read > ahead, so it would be always added into available queue in > PartitionRequestQueue. Actually this next buffer ahead only simplifies the > process of BoundedBlockingSubpartitionReader#notifyDataAvailable. The view > availability could be judged based on available buffers in FileBufferReader > instead of next buffer ahead. When the buffer is recycled into > FileBufferReader after writeAndFlush done, it could call notifyDataAvailable > to add this view into available queue in PartitionRequestQueue. > I prefer the second way because it would not bring any bad impacts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881029#comment-16881029 ] zhijiang commented on FLINK-13100: -- Yes, the ITCase is also necessary, and I think there should already be IT cases for blocking partitions. The key problem is that we have two different implementations for blocking partitions, both of which would be used in real practice. But the selection between them is made automatically based on whether the system is 32-bit or 64-bit, so the way for 32-bit might never be touched when executing the existing IT cases. So I think we might need to make the file-file way configurable in the previous IT cases. > Fix the unexpected IOException during FileBufferReader#nextBuffer > - > > Key: FLINK-13100 > URL: https://issues.apache.org/jira/browse/FLINK-13100 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Priority: Blocker > > In the implementation of FileBufferReader#nextBuffer, we expect the next > memory segment always available based on the assumption that the nextBuffer > call could only happen when the previous buffer was recycled before. > Otherwise it would throw an IOException in current implementation. > In fact, the above assumption is not making sense based on the credit-based > and zero-copy features in network. The detail processes are as follows: > * The netty thread finishes calling the channel.writeAndFlush() in > PartitionRequestQueue and adds a listener to handle the ChannelFuture later. > Before future done, the corresponding buffer is not recycled because of > zero-copy improvement. > * Before the previous future done, the netty thread could trigger next > writeAndFlush via processing addCredit message, then > FileBufferReader#nextBuffer would throw exception because of previous buffer > not recycled. > We thought of several ways for solving this potential bug: > * It does not trigger the next writeAndFlush before the previous future > done. To do so it has to maintain the future state and check it in relevant > actions. I wonder it might bring performance regression in network throughput > and bring extra state management. > * Adjust the implementation of current FileBufferReader. We ever regarded > the blocking partition view as always available based on the next buffer read > ahead, so it would be always added into available queue in > PartitionRequestQueue. Actually this next buffer ahead only simplifies the > process of BoundedBlockingSubpartitionReader#notifyDataAvailable. The view > availability could be judged based on available buffers in FileBufferReader > instead of next buffer ahead. When the buffer is recycled into > FileBufferReader after writeAndFlush done, it could call notifyDataAvailable > to add this view into available queue in PartitionRequestQueue. > I prefer the second way because it would not bring any bad impacts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
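One way to make the file-file implementation reachable from the IT cases, as suggested in this comment, is to parameterize the tests over the subpartition implementation instead of relying on the 32/64-bit auto-detection. The sketch below uses a hypothetical enum and JUnit 4 parameterization and is not the actual test:

{code:java}
import java.util.Arrays;
import java.util.Collection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Hypothetical parameterization: run the blocking-partition IT case once
// per implementation instead of letting auto-detection pick one.
@RunWith(Parameterized.class)
public class BlockingPartitionITCaseSketch {

    /** Illustrative stand-in for the real subpartition type switch. */
    public enum BlockingSubpartitionType { FILE, MMAP }

    @Parameterized.Parameters(name = "type = {0}")
    public static Collection<Object[]> types() {
        return Arrays.asList(new Object[][] {
            {BlockingSubpartitionType.FILE},
            {BlockingSubpartitionType.MMAP},
        });
    }

    @Parameterized.Parameter
    public BlockingSubpartitionType type;

    @Test
    public void testBlockingExchange() {
        // configure the job to use 'type' and run the batch exchange;
        // omitted here, since the wiring depends on internal test utilities
    }
}
{code}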
[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881003#comment-16881003 ] zhijiang commented on FLINK-13100: -- [~pnowojski] the previous deadlock actually happened on our side and we fixed it internally in Alibaba. Since the new BoundedBlockingSubpartition only focused on the mmap way in its first version, we did not care about this issue then. Now that the 32-bit system could touch the file-file way, it is very easy to trigger this problem when the flush is pending, as I described above. > Fix the unexpected IOException during FileBufferReader#nextBuffer > - > > Key: FLINK-13100 > URL: https://issues.apache.org/jira/browse/FLINK-13100 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Priority: Blocker > > In the implementation of FileBufferReader#nextBuffer, we expect the next > memory segment always available based on the assumption that the nextBuffer > call could only happen when the previous buffer was recycled before. > Otherwise it would throw an IOException in current implementation. > In fact, the above assumption is not making sense based on the credit-based > and zero-copy features in network. The detail processes are as follows: > * The netty thread finishes calling the channel.writeAndFlush() in > PartitionRequestQueue and adds a listener to handle the ChannelFuture later. > Before future done, the corresponding buffer is not recycled because of > zero-copy improvement. > * Before the previous future done, the netty thread could trigger next > writeAndFlush via processing addCredit message, then > FileBufferReader#nextBuffer would throw exception because of previous buffer > not recycled. > We thought of several ways for solving this potential bug: > * It does not trigger the next writeAndFlush before the previous future > done. To do so it has to maintain the future state and check it in relevant > actions. I wonder it might bring performance regression in network throughput > and bring extra state management. > * Adjust the implementation of current FileBufferReader. We ever regarded > the blocking partition view as always available based on the next buffer read > ahead, so it would be always added into available queue in > PartitionRequestQueue. Actually this next buffer ahead only simplifies the > process of BoundedBlockingSubpartitionReader#notifyDataAvailable. The view > availability could be judged based on available buffers in FileBufferReader > instead of next buffer ahead. When the buffer is recycled into > FileBufferReader after writeAndFlush done, it could call notifyDataAvailable > to add this view into available queue in PartitionRequestQueue. > I prefer the second way because it would not bring any bad impacts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880999#comment-16880999 ] zhijiang commented on FLINK-13100: -- In netty's internal implementation of writeAndFlush(), the write operation can always be done, but the flink buffer is not released yet. The flush operation can only be done after the write operation when the channel interest ops include SelectionKey.OP_WRITE, which means the socket cache has enough space to hold the flushed message; otherwise the flush operation would be done with a following writeAndFlush(). In summary, the channel future of writeAndFlush() being done only indicates that the write is done; the flush is not always done. But the flink buffer is only recycled after the flush is done, based on the zero-copy improvement in the netty stack. So it would cause this issue. > Fix the unexpected IOException during FileBufferReader#nextBuffer > - > > Key: FLINK-13100 > URL: https://issues.apache.org/jira/browse/FLINK-13100 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Priority: Blocker > > In the implementation of FileBufferReader#nextBuffer, we expect the next > memory segment always available based on the assumption that the nextBuffer > call could only happen when the previous buffer was recycled before. > Otherwise it would throw an IOException in current implementation. > In fact, the above assumption is not making sense based on the credit-based > and zero-copy features in network. The detail processes are as follows: > * The netty thread finishes calling the channel.writeAndFlush() in > PartitionRequestQueue and adds a listener to handle the ChannelFuture later. > Before future done, the corresponding buffer is not recycled because of > zero-copy improvement. > * Before the previous future done, the netty thread could trigger next > writeAndFlush via processing addCredit message, then > FileBufferReader#nextBuffer would throw exception because of previous buffer > not recycled. > We thought of several ways for solving this potential bug: > * It does not trigger the next writeAndFlush before the previous future > done. To do so it has to maintain the future state and check it in relevant > actions. I wonder it might bring performance regression in network throughput > and bring extra state management. > * Adjust the implementation of current FileBufferReader. We ever regarded > the blocking partition view as always available based on the next buffer read > ahead, so it would be always added into available queue in > PartitionRequestQueue. Actually this next buffer ahead only simplifies the > process of BoundedBlockingSubpartitionReader#notifyDataAvailable. The view > availability could be judged based on available buffers in FileBufferReader > instead of next buffer ahead. When the buffer is recycled into > FileBufferReader after writeAndFlush done, it could call notifyDataAvailable > to add this view into available queue in PartitionRequestQueue. > I prefer the second way because it would not bring any bad impacts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
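Condensed into code, the interleaving described in the comments on this issue looks roughly like the sketch below: the buffer is only returned to the reader in the channel future listener, i.e. once the flush has completed. All names are illustrative:

{code:java}
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;

// Sketch of the race on the netty thread (all names illustrative):
//   1. writeNext(): writeAndFlush() is pending, buffer still referenced
//   2. an AddCredit message arrives -> the reader is enqueued again
//   3. writeNext() runs before the listener -> no recycled buffer yet,
//      which previously surfaced as the IOException in nextBuffer()
final class WriterSketch {

    /** Minimal stand-in for the reader from the earlier sketch. */
    interface BufferReader {
        byte[] nextBuffer();          // null while nothing is recycled
        void recycle(byte[] buffer);  // re-announces availability
    }

    private final Channel channel;
    private final BufferReader reader;

    WriterSketch(Channel channel, BufferReader reader) {
        this.channel = channel;
        this.reader = reader;
    }

    void writeNext() {
        byte[] buffer = reader.nextBuffer();
        if (buffer == null) {
            return; // simply back off instead of throwing
        }
        // The buffer may only be reused once the flush has completed,
        // because netty still references it (zero-copy); hence recycling
        // happens in the listener, not right after this call.
        channel.writeAndFlush(buffer).addListener(
                (ChannelFutureListener) future -> reader.recycle(buffer));
    }
}
{code}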
[jira] [Commented] (FLINK-11082) Fix the calculation of backlog in PipelinedSubpartition
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880499#comment-16880499 ] zhijiang commented on FLINK-11082: -- Yes, I also thought it was not an issue. > Fix the calculation of backlog in PipelinedSubpartition > --- > > Key: FLINK-11082 > URL: https://issues.apache.org/jira/browse/FLINK-11082 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Affects Versions: 1.5.6, 1.7.1 >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > The backlog of a subpartition should indicate how many buffers are consumable, > so that the consumer could feed back the corresponding credits for transporting > these buffers. But in the current PipelinedSubpartition implementation, the > backlog is increased by 1 when a BufferConsumer is added into > PipelinedSubpartition, and decreased by 1 when a BufferConsumer is removed > from PipelinedSubpartition. So the backlog only reflects how many buffers are > retained in PipelinedSubpartition, which is not always equivalent to the > number of consumable buffers. > The backlog inconsistency might result in floating buffer misdistribution on > the consumer side, because the consumer would request floating buffers based on > the backlog value, and then one floating buffer might not be used in the > RemoteInputChannel for a long time after requesting. > Considering the solution, the last buffer in PipelinedSubpartition could only > be consumable in the case of a flush triggered or the partition finished. So we > could calculate the backlog precisely based on partition flushed/finished > conditions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
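As a capsule illustration of the fix described in this issue, here is a sketch of backlog accounting in which the trailing buffer only counts once a flush or finish makes it consumable. This is a simplified model under assumed semantics, not the real {{PipelinedSubpartition}}:

{code:java}
import java.util.ArrayDeque;

// Illustrative accounting: the deque tracks retained buffers, while the
// reported backlog excludes an unflushed trailing buffer.
final class PipelinedSubpartitionSketch {
    private final ArrayDeque<Object> buffers = new ArrayDeque<>();
    private boolean flushRequested;
    private boolean isFinished;

    synchronized void add(Object bufferConsumer) {
        buffers.add(bufferConsumer);
        flushRequested = false; // the new tail is not yet consumable
    }

    synchronized void flush() {
        flushRequested = true; // makes the current tail consumable
    }

    synchronized void finish() {
        isFinished = true; // everything left is consumable
    }

    // The consumer requests floating buffers based on this value, so it
    // must count consumable buffers only, not merely retained ones.
    synchronized int getBacklog() {
        if (buffers.isEmpty()) {
            return 0;
        }
        return (flushRequested || isFinished) ? buffers.size() : buffers.size() - 1;
    }
}
{code}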
[jira] [Comment Edited] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880465#comment-16880465 ] zhijiang edited comment on FLINK-13100 at 7/8/19 3:30 PM: -- Yes, the previous SpilledSubpartitionView really has the deadlock issue based on two buffers. And now we have the similar issue as before, but it throws an IOException instead. Your understanding of above two parts is right. But there exists another case: * When the view is created, it would be enqueued into the netty thread loop at first time, because it has both data and credit now as you mentioned. * Then a buffer is fetched from view and writeAndFlush() is called. If it is still available in the queue, that means it must have credit because next buffer is always available for reading ahead. For this case it has no problem, because we only wait the previous writeAndFlush() done to trigger the next one if current queue is not empty. * But if it is not available (no credit) when writeAndFlush() is called, then the queue is empty in netty thread loop. Before the future of writeAndFlush() is done, the netty thread could still process the AddCredit message which would make the view become available again, then it would be added into queue to trigger the next writeAndFlush(). That means the previous buffer is not recycled but the next writeAndFlush() is also triggered to cause the problem. The process might be like this: first writeAndFlush() pending -> addCredit(trigger second writeAndFlush) -> finish the first writeAndFlush() to recycle buffer. * I think it might be caused by the improvement of reusing flink buffer in netty stack from release-1.5. We could break writeAndFlush() into write and flush two processes. Previously, when the write process finished, the flink buffer was copied into netty internal ByteBuffer to be recycled then, so it would not cause a problem even though the second writeAndFlush is triggered before first pending done. But now the write process would still reference the flink buffer in netty stack until the flush is done. I would try to mock this process in relevant unit tests to verify and I might submit this test tomorrow for understanding it easily. was (Author: zjwang): Yes, the previous SpilledSubpartitionView really has the deadlock issue based on two buffers. And now we have the similar issue as before, but it throws an IOException instead. Your understanding of above two parts is right. But there exists another case: * When the view is created, it would be enqueued into the netty thread loop at first time, because it has both data and credit now as you mentioned. * Then a buffer is fetched from view and writeAndFlush() is called. If it is still available in the queue, that means it must have credit because next buffer is always available for reading ahead. For this case it has no problem, because we only wait the previous writeAndFlush() done to trigger the next one if current queue is not empty. * But if it is not available (no credit) when writeAndFlush() is called, then the queue is empty in netty thread loop. Before the future of writeAndFlush() is done, the netty thread could still process the AddCredit message which would make the view become available again, then it would be added into queue to trigger the next writeAndFlush(). That means the previous buffer is not recycled but the next write is also triggered to cause the problem.
The process might be like this: first writeAndFlush() pending -> addCredit(trigger second writeAndFlush) -> finish the first writeAndFlush() to recycle buffer. * I think it might be caused by the improvement of reusing flink buffer in netty stack from release-1.5. We could break writeAndFlush() into write and flush two processes. Previously, when the write process finished, the flink buffer was copied into netty internal ByteBuffer to be recycled then, so it would not cause a problem even though the second writeAndFlush is triggered before first pending done. But now the write process would still reference the flink buffer in netty stack until the flush is done. I would try to mock this process in relevant unit tests to verify and I might submit this test tomorrow for understanding it easily. > Fix the unexpected IOException during FileBufferReader#nextBuffer > - > > Key: FLINK-13100 > URL: https://issues.apache.org/jira/browse/FLINK-13100 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Priority: Blocker > > In the implementation of FileBufferReader#nextBuffer, we expect the next > memory segment always available based on the assumption that the nextBuffer > call could only happen when the previous buffer was recycled
[jira] [Comment Edited] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880465#comment-16880465 ] zhijiang edited comment on FLINK-13100 at 7/8/19 3:29 PM: -- Yes, the previous SpilledSubpartitionView really has the deadlock issue based on two buffers. And now we have the similar issue as before, but it throws an IOException instead. Your understanding of above two parts is right. But there exists another case: * When the view is created, it would be enqueued into the netty thread loop at first time, because it has both data and credit now as you mentioned. * Then a buffer is fetched from view and writeAndFlush() is called. If it is still available in the queue, that means it must have credit because next buffer is always available for reading ahead. For this case it has no problem, because we only wait the previous writeAndFlush() done to trigger the next one if current queue is not empty. * But if it is not available (no credit) when writeAndFlush() is called, then the queue is empty in netty thread loop. Before the future of writeAndFlush() is done, the netty thread could still process the AddCredit message which would make the view become available again, then it would be added into queue to trigger the next writeAndFlush(). That means the previous buffer is not recycled but the next write is also triggered to cause the problem. The process might be like this: first writeAndFlush() pending -> addCredit(trigger second writeAndFlush) -> finish the first writeAndFlush() to recycle buffer. * I think it might be caused by the improvement of reusing flink buffer in netty stack from release-1.5. We could break writeAndFlush() into write and flush two processes. Previously, when the write process finished, the flink buffer was copied into netty internal ByteBuffer to be recycled then, so it would not cause a problem even though the second writeAndFlush is triggered before first pending done. But now the write process would still reference the flink buffer in netty stack until the flush is done. I would try to mock this process in relevant unit tests to verify and I might submit this test tomorrow for understanding it easily. was (Author: zjwang): Yes, the previous SpilledSubpartitionView really has the deadlock issue based on two buffers. And now we have the similar issue as before, but it throws an IOException instead. Your understanding of above two parts is right. But there exists another case: * When the view is created, it would be enqueued into the netty thread loop at first time, because it has both data and credit now as you mentioned. * Then a buffer is fetched from view and writeAndFlush() is called. If it is still available in the queue, that means it must have credit because next buffer is always available for reading ahead. For this case it has no problem, because we only wait the previous writeAndFlush() done to trigger the next one. * But if it is not available (no credit) when writeAndFlush() is called, then the queue is empty in netty thread loop. Before the future of writeAndFlush() is done, the netty thread could still process the AddCredit message which would make the view become available again, then it would be added into queue to trigger the next writeAndFlush(). That means the previous buffer is not recycled but the next write is also triggered to cause the problem.
The process might be like this: first writeAndFlush() pending -> addCredit(trigger second writeAndFlush) -> finish the first writeAndFlush() to recycle buffer. * I think it might be caused by the improvement of reusing flink buffer in netty stack from release-1.5. We could break writeAndFlush() into write and flush two processes. Previously, when the write process finished, the flink buffer was copied into netty internal ByteBuffer to be recycled then, so it would not cause a problem even though the second writeAndFlush is triggered before first pending done. But now the write process would still reference the flink buffer in netty stack until the flush is done. I would try to mock this process in relevant unit tests to verify and I might submit this test tomorrow for understanding it easily. > Fix the unexpected IOException during FileBufferReader#nextBuffer > - > > Key: FLINK-13100 > URL: https://issues.apache.org/jira/browse/FLINK-13100 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Priority: Blocker > > In the implementation of FileBufferReader#nextBuffer, we expect the next > memory segment always available based on the assumption that the nextBuffer > call could only happen when the previous buffer was recycled before. >
[jira] [Comment Edited] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880465#comment-16880465 ] zhijiang edited comment on FLINK-13100 at 7/8/19 3:28 PM: -- Yes, the previous SpilledSubpartitionView really has the deadlock issue based on two buffers. And now we have the similar issue as before, but it throws an IOException instead. Your understanding of above two parts is right. But there exists another case: * When the view is created, it would be enqueued into the netty thread loop at first time, because it has both data and credit now as you mentioned. * Then a buffer is fetched from view and writeAndFlush() is called. If it is still available in the queue, that means it must have credit because next buffer is always available for reading ahead. For this case it has no problem, because we only wait the previous writeAndFlush() done to trigger the next one if current queue is not empty. * But if it is not available (no credit) when writeAndFlush() is called, then the queue is empty in netty thread loop. Before the future of writeAndFlush() is done, the netty thread could still process the AddCredit message which would make the view become available again, then it would be added into queue to trigger the next writeAndFlush(). That means the previous buffer is not recycled but the next write is also triggered to cause the problem. The process might be like this: first writeAndFlush() pending -> addCredit(trigger second writeAndFlush) -> finish the first writeAndFlush() to recycle buffer. * I think it might be caused by the improvement of reusing flink buffer in netty stack from release-1.5. We could break writeAndFlush() into write and flush two processes. Previously, when the write process finished, the flink buffer was copied into netty internal ByteBuffer to be recycled then, so it would not cause a problem even though the second writeAndFlush is triggered before first pending done. But now the write process would still reference the flink buffer in netty stack until the flush is done. I would try to mock this process in relevant unit tests to verify and I might submit this test tomorrow for understanding it easily. was (Author: zjwang): Yes, the previous SpilledSubpartitionView really has the deadlock issue based on two buffers. And now we have the similar issue as before, but it throws an IOException instead. Your understanding of above two parts is right. But there exists another case: * When the view is created, it would be enqueued into the netty thread loop at first time, because it has both data and credit now as you mentioned. * Then a buffer is fetched from view and writeAndFlush() is called. If it is still available in the queue, that means it must have credit because next buffer is always available for reading ahead. For this case it has no problem, because we only wait the previous writeAndFlush() done to trigger the next one. * But if it is not available (no credit) when writeAndFlush() is called, then the queue is empty in netty thread loop. Before the future of writeAndFlush() is done, the netty thread could still process the AddCredit message which would make the view become available again, then it would be added into queue to trigger the next writeAndFlush(). That means the previous buffer is not recycled but the next write is also triggered to cause the problem. The process might be like this: first writeAndFlush() pending -> addCredit(trigger second writeAndFlush) -> finish the first writeAndFlush() to recycle buffer.
* I think it might be caused by the improvement of reusing flink buffer in netty stack from release-1.5. We could break writeAndFlush() into write and flush two processes. Previously, when the write process finished, the flink buffer was copied into netty internal ByteBuffer to be recycled then, so it would not cause a problem even though the second writeAndFlush is triggered before first pending done. But now the write process would still reference the flink buffer in netty stack until the flush is done. I would try to mock this process in relevant unit tests to verify and I might submit this test tomorrow for understanding it easily. > Fix the unexpected IOException during FileBufferReader#nextBuffer > - > > Key: FLINK-13100 > URL: https://issues.apache.org/jira/browse/FLINK-13100 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Priority: Blocker > > In the implementation of FileBufferReader#nextBuffer, we expect the next > memory segment always available based on the assumption that the nextBuffer > call could only happen when the previous buffer was recycled before. > Otherwise it would throw an