[jira] [Updated] (FLINK-13754) Decouple OperatorChain with StreamStatusMaintainer

2019-08-20 Thread zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13754:
-
Summary: Decouple OperatorChain with StreamStatusMaintainer  (was: Decouple 
OperatorChain from StreamStatusMaintainer)

> Decouple OperatorChain with StreamStatusMaintainer
> --
>
> Key: FLINK-13754
> URL: https://issues.apache.org/jira/browse/FLINK-13754
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The current OperatorChain is heavy-weight because it takes on unrelated roles 
> such as StreamStatusMaintainer. If other components rely only on the 
> StreamStatusMaintainer, we still have to pass around the whole OperatorChain. 
> From the design aspect of single responsibility, we need to decouple them.
> The solution is to refactor the creation of StreamStatusMaintainer and 
> RecordWriterOutput at the StreamTask level, and then break the cyclic 
> implementation dependency between them. The array of RecordWriters, which is 
> closely related to RecordWriterOutput, is created in StreamTask, so it is 
> reasonable to create them together. The StreamStatusMaintainer created in 
> StreamTask can then be referenced directly by subclasses like 
> OneInputStreamTask/TwoInputStreamTask.
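
As a rough illustration only (a self-contained sketch with simplified stand-in 
classes, not the actual Flink types or signatures), the proposed wiring could look 
like this: StreamTask creates the RecordWriters, the StreamStatusMaintainer and 
the RecordWriterOutputs together and hands them to the OperatorChain, so the 
chain no longer owns or exposes the maintainer.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class StreamTaskWiringSketch {

    enum StreamStatus { ACTIVE, IDLE }

    /** Stand-in for Flink's StreamStatusMaintainer. */
    static class StreamStatusMaintainer {
        private StreamStatus status = StreamStatus.ACTIVE;
        StreamStatus getStreamStatus() { return status; }
        void toggleStreamStatus(StreamStatus newStatus) { status = newStatus; }
    }

    /** Stand-in for the network-level RecordWriter. */
    static class RecordWriter {
        void emit(String element) { /* serialize and send downstream */ }
    }

    /** Stand-in for RecordWriterOutput: forwards watermarks only while ACTIVE. */
    static class RecordWriterOutput {
        private final RecordWriter writer;
        private final StreamStatusMaintainer statusMaintainer;

        RecordWriterOutput(RecordWriter writer, StreamStatusMaintainer statusMaintainer) {
            this.writer = writer;
            this.statusMaintainer = statusMaintainer;
        }

        void emitWatermark(long watermark) {
            if (statusMaintainer.getStreamStatus() == StreamStatus.ACTIVE) {
                writer.emit("WATERMARK:" + watermark);
            }
        }
    }

    /** Stand-in for OperatorChain: it only consumes pre-built outputs now. */
    static class OperatorChain {
        OperatorChain(List<RecordWriterOutput> streamOutputs) { /* build chained operators */ }
    }

    public static void main(String[] args) {
        // StreamTask level: RecordWriters and StreamStatusMaintainer are created together,
        // so the outputs can reference the maintainer directly.
        List<RecordWriter> recordWriters = List.of(new RecordWriter(), new RecordWriter());
        StreamStatusMaintainer statusMaintainer = new StreamStatusMaintainer();

        List<RecordWriterOutput> outputs = new ArrayList<>();
        for (RecordWriter writer : recordWriters) {
            outputs.add(new RecordWriterOutput(writer, statusMaintainer));
        }
        OperatorChain chain = new OperatorChain(outputs);

        // Subclasses like OneInputStreamTask/TwoInputStreamTask can now reference
        // statusMaintainer directly instead of asking the chain for it.
        System.out.println("stream status = " + statusMaintainer.getStreamStatus());
    }
}
{code}

With this ordering, only StreamTask knows about both pieces, and OperatorChain no 
longer needs to hand the maintainer out to other components.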



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (FLINK-13754) Decouple OperatorChain from StreamStatusMaintainer

2019-08-20 Thread zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13754:
-
Description: 
The current OperatorChain is heavy-weight because it takes on unrelated roles 
such as StreamStatusMaintainer. If other components rely only on the 
StreamStatusMaintainer, we still have to pass around the whole OperatorChain. 
From the design aspect of single responsibility, we need to decouple them.

The solution is to refactor the creation of StreamStatusMaintainer and 
RecordWriterOutput at the StreamTask level, and then break the cyclic 
implementation dependency between them. The array of RecordWriters, which is 
closely related to RecordWriterOutput, is created in StreamTask, so it is 
reasonable to create them together. The StreamStatusMaintainer created in 
StreamTask can then be referenced directly by subclasses like 
OneInputStreamTask/TwoInputStreamTask.

  was:
There are two motivations for this refactoring:
 * It is the precondition for the following work of decoupling the dependency 
between two inputs status in ForwardingValveOutputHandler.
 * From the aspect of design rule, the current OperatorChain takes many 
unrelated roles like StreamStatusMaintainer to make it unmaintainable. The root 
reason for this case is from the cycle dependency between RecordWriterOutput 
(created by OperatorChain) and  StreamStatusMaintainer.

The solution is to refactor the creation of StreamStatusMaintainer and 
RecordWriterOutput in StreamTask level, and then break the implementation cycle 
dependency between them. The array of RecordWriters which has close 
relationship with RecordWriterOutput is created in StreamTask, so it is 
reasonable to create them together. The created StreamStatusMaintainer in 
StreamTask can be directly referenced by subclasses like 
OneInputStreamTask/TwoInputStreamTask.


> Decouple OperatorChain from StreamStatusMaintainer
> --
>
> Key: FLINK-13754
> URL: https://issues.apache.org/jira/browse/FLINK-13754
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The current OperatorChain is heavy-weight because it takes on unrelated roles 
> such as StreamStatusMaintainer. If other components rely only on the 
> StreamStatusMaintainer, we still have to pass around the whole OperatorChain. 
> From the design aspect of single responsibility, we need to decouple them.
> The solution is to refactor the creation of StreamStatusMaintainer and 
> RecordWriterOutput at the StreamTask level, and then break the cyclic 
> implementation dependency between them. The array of RecordWriters, which is 
> closely related to RecordWriterOutput, is created in StreamTask, so it is 
> reasonable to create them together. The StreamStatusMaintainer created in 
> StreamTask can then be referenced directly by subclasses like 
> OneInputStreamTask/TwoInputStreamTask.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16911537#comment-16911537
 ] 

zhijiang commented on FLINK-13798:
--

Thanks for the notification, [~StephanEwen]. I hope the new source operator will 
make the watermark logic easier to handle, but for now we have to solve this 
issue by refactoring TimestampsAndPeriodicWatermarksOperator. :(

> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As we know, a watermark can be emitted downstream only when the stream status 
> is active. For the downstream task we already have the StatusWatermarkValve 
> component in StreamInputProcessor to handle this logic, but for the source 
> task the current implementation seems a bit tricky. There are two scenarios 
> for the source case:
>  * In the source WatermarkContext, the status is toggled to active while 
> collecting/emitting, and the status is checked again in RecordWriterOutput. If 
> the watermark is triggered by a timer in AutomaticWatermarkContext, the timer 
> task checks the status before emitting the watermark.
>  * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a 
> timer, but it still relies on RecordWriterOutput to check the status before 
> emitting.
> So the check logic in RecordWriterOutput only makes sense for the second 
> scenario and seems redundant for the first one.
> Even worse, this logic in RecordWriterOutput creates a cyclic dependency with 
> StreamStatusMaintainer, which is a blocker for the follow-up work of 
> integrating source processing on the runtime side.
> To solve these issues, the basic idea is to move this check into an upper 
> layer instead of the current low-level RecordWriterOutput. The solution is to 
> migrate the check logic from RecordWriterOutput to 
> TimestampsAndPeriodicWatermarksOperator; see the sketch below. We could 
> further remove the logic of toggling the status active in WatermarkContext.
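
A minimal sketch of that direction (simplified local types, not the real Flink 
classes or signatures): the periodic-watermark operator's timer callback performs 
the stream-status check itself, so the low-level RecordWriterOutput no longer 
needs to know about StreamStatusMaintainer.

{code:java}
public class PeriodicWatermarkSketch {

    interface StreamStatusProvider {
        boolean isStreamStatusActive();
    }

    interface WatermarkOutput {
        void emitWatermark(long watermarkTimestamp);
    }

    /** Stand-in for the timer callback of TimestampsAndPeriodicWatermarksOperator. */
    static class PeriodicWatermarkEmitter {
        private final StreamStatusProvider statusProvider;
        private final WatermarkOutput output;
        private long currentWatermark = Long.MIN_VALUE;

        PeriodicWatermarkEmitter(StreamStatusProvider statusProvider, WatermarkOutput output) {
            this.statusProvider = statusProvider;
            this.output = output;
        }

        /** Called by the processing-time service at the configured interval. */
        void onProcessingTime(long timestamp) {
            long newWatermark = computeWatermark(timestamp);
            // The status check moves here, out of the low-level RecordWriterOutput.
            if (newWatermark > currentWatermark && statusProvider.isStreamStatusActive()) {
                currentWatermark = newWatermark;
                output.emitWatermark(newWatermark);
            }
        }

        private long computeWatermark(long timestamp) {
            // Placeholder for the user-defined periodic watermark assigner.
            return timestamp - 1;
        }
    }

    public static void main(String[] args) {
        PeriodicWatermarkEmitter emitter = new PeriodicWatermarkEmitter(
                () -> true,
                watermark -> System.out.println("emit watermark " + watermark));
        emitter.onProcessingTime(1_000L);
    }
}
{code}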



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13798:
-
Description: 
As we know, a watermark can be emitted downstream only when the stream status is 
active. For the downstream task we already have the StatusWatermarkValve 
component in StreamInputProcessor to handle this logic, but for the source task 
the current implementation seems a bit tricky. There are two scenarios for the 
source case:
 * In the source WatermarkContext, the status is toggled to active while 
collecting/emitting, and the status is checked again in RecordWriterOutput. If 
the watermark is triggered by a timer in AutomaticWatermarkContext, the timer 
task checks the status before emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a 
timer, but it still relies on RecordWriterOutput to check the status before 
emitting.

So the check logic in RecordWriterOutput only makes sense for the second 
scenario and seems redundant for the first one.

Even worse, this logic in RecordWriterOutput creates a cyclic dependency with 
StreamStatusMaintainer, which is a blocker for the follow-up work of integrating 
source processing on the runtime side.

To solve these issues, the basic idea is to move this check into an upper layer 
instead of the current low-level RecordWriterOutput. The solution is to migrate 
the check logic from RecordWriterOutput to 
TimestampsAndPeriodicWatermarksOperator. We could further remove the logic of 
toggling the status active in WatermarkContext.

  was:
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are two scenarios for the source case:
 * In the source WatermarkContext, it would toggle the status as active while 
collecting/emitting and the status is checked in RecordWriterOutput. If the 
watermark is triggered by timer for AutomaticWatermarkContext, the timer task 
would check the status before emitting watermark. 
 * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by 
timer, but it still relies on RecordWriterOutput to check the status before 
emitting.

So the check logic in RecordWriterOutput only makes sense for the last 
scenario, and seems redundant for the first scenario.

Even worse, this logic in RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

To solve above issues, the basic idea is to refactor this check logic in upper 
layer instead of current low level RecordWriterOutput. The solution is 
migrating the check logic from RecordWriterOutput to 
TimestampsAndPeriodicWatermarksOperator.


> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> As we know, the watermark could be emitted to downstream only when the stream 
> status is active. For the downstream task we already have the component of 
> StatusWatermarkValve in StreamInputProcessor to handle this logic. But for 
> the source task the current implementation of this logic seems a bit tricky. 
> There are two scenarios for the source case:
>  * In the source WatermarkContext, it would toggle the status as active while 
> collecting/emitting and the status is checked in RecordWriterOutput. If the 
> watermark is triggered by timer for AutomaticWatermarkContext, the timer task 
> would check the status before emitting watermark. 
>  * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by 
> timer, but it still relies on RecordWriterOutput to check the status before 
> emitting.
> So the check logic in RecordWriterOutput only makes sense for the last 
> scenario, and seems redundant for the first scenario.
> Even worse, this logic in RecordWriterOutput would bring cycle dependency 
> with StreamStatusMaintainer, which is a blocker for the following work of 
> integrating source processing on runtime side.
> To solve above issues, the basic idea is to refactor this check logic in 
> upper layer instead of current low level RecordWriterOutput. The solution is 
> migrating the check logic from RecordWriterOutput to 
> TimestampsAndPeriodicWatermarksOperator. And we could further remove the 
> logic of toggling active in WatermarkContext.
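
To illustrate the redundancy described above, here is a small self-contained 
sketch (stand-in classes, not the actual Flink code): on the 
AutomaticWatermarkContext timer path the status is already checked before the 
watermark reaches the output, so the second check inside RecordWriterOutput 
cannot change the outcome on this path.

{code:java}
public class RedundantStatusCheckSketch {

    /** Stand-in for StreamStatusMaintainer. */
    static class StreamStatusMaintainer {
        private volatile boolean active = true;
        boolean isActive() { return active; }
    }

    /** Stand-in for RecordWriterOutput. */
    static class RecordWriterOutput {
        private final StreamStatusMaintainer maintainer;

        RecordWriterOutput(StreamStatusMaintainer maintainer) { this.maintainer = maintainer; }

        void emitWatermark(long watermark) {
            // Second check: only useful for operators (like the periodic watermark
            // operator) that do not check the status themselves.
            if (maintainer.isActive()) {
                System.out.println("emit watermark " + watermark);
            }
        }
    }

    /** Stand-in for the timer task inside AutomaticWatermarkContext. */
    static class WatermarkEmittingTask implements Runnable {
        private final StreamStatusMaintainer maintainer;
        private final RecordWriterOutput output;

        WatermarkEmittingTask(StreamStatusMaintainer maintainer, RecordWriterOutput output) {
            this.maintainer = maintainer;
            this.output = output;
        }

        @Override
        public void run() {
            // First check: the timer task already guards the emission...
            if (maintainer.isActive()) {
                // ...so the check inside emitWatermark is always true on this path.
                output.emitWatermark(System.currentTimeMillis());
            }
        }
    }

    public static void main(String[] args) {
        StreamStatusMaintainer maintainer = new StreamStatusMaintainer();
        new WatermarkEmittingTask(maintainer, new RecordWriterOutput(maintainer)).run();
    }
}
{code}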

[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13798:
-
Description: 
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are two scenarios for the source case:
 * In the source WatermarkContext, it would toggle the status as active while 
collecting/emitting and the status is checked in RecordWriterOutput. If the 
watermark is triggered by timer for AutomaticWatermarkContext, the timer task 
would check the status before emitting watermark. 
 * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by 
timer, but it still relies on RecordWriterOutput to check the status before 
emitting.

So the check logic in RecordWriterOutput only makes sense for the last 
scenario, and seems redundant for the first scenario.

Even worse, this logic in RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

To solve above issues, the basic idea is to refactor this check logic in upper 
layer instead of current low level RecordWriterOutput. The solution is 
migrating the check logic from RecordWriterOutput to 
TimestampsAndPeriodicWatermarksOperator.

  was:
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are two scenarios for the source case:
 * In the source WatermarkContext, it would toggle the status as active while 
collecting/emitting and the status is checked in RecordWriterOutput. If the 
watermark is triggered by timer for AutomaticWatermarkContext, the timer task 
would check the status before emitting watermark. 
 * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by 
timer, but it still relies on RecordWriterOutput to check the status before 
emitting.

So the check logic in RecordWriterOutput only makes sense for the last 
scenario, and seems redundant for the first scenario.

Even worse, this logic is RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

To solve above issues, the basic idea is to refactor this check logic in upper 
layer instead of current low level RecordWriterOutput. The solution is 
migrating the check logic from RecordWriterOutput to 
TimestampsAndPeriodicWatermarksOperator.


> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> As we know, the watermark could be emitted to downstream only when the stream 
> status is active. For the downstream task we already have the component of 
> StatusWatermarkValve in StreamInputProcessor to handle this logic. But for 
> the source task the current implementation of this logic seems a bit tricky. 
> There are two scenarios for the source case:
>  * In the source WatermarkContext, it would toggle the status as active while 
> collecting/emitting and the status is checked in RecordWriterOutput. If the 
> watermark is triggered by timer for AutomaticWatermarkContext, the timer task 
> would check the status before emitting watermark. 
>  * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by 
> timer, but it still relies on RecordWriterOutput to check the status before 
> emitting.
> So the check logic in RecordWriterOutput only makes sense for the last 
> scenario, and seems redundant for the first scenario.
> Even worse, this logic in RecordWriterOutput would bring cycle dependency 
> with StreamStatusMaintainer, which is a blocker for the following work of 
> integrating source processing on runtime side.
> To solve above issues, the basic idea is to refactor this check logic in 
> upper layer instead of current low level RecordWriterOutput. The solution is 
> migrating the check logic from RecordWriterOutput to 
> TimestampsAndPeriodicWatermarksOperator.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13798:
-
Description: 
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are two scenarios for the source case:
 * In the source WatermarkContext, it would toggle the status as active while 
collecting/emitting and the status is checked in RecordWriterOutput. If the 
watermark is triggered by timer for AutomaticWatermarkContext, the timer task 
would check the status before emitting watermark. 
 * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by 
timer, but it still relies on RecordWriterOutput to check the status before 
emitting.

So the check logic in RecordWriterOutput only makes sense for the last 
scenario, and seems redundant for the first scenario.

Even worse, this logic in RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

To solve above issues, the basic idea is to refactor this check logic in upper 
layer instead of current low level RecordWriterOutput. The solution is 
migrating the check logic from RecordWriterOutput to 
TimestampsAndPeriodicWatermarksOperator.

  was:
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are two scenarios for the source case:
 * In the source WatermarkContext, it would toggle the status as active while 
collecting/emitting and the status is checked in the RecordWriterOutput. If the 
watermark is triggered by timer for AutomaticWatermarkContext, the timer task 
would check the status before emitting watermark. 
 * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by 
timer. But it relies on RecordWriterOutput to check the status before emitting.

So we can see that the check logic in RecordWriterOutput only makes sense for 
the last scenario, and seems redundant for the first scenario.

Even worse, the logic is RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

To solve above issues, the basic idea is to make this check logic in upper 
layer instead of current low level RecordWriterOutput. The solution is that we 
could migrate the checking logic from RecordWriterOutput to 
TimestampsAndPeriodicWatermarksOperator. And the toggling active action could 
be removed in AutomaticWatermarkContext while emitting records.


> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> As we know, the watermark could be emitted to downstream only when the stream 
> status is active. For the downstream task we already have the component of 
> StatusWatermarkValve in StreamInputProcessor to handle this logic. But for 
> the source task the current implementation of this logic seems a bit tricky. 
> There are two scenarios for the source case:
>  * In the source WatermarkContext, it would toggle the status as active while 
> collecting/emitting and the status is checked in RecordWriterOutput. If the 
> watermark is triggered by timer for AutomaticWatermarkContext, the timer task 
> would check the status before emitting watermark. 
>  * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by 
> timer, but it still relies on RecordWriterOutput to check the status before 
> emitting.
> So the check logic in RecordWriterOutput only makes sense for the last 
> scenario, and seems redundant for the first scenario.
> Even worse, this logic in RecordWriterOutput would bring cycle dependency 
> with StreamStatusMaintainer, which is a blocker for the following work of 
> integrating source processing on runtime side.
> To solve above issues, the basic idea is to refactor this check logic in 
> upper layer instead of current low level RecordWriterOutput. The solution is 
> migrating the check logic from RecordWriterOutput to 
> TimestampsAndPeriodicWatermarksOperator.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13798:
-
Description: 
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are two scenarios for the source case:
 * In the source WatermarkContext, it would toggle the status as active while 
collecting/emitting and the status is checked in the RecordWriterOutput. If the 
watermark is triggered by timer for AutomaticWatermarkContext, the timer task 
would check the status before emitting watermark. 
 * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by 
timer. But it relies on RecordWriterOutput to check the status before emitting.

So we can see that the check logic in RecordWriterOutput only makes sense for 
the last scenario, and seems redundant for the first scenario.

Even worse, the logic in RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

To solve above issues, the basic idea is to make this check logic in upper 
layer instead of current low level RecordWriterOutput. The solution is that we 
could migrate the checking logic from RecordWriterOutput to 
TimestampsAndPeriodicWatermarksOperator. And the toggling active action could 
be removed in AutomaticWatermarkContext while emitting records.

  was:
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are three scenarios for the source case:
 * In the AutomaticWatermarkContext, it would toggle the status as active while 
collecting record and the status is checked in the RecordWriterOutput. If the 
watermark is triggered by timer, the timer task would check the status before 
emitting watermark. 
 * In the ManualWatermarkContext, the status is also checked in 
RecordWriterOutput before emitting watermark.
 * TimestampsAndPeriodicWatermarksOperator: The watermark is scheduled by timer 
in interval time. When it happens, it would call emitting watermark via output. 
Then the RecordWriterOutput would check the status before emitting.

So we can see that the checking logic in RecordWriterOutput only makes sense 
for the last two scenarios, and seems redundant for the first scenario.

Even worse, the logic is RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

To solve above issues, the basic idea is to make this check logic in upper 
layer instead of current low level RecordWriterOutput. The solution is that we 
could migrate the checking logic from RecordWriterOutput to 
TimestampsAndPeriodicWatermarksOperator. And the toggling active action could 
be removed in AutomaticWatermarkContext while emitting records.


> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> As we know, the watermark could be emitted to downstream only when the stream 
> status is active. For the downstream task we already have the component of 
> StatusWatermarkValve in StreamInputProcessor to handle this logic. But for 
> the source task the current implementation of this logic seems a bit tricky. 
> There are two scenarios for the source case:
>  * In the source WatermarkContext, it would toggle the status as active while 
> collecting/emitting and the status is checked in the RecordWriterOutput. If 
> the watermark is triggered by timer for AutomaticWatermarkContext, the timer 
> task would check the status before emitting watermark. 
>  * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by 
> timer. But it relies on RecordWriterOutput to check the status before 
> emitting.
> So we can see that the check logic in RecordWriterOutput only makes sense for 
> the last scenario, and seems redundant for the first scenario.
> Even worse, the logic in RecordWriterOutput would bring cycle dependency with 
> StreamStatusMaintainer, which is a blocker for the following work of 
> integrating source processing on runtime side.

[jira] [Issue Comment Deleted] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13798:
-
Comment: was deleted

(was: Thanks for the replies, [~StephanEwen].

I hope it will make the logic easier to understand in the new source operator, 
as you mentioned. But for now we have to solve this issue during the 
refactoring. :(

I just found another issue that was not mentioned in our previous discussion. 
If we remove this check from RecordWriterOutput and add the logic to 
TimestampsAndPeriodicWatermarksOperator, the AutomaticWatermarkContext case 
seems fine. But for the second case above, ManualWatermarkContext, I am not 
quite sure whether it still needs the check logic. Could you help double-check 
it? [~tzulitai]
 )

> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> As we know, the watermark could be emitted to downstream only when the stream 
> status is active. For the downstream task we already have the component of 
> StatusWatermarkValve in StreamInputProcessor to handle this logic. But for 
> the source task the current implementation of this logic seems a bit tricky. 
> There are three scenarios for the source case:
>  * In the AutomaticWatermarkContext, it would toggle the status as active 
> while collecting record and the status is checked in the RecordWriterOutput. 
> If the watermark is triggered by timer, the timer task would check the status 
> before emitting watermark. 
>  * In the ManualWatermarkContext, the status is also checked in 
> RecordWriterOutput before emitting watermark.
>  * TimestampsAndPeriodicWatermarksOperator: The watermark is scheduled by 
> timer in interval time. When it happens, it would call emitting watermark via 
> output. Then the RecordWriterOutput would check the status before emitting.
> So we can see that the checking logic in RecordWriterOutput only makes sense 
> for the last two scenarios, and seems redundant for the first scenario.
> Even worse, the logic in RecordWriterOutput would bring cycle dependency with 
> StreamStatusMaintainer, which is a blocker for the following work of 
> integrating source processing on runtime side.
> To solve above issues, the basic idea is to make this check logic in upper 
> layer instead of current low level RecordWriterOutput. The solution is that 
> we could migrate the checking logic from RecordWriterOutput to 
> TimestampsAndPeriodicWatermarksOperator. And the toggling active action could 
> be removed in AutomaticWatermarkContext while emitting records.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16911455#comment-16911455
 ] 

zhijiang commented on FLINK-13798:
--

Thanks for the replies, [~StephanEwen].

I hope it will make the logic easier to understand in the new source operator, 
as you mentioned. But for now we have to solve this issue during the 
refactoring. :(

I just found another issue that was not mentioned in our previous discussion. 
If we remove this check from RecordWriterOutput and add the logic to 
TimestampsAndPeriodicWatermarksOperator, the AutomaticWatermarkContext case 
seems fine. But for the second case above, ManualWatermarkContext, I am not 
quite sure whether it still needs the check logic. Could you help double-check 
it? [~tzulitai]

> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> As we know, the watermark could be emitted to downstream only when the stream 
> status is active. For the downstream task we already have the component of 
> StatusWatermarkValve in StreamInputProcessor to handle this logic. But for 
> the source task the current implementation of this logic seems a bit tricky. 
> There are three scenarios for the source case:
>  * In the AutomaticWatermarkContext, it would toggle the status as active 
> while collecting record and the status is checked in the RecordWriterOutput. 
> If the watermark is triggered by timer, the timer task would check the status 
> before emitting watermark. 
>  * In the ManualWatermarkContext, the status is also checked in 
> RecordWriterOutput before emitting watermark.
>  * TimestampsAndPeriodicWatermarksOperator: The watermark is scheduled by 
> timer in interval time. When it happens, it would call emitting watermark via 
> output. Then the RecordWriterOutput would check the status before emitting.
> So we can see that the checking logic in RecordWriterOutput only makes sense 
> for the last two scenarios, and seems redundant for the first scenario.
> Even worse, the logic in RecordWriterOutput would bring cycle dependency with 
> StreamStatusMaintainer, which is a blocker for the following work of 
> integrating source processing on runtime side.
> To solve above issues, the basic idea is to make this check logic in upper 
> layer instead of current low level RecordWriterOutput. The solution is that 
> we could migrate the checking logic from RecordWriterOutput to 
> TimestampsAndPeriodicWatermarksOperator. And the toggling active action could 
> be removed in AutomaticWatermarkContext while emitting records.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13798:
-
Description: 
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are three scenarios for the source case:
 * In the AutomaticWatermarkContext, it would toggle the status as active while 
collecting record and the status is checked in the RecordWriterOutput. If the 
watermark is triggered by timer, the timer task would check the status before 
emitting watermark. 
 * In the ManualWatermarkContext, the status is also checked in 
RecordWriterOutput before emitting watermark.
 * TimestampsAndPeriodicWatermarksOperator: The watermark is scheduled by timer 
in interval time. When it happens, it would call emitting watermark via output. 
Then the RecordWriterOutput would check the status before emitting.

So we can see that the checking logic in RecordWriterOutput only makes sense 
for the last two scenarios, and seems redundant for the first scenario.

Even worse, the logic in RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

To solve above issues, the basic idea is to make this check logic in upper 
layer instead of current low level RecordWriterOutput. The solution is that we 
could migrate the checking logic from RecordWriterOutput to 
TimestampsAndPeriodicWatermarksOperator. And the toggling active action could 
be removed in AutomaticWatermarkContext while emitting records.

  was:
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are three scenarios for the source case:
 * In the AutomaticWatermarkContext, it would toggle the status as active while 
collecting record and the status is checked in the RecordWriterOutput. If the 
watermark is triggered by timer, the timer task would check the status before 
emitting watermark. 
 * In the ManualWatermarkContext, the status is also checked in 
RecordWriterOutput before emitting watermark.
 * TimestampsAndPeriodicWatermarksOperator: The watermark is scheduled by timer 
in interval time. When it happens, it would call emitting watermark via output. 
Then the RecordWriterOutput would check the status before emitting.

So we can see that the checking logic in RecordWriterOutput only makes sense 
for the last two scenarios, and seems redundant for the first scenario.

Even worse, the logic is RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

To solve above issues, the basic idea is to make this check logic in upper 
layer instead of current low level RecordWriterOutput. The solution is that we 
could migrate the checking logic from RecordWriterOutput to 
TimestampsAndPeriodicWatermarksOperator. 


> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> As we know, the watermark could be emitted to downstream only when the stream 
> status is active. For the downstream task we already have the component of 
> StatusWatermarkValve in StreamInputProcessor to handle this logic. But for 
> the source task the current implementation of this logic seems a bit tricky. 
> There are three scenarios for the source case:
>  * In the AutomaticWatermarkContext, it would toggle the status as active 
> while collecting record and the status is checked in the RecordWriterOutput. 
> If the watermark is triggered by timer, the timer task would check the status 
> before emitting watermark. 
>  * In the ManualWatermarkContext, the status is also checked in 
> RecordWriterOutput before emitting watermark.
>  * TimestampsAndPeriodicWatermarksOperator: The watermark is scheduled by 
> timer in interval time. When it happens, it would call emitting watermark via 
> output. Then the RecordWriterOutput would check the status before emitting.
> So we can see that the checking logic in RecordWriterOutput only makes sense 
> for the last two scenarios, and seems redundant for the first scenario.

[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13798:
-
Description: 
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are three scenarios for the source case:
 * In the AutomaticWatermarkContext, it would toggle the status as active while 
collecting record and the status is checked in the RecordWriterOutput. If the 
watermark is triggered by timer, the timer task would check the status before 
emitting watermark. 
 * In the ManualWatermarkContext, the status is also checked in 
RecordWriterOutput before emitting watermark.
 * TimestampsAndPeriodicWatermarksOperator: The watermark is scheduled by timer 
in interval time. When it happens, it would call emitting watermark via output. 
Then the RecordWriterOutput would check the status before emitting.

So we can see that the checking logic in RecordWriterOutput only makes sense 
for the last two scenarios, and seems redundant for the first scenario.

Even worse, the logic in RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

To solve above issues, the basic idea is to make this check logic in upper 
layer instead of current low level RecordWriterOutput. The solution is that we 
could migrate the checking logic from RecordWriterOutput to 
TimestampsAndPeriodicWatermarksOperator. 

  was:
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are three scenarios for the source case:
 * In the AutomaticWatermarkContext, it would toggle the status as active while 
collecting record and the status is checked in the RecordWriterOutput. If the 
watermark is triggered by timer, the timer task would check the status before 
calling emit watermark. 
 * In the ManualWatermarkContext, the status is also checked in 
RecordWriterOutput before emitting watermark.
 * TimestampsAndPeriodicWatermarksOperator: The watermark is scheduled by timer 
in interval time. When it happens, it would call emitting watermark via output. 
Then the RecordWriterOutput would check the status before emitting.

So we can see that the checking logic in RecordWriterOutput only makes sense 
for the last two scenarios, and seems redundant for the first scenario.

Even worse, the logic is RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

To solve above issues, the basic idea is to make this check logic in upper 
layer instead of current low level RecordWriterOutput. The solution is that we 
could migrate the checking logic from RecordWriterOutput to 
TimestampsAndPeriodicWatermarksOperator. 


> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> As we know, the watermark could be emitted to downstream only when the stream 
> status is active. For the downstream task we already have the component of 
> StatusWatermarkValve in StreamInputProcessor to handle this logic. But for 
> the source task the current implementation of this logic seems a bit tricky. 
> There are three scenarios for the source case:
>  * In the AutomaticWatermarkContext, it would toggle the status as active 
> while collecting record and the status is checked in the RecordWriterOutput. 
> If the watermark is triggered by timer, the timer task would check the status 
> before emitting watermark. 
>  * In the ManualWatermarkContext, the status is also checked in 
> RecordWriterOutput before emitting watermark.
>  * TimestampsAndPeriodicWatermarksOperator: The watermark is scheduled by 
> timer in interval time. When it happens, it would call emitting watermark via 
> output. Then the RecordWriterOutput would check the status before emitting.
> So we can see that the checking logic in RecordWriterOutput only makes sense 
> for the last two scenarios, and seems redundant for the first scenario.
> Even worse, the logic in RecordWriterOutput would bring cycle dependency with 
> StreamStatusMaintainer, which is a blocker for the following work of 
> integrating source processing on runtime side.

[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13798:
-
Description: 
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are three scenarios for the source case:
 * In the AutomaticWatermarkContext, it would toggle the status as active while 
collecting record and the status is checked in the RecordWriterOutput. If the 
watermark is triggered by timer, the timer task would check the status before 
calling emit watermark. 
 * In the ManualWatermarkContext, the status is also checked in 
RecordWriterOutput before emitting watermark.
 * TimestampsAndPeriodicWatermarksOperator: The watermark is scheduled by timer 
in interval time. When it happens, it would call emitting watermark via output. 
Then the RecordWriterOutput would check the status before emitting.

So we can see that the checking logic in RecordWriterOutput only makes sense 
for the last two scenarios, and seems redundant for the first scenario.

Even worse, the logic in RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

To solve above issues, the basic idea is to make this check logic in upper 
layer instead of current low level RecordWriterOutput. The solution is that we 
could migrate the checking logic from RecordWriterOutput to 
TimestampsAndPeriodicWatermarksOperator. 

  was:
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are two scenarios for the source case:
 * Emit watermark via source context: In the specific WatermarkContext, it 
would toggle the  stream status as active before collecting/emitting 
records/watermarks. Then in the implementation of RecordWriterOutput, it would 
check the status always active before really emitting watermark.
 * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by timer 
in interval time. When it happens, it would call output stack to emit 
watermark. Then the RecordWriterOutput could take the role of checking status 
before really emitting watermark.

So we can see that the checking status logic in RecordWriterOutput only works 
for above second scenario, and this logic seems redundant for the first 
scenario because WatermarkContext always toggle active status before emitting. 
Even worse, the logic is RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

The solution is that we could migrate the checking logic from 
RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator. 


> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> As we know, the watermark could be emitted to downstream only when the stream 
> status is active. For the downstream task we already have the component of 
> StatusWatermarkValve in StreamInputProcessor to handle this logic. But for 
> the source task the current implementation of this logic seems a bit tricky. 
> There are three scenarios for the source case:
>  * In the AutomaticWatermarkContext, it would toggle the status as active 
> while collecting record and the status is checked in the RecordWriterOutput. 
> If the watermark is triggered by timer, the timer task would check the status 
> before calling emit watermark. 
>  * In the ManualWatermarkContext, the status is also checked in 
> RecordWriterOutput before emitting watermark.
>  * TimestampsAndPeriodicWatermarksOperator: The watermark is scheduled by 
> timer in interval time. When it happens, it would call emitting watermark via 
> output. Then the RecordWriterOutput would check the status before emitting.
> So we can see that the checking logic in RecordWriterOutput only makes sense 
> for the last two scenarios, and seems redundant for the first scenario.
> Even worse, the logic in RecordWriterOutput would bring cycle dependency with 
> StreamStatusMaintainer, which is a blocker for the following work of 
> integrating source processing on runtime side.

[jira] [Updated] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13798:
-
Description: 
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are two scenarios for the source case:
 * Emit watermark via source context: In the specific WatermarkContext, it 
would toggle the  stream status as active before collecting/emitting 
records/watermarks. Then in the implementation of RecordWriterOutput, it would 
check the status always active before really emitting watermark.
 * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by timer 
in interval time. When it happens, it would call output stack to emit 
watermark. Then the RecordWriterOutput could take the role of checking status 
before really emitting watermark.

So we can see that the checking status logic in RecordWriterOutput only works 
for above second scenario, and this logic seems redundant for the first 
scenario because WatermarkContext always toggle active status before emitting. 
Even worse, the logic in RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

The solution is that we could migrate the checking logic from 
RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator. 

  was:
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are two scenarios for the source case:
 * Emit watermark via source context: In the specific WatermarkContext, it 
would toggle the  stream status as active before collecting/emitting 
records/watermarks. Then in the implementation of RecordWriterOutput, it would 
check the status always active before really emitting watermark.
 * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by timer 
in interval time. When it happens, it would call output stack to emit 
watermark. Then the RecordWriterOutput could take the role of checking status 
before really emitting watermark.

So we can see that the checking status logic in RecordWriterOutput only works 
for above second scenario, and this logic seems redundant for the first 
scenario because WatermarkContext always toggle active status before emitting. 
Even worse, the logic is RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

The solution is that we could migrate the checking logic from 
RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator. And we could 
also remove the toggle active logic  in existing WatermarkContext.


> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> As we know, the watermark could be emitted to downstream only when the stream 
> status is active. For the downstream task we already have the component of 
> StatusWatermarkValve in StreamInputProcessor to handle this logic. But for 
> the source task the current implementation of this logic seems a bit tricky. 
> There are two scenarios for the source case:
>  * Emit watermark via source context: In the specific WatermarkContext, it 
> would toggle the  stream status as active before collecting/emitting 
> records/watermarks. Then in the implementation of RecordWriterOutput, it 
> would check the status always active before really emitting watermark.
>  * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by 
> timer in interval time. When it happens, it would call output stack to emit 
> watermark. Then the RecordWriterOutput could take the role of checking status 
> before really emitting watermark.
> So we can see that the checking status logic in RecordWriterOutput only works 
> for above second scenario, and this logic seems redundant for the first 
> scenario because WatermarkContext always toggle active status before 
> emitting. Even worse, the logic in RecordWriterOutput would bring cycle 
> dependency with StreamStatusMaintainer, which is a blocker for the following 
> work of integrating source processing on runtime side.

[jira] [Commented] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16911275#comment-16911275
 ] 

zhijiang commented on FLINK-13798:
--

Thanks for the offline discussion and suggestions, [~pnowojski] [~tzulitai].

Could you double-check whether my understanding above is right?

> Refactor the process of checking stream status while emitting watermark in 
> source
> -
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> As we know, the watermark could be emitted to downstream only when the stream 
> status is active. For the downstream task we already have the component of 
> StatusWatermarkValve in StreamInputProcessor to handle this logic. But for 
> the source task the current implementation of this logic seems a bit tricky. 
> There are two scenarios for the source case:
>  * Emit watermark via source context: In the specific WatermarkContext, it 
> would toggle the  stream status as active before collecting/emitting 
> records/watermarks. Then in the implementation of RecordWriterOutput, it 
> would check the status always active before really emitting watermark.
>  * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by 
> timer in interval time. When it happens, it would call output stack to emit 
> watermark. Then the RecordWriterOutput could take the role of checking status 
> before really emitting watermark.
> So we can see that the checking status logic in RecordWriterOutput only works 
> for above second scenario, and this logic seems redundant for the first 
> scenario because WatermarkContext always toggle active status before 
> emitting. Even worse, the logic in RecordWriterOutput would bring cycle 
> dependency with StreamStatusMaintainer, which is a blocker for the following 
> work of integrating source processing on runtime side.
> The solution is that we could migrate the checking logic from 
> RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator. And we could 
> also remove the toggle active logic  in existing WatermarkContext.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)
zhijiang created FLINK-13798:


 Summary: Refactor the process of checking stream status while 
emitting watermark in source
 Key: FLINK-13798
 URL: https://issues.apache.org/jira/browse/FLINK-13798
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


As we know, a watermark can only be emitted downstream while the stream status 
is active. For downstream tasks we already have the StatusWatermarkValve 
component in StreamInputProcessor to handle this logic, but for source tasks 
the current implementation seems a bit tricky. There are two scenarios for the 
source case:
 * Emitting watermarks via the source context: the specific WatermarkContext 
toggles the stream status to active before collecting/emitting 
records/watermarks. RecordWriterOutput then checks a status that is always 
active before actually emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a 
timer at a fixed interval. When it fires, the operator calls the output stack 
to emit the watermark, and RecordWriterOutput takes the role of checking the 
status before actually emitting it.

So the status check in RecordWriterOutput is only needed for the second 
scenario, and it is redundant for the first one because WatermarkContext 
always toggles the status to active before emitting. Even worse, this logic in 
RecordWriterOutput creates a cyclic dependency with StreamStatusMaintainer, 
which is a blocker for the following work of integrating source processing on 
the runtime side.

The solution is to migrate the check from RecordWriterOutput to 
TimestampsAndPeriodicWatermarksOperator, and to also remove the 
toggle-to-active logic from the existing WatermarkContext.
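
For illustration, a minimal sketch of where the check could live after the 
migration; the types below are simplified stand-ins for the real operator, 
output and status classes, not the actual Flink signatures:

{code:java}
// Simplified placeholders; only the parts needed to show where the check moves.
interface WatermarkOutput {
    void emitWatermark(long timestamp);
}

enum StreamStatus { ACTIVE, IDLE }

interface StreamStatusProvider {
    StreamStatus getStreamStatus();
}

/**
 * Illustrative periodic-watermark operator: the stream-status check happens
 * here, when the timer fires, instead of inside the record writer output.
 */
class PeriodicWatermarkEmitterSketch {
    private final StreamStatusProvider statusProvider;
    private final WatermarkOutput output;
    private long currentWatermark = Long.MIN_VALUE;

    PeriodicWatermarkEmitterSketch(StreamStatusProvider statusProvider, WatermarkOutput output) {
        this.statusProvider = statusProvider;
        this.output = output;
    }

    /** Called by the processing-time timer at the configured interval. */
    void onTimer(long newWatermark) {
        // Only forward the watermark while the stream is active; the record
        // writer output no longer needs to know about the stream status.
        if (newWatermark > currentWatermark
                && statusProvider.getStreamStatus() == StreamStatus.ACTIVE) {
            currentWatermark = newWatermark;
            output.emitWatermark(newWatermark);
        }
    }
}
{code}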



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13767) Migrate isFinished method from AvailabilityListener to AsyncDataInput

2019-08-18 Thread zhijiang (JIRA)
zhijiang created FLINK-13767:


 Summary: Migrate isFinished method from AvailabilityListener to 
AsyncDataInput
 Key: FLINK-13767
 URL: https://issues.apache.org/jira/browse/FLINK-13767
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network, Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


AvailabilityListener is used by both AsyncDataInput and StreamTaskInput. We 
already introduced InputStatus for StreamTaskInput#emitNext, and 
InputStatus#END_OF_INPUT has the same semantics as 
AvailabilityListener#isFinished.

For AsyncDataInput, which is mainly used by the InputGate layer, the 
isFinished() method is still needed at the moment. So we migrate this method 
from AvailabilityListener to AsyncDataInput, and refactor the 
StreamInputProcessor implementations to determine the finished state via 
InputStatus.
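
A rough sketch of the intended split, using simplified interfaces (illustrative 
names and signatures only, not the exact Flink API):

{code:java}
import java.util.concurrent.CompletableFuture;

/** Simplified availability contract: no isFinished() here any more. */
interface AvailabilityListener {
    CompletableFuture<?> isAvailable();
}

/** Gate-level input keeps the explicit finished flag. */
interface AsyncDataInput<T> extends AvailabilityListener {
    boolean isFinished();
    T pollNext() throws Exception;
}

/** Processor-level result of emitting one element. */
enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

interface Output<T> {
    void collect(T record);
}

/** Task-level input signals "finished" through InputStatus#END_OF_INPUT instead. */
interface StreamTaskInput<T> extends AvailabilityListener {
    InputStatus emitNext(Output<T> output) throws Exception;
}
{code}

With this split, the input processors can treat a returned END_OF_INPUT as the 
finished condition instead of polling a separate isFinished() flag.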



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13766) Refactor the implementation of StreamInputProcessor based on StreamTaskInput#emitNext

2019-08-18 Thread zhijiang (JIRA)
zhijiang created FLINK-13766:


 Summary: Refactor the implementation of StreamInputProcessor based 
on StreamTaskInput#emitNext
 Key: FLINK-13766
 URL: https://issues.apache.org/jira/browse/FLINK-13766
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


The current task input processors are based on a pollNext-style API. In order 
to unify the processing with the new source operator, we introduce 
StreamTaskInput#emitNext(Output) to replace the current pollNext, and then 
adjust the existing 
StreamOneInputProcessor/StreamTwoInputSelectableProcessor implementations to 
the new emit-based approach.

This allows all task inputs, whether from the network or from sources, to be 
processed in a unified way on the runtime side.
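
As a sketch, an emit-based processor loop could look roughly like this, with 
all types reduced to simplified placeholders rather than the real Flink 
classes:

{code:java}
/** Simplified placeholders; not the real Flink interfaces. */
enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

interface Output<T> {
    void collect(T record);
}

interface StreamTaskInput<T> {
    /** Pushes at most one element into the given output and reports what is left. */
    InputStatus emitNext(Output<T> output) throws Exception;
}

/** Sketch of a one-input processor built on emitNext instead of pollNext. */
final class OneInputProcessorSketch<T> {

    private final StreamTaskInput<T> input;
    private final Output<T> output;

    OneInputProcessorSketch(StreamTaskInput<T> input, Output<T> output) {
        this.input = input;
        this.output = output;
    }

    /** One invocation per mailbox default action; returns false once the input is drained. */
    boolean processInput() throws Exception {
        return input.emitNext(output) != InputStatus.END_OF_INPUT;
    }
}
{code}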



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13765) Introduce the InputSelectionHandler for selecting next input in StreamTwoInputSelectableProcessor

2019-08-18 Thread zhijiang (JIRA)
zhijiang created FLINK-13765:


 Summary: Introduce the InputSelectionHandler for selecting next 
input in StreamTwoInputSelectableProcessor
 Key: FLINK-13765
 URL: https://issues.apache.org/jira/browse/FLINK-13765
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


In StreamTwoInputSelectableProcessor, three fields {InputSelectable, 
InputSelection, availableInputsMask} are used together to select the next 
available input index. This brings two problems:
 * From a design perspective, these fields should be abstracted into a 
separate component that is passed into StreamTwoInputSelectableProcessor.
 * inputSelector.nextSelection() is called while processing elements inside 
StreamTwoInputSelectableProcessor, which blocks the later integration of task 
input/output for both StreamOneInputProcessor and 
StreamTwoInputSelectableProcessor.
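
A minimal sketch of such a handler, assuming a simplified bit-mask 
representation for the selected and available inputs (the real classes differ 
in detail):

{code:java}
/**
 * Illustrative InputSelectionHandler: bundles the selection-related state that
 * today lives as three separate fields in the two-input processor.
 */
final class InputSelectionHandler {

    interface InputSelectable {
        /** Returns a bit mask of the inputs the operator wants to read next. */
        long nextSelection();
    }

    private final InputSelectable inputSelectable;
    private long selectedInputsMask = ~0L;  // operator's current selection (all inputs initially)
    private long availableInputsMask;       // inputs that still have data

    InputSelectionHandler(InputSelectable inputSelectable, int numInputs) {
        this.inputSelectable = inputSelectable;
        this.availableInputsMask = (1L << numInputs) - 1;
    }

    /** Refresh the operator's selection after an element was processed. */
    void nextSelection() {
        selectedInputsMask = inputSelectable.nextSelection();
    }

    void markInputFinished(int inputIndex) {
        availableInputsMask &= ~(1L << inputIndex);
    }

    /** Returns the next readable input index, or -1 if none is both selected and available. */
    int selectNextInputIndex() {
        long candidates = selectedInputsMask & availableInputsMask;
        return candidates == 0 ? -1 : Long.numberOfTrailingZeros(candidates);
    }
}
{code}

With such a component, the processor only asks for the next input index and no 
longer manipulates the masks itself.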



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13764) Pass the counter of numRecordsIn into the constructors of StreamOne/TwoInputProcessor

2019-08-18 Thread zhijiang (JIRA)
zhijiang created FLINK-13764:


 Summary: Pass the counter of numRecordsIn into the constructors of 
StreamOne/TwoInputProcessor
 Key: FLINK-13764
 URL: https://issues.apache.org/jira/browse/FLINK-13764
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


Currently the numRecordsIn counter is set up lazily while the processor is 
processing input. In order to later integrate the processing logic based on 
StreamTaskInput#emitNext(Output), we need to pass the counter into the output 
functions instead.

This refactoring is therefore a precondition for the following work, and it 
brings additional benefits: the counter can become a final field in 
StreamInputProcessor, and the counter setup logic can be reused for both 
StreamOne/TwoInputProcessors.

There should be no side effects from setting up the counter a bit earlier than 
before.
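
A small sketch of the constructor-injected counter, with the metrics and 
output types reduced to placeholders (not the actual Flink classes):

{code:java}
/** Simplified counter standing in for the metrics Counter. */
final class Counter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

interface Output<T> {
    void collect(T record);
}

/**
 * Sketch of an input processor that receives the numRecordsIn counter at
 * construction time, so the field can be final and the same wiring is reused
 * for the one-input and two-input variants.
 */
final class CountingInputProcessor<T> {

    private final Counter numRecordsIn;
    private final Output<T> output;

    CountingInputProcessor(Counter numRecordsIn, Output<T> output) {
        this.numRecordsIn = numRecordsIn;
        this.output = output;
    }

    void emit(T record) {
        numRecordsIn.inc();   // counted in the output path instead of lazily during processing
        output.collect(record);
    }
}
{code}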



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13764) Pass the counter of numRecordsIn into the constructors of StreamOne/TwoInputProcessor

2019-08-18 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13764:
-
Component/s: Runtime / Metrics

> Pass the counter of numRecordsIn into the constructors of 
> StreamOne/TwoInputProcessor
> -
>
> Key: FLINK-13764
> URL: https://issues.apache.org/jira/browse/FLINK-13764
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics, Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> Currently the numRecordsIn counter is set up lazily while the processor is 
> processing input. In order to later integrate the processing logic based on 
> StreamTaskInput#emitNext(Output), we need to pass the counter into the 
> output functions instead.
> This refactoring is therefore a precondition for the following work, and it 
> brings additional benefits: the counter can become a final field in 
> StreamInputProcessor, and the counter setup logic can be reused for both 
> StreamOne/TwoInputProcessors.
> There should be no side effects from setting up the counter a bit earlier 
> than before.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13762) Integrate the implementation of ForwardingValveOutputHandler for StreamOne/TwoInputProcessor

2019-08-18 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13762:
-
Description: 
Currently StreamOneInputProcessor and StreamTwoInputSelectableProcessor have 
separate implementations of ForwardingValueOutputHandler. Especially for the 
implementation in  StreamTwoInputSelectableProcessor, it couples the internal 
input index logic which would be a blocker for the following unification of 
StreamTaskInput/Output.

We could realize a unified ForwardingValueOutputHandler for both 
StreamOneInput/ TwoInputSelectableProcessor, and it does not consider different 
inputs to always consume StreamStatus. Then we refactor the implementation of 
StreamStatusMaintainer for judging the status of different inputs internally 
before really emitting the StreamStatus.

  was:
Currently StreamOneInputProcessor and StreamTwoInputSelectableProcessor have 
separate implementations of ForwardingValveOutputHandler. Especially for the 
implementation in  StreamTwoInputSelectableProcessor, it couples the internal 
input index logic which would be a blocker for the following unification of 
StreamTaskInput/Output.

We could realize a unified ForwardingValveOutputHandler for both 
StreamOneInput/ TwoInputSelectableProcessor, and it does not consider different 
inputs to always consume StreamStatus. Then we refactor the implementation of 
StreamStatusMaintainer for judging the status of different inputs internally 
before really emitting the StreamStatus.


> Integrate the implementation of ForwardingValveOutputHandler for 
> StreamOne/TwoInputProcessor
> 
>
> Key: FLINK-13762
> URL: https://issues.apache.org/jira/browse/FLINK-13762
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> Currently StreamOneInputProcessor and StreamTwoInputSelectableProcessor have 
> separate implementations of ForwardingValveOutputHandler. In particular, the 
> implementation in StreamTwoInputSelectableProcessor couples in the internal 
> input index logic, which is a blocker for the following unification of 
> StreamTaskInput/Output.
> We could realize a unified ForwardingValveOutputHandler for both 
> StreamOneInputProcessor and StreamTwoInputSelectableProcessor that consumes 
> StreamStatus without caring which input it came from. The 
> StreamStatusMaintainer implementation is then refactored to judge the status 
> of the different inputs internally before actually emitting the StreamStatus.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13762) Implement a unified StatusWatermarkOutputHandler for StreamOne/TwoInputProcessor

2019-08-18 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13762:
-
Summary: Implement a unified StatusWatermarkOutputHandler for 
StreamOne/TwoInputProcessor  (was: Integrate the implementation of 
ForwardingValveOutputHandler for StreamOne/TwoInputProcessor)

> Implement a unified StatusWatermarkOutputHandler for 
> StreamOne/TwoInputProcessor
> 
>
> Key: FLINK-13762
> URL: https://issues.apache.org/jira/browse/FLINK-13762
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> Currently StreamOneInputProcessor and StreamTwoInputSelectableProcessor have 
> separate implementations of ForwardingValveOutputHandler. In particular, the 
> implementation in StreamTwoInputSelectableProcessor couples in the internal 
> input index logic, which is a blocker for the following unification of 
> StreamTaskInput/Output.
> We could realize a unified ForwardingValveOutputHandler for both 
> StreamOneInputProcessor and StreamTwoInputSelectableProcessor that consumes 
> StreamStatus without caring which input it came from. The 
> StreamStatusMaintainer implementation is then refactored to judge the status 
> of the different inputs internally before actually emitting the StreamStatus.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13762) Integrate the implementation of ForwardingValveOutputHandler for StreamOne/TwoInputProcessor

2019-08-18 Thread zhijiang (JIRA)
zhijiang created FLINK-13762:


 Summary: Integrate the implementation of 
ForwardingValveOutputHandler for StreamOne/TwoInputProcessor
 Key: FLINK-13762
 URL: https://issues.apache.org/jira/browse/FLINK-13762
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


Currently StreamOneInputProcessor and StreamTwoInputSelectableProcessor have 
separate implementations of ForwardingValveOutputHandler. In particular, the 
implementation in StreamTwoInputSelectableProcessor couples in the internal 
input index logic, which is a blocker for the following unification of 
StreamTaskInput/Output.

We could realize a unified ForwardingValveOutputHandler for both 
StreamOneInputProcessor and StreamTwoInputSelectableProcessor that consumes 
StreamStatus without caring which input it came from. The 
StreamStatusMaintainer implementation is then refactored to judge the status 
of the different inputs internally before actually emitting the StreamStatus.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13754) Decouple OperatorChain from StreamStatusMaintainer

2019-08-16 Thread zhijiang (JIRA)
zhijiang created FLINK-13754:


 Summary: Decouple OperatorChain from StreamStatusMaintainer
 Key: FLINK-13754
 URL: https://issues.apache.org/jira/browse/FLINK-13754
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


There are two motivations for this refactoring:
 * It is the precondition for the following work of decoupling the dependency 
between the two input statuses in ForwardingValveOutputHandler.
 * From a design perspective, the current OperatorChain takes on many 
unrelated roles such as StreamStatusMaintainer, which makes it hard to 
maintain. The root cause is the cyclic dependency between RecordWriterOutput 
(created by OperatorChain) and StreamStatusMaintainer.

The solution is to move the creation of StreamStatusMaintainer and 
RecordWriterOutput to the StreamTask level and thereby break the cyclic 
dependency between them. The array of RecordWriters, which is closely related 
to RecordWriterOutput, is already created in StreamTask, so it is reasonable 
to create them together. The StreamStatusMaintainer created in StreamTask can 
then be referenced directly by subclasses such as 
OneInputStreamTask/TwoInputStreamTask.
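
A very rough sketch of the intended construction order inside StreamTask, 
using placeholder types to show how the cycle is broken (not the actual Flink 
classes):

{code:java}
/** Placeholder types standing in for the runtime classes. */
final class StreamTaskWiringSketch {

    interface RecordWriter {}
    interface StreamStatusMaintainer {}
    static final class SimpleStatusMaintainer implements StreamStatusMaintainer {}
    static final class RecordWriterOutput {
        RecordWriterOutput(RecordWriter writer, StreamStatusMaintainer maintainer) {
            // wraps the writer and consults the maintainer before emitting
        }
    }

    private final StreamStatusMaintainer statusMaintainer;
    private final RecordWriterOutput[] outputs;

    StreamTaskWiringSketch(RecordWriter[] recordWriters) {
        // 1. Created at the StreamTask level and referenced directly by the
        //    One/TwoInputStreamTask subclasses.
        this.statusMaintainer = new SimpleStatusMaintainer();
        // 2. The outputs are built next to the record writers they wrap,
        //    breaking the old cycle through OperatorChain.
        this.outputs = new RecordWriterOutput[recordWriters.length];
        for (int i = 0; i < recordWriters.length; i++) {
            outputs[i] = new RecordWriterOutput(recordWriters[i], statusMaintainer);
        }
    }

    StreamStatusMaintainer getStreamStatusMaintainer() {
        return statusMaintainer;
    }
}
{code}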



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13753) Integrate new Source Operator with Mailbox Model in StreamTask

2019-08-16 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13753:
-
Description: 
This is the umbrella issue for integrating new source operator with mailbox 
model in StreamTask.

The motivation is based on 
[FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface]
 which proposes to refactor the whole source API and the integration of 
task-level actions (including checkpoint, timer, async operator) with unified 
mailbox model on runtime side.
 * The benefits are simple unified processing logics because only one single 
thread handles all the actions without concurrent issue, and further getting 
rid of lock dependency which causes unfair lock concern in checkpoint process.
 * We still need to support the current legacy source in some releases which 
would probably be used for a while, especially for the scenario of performance 
concern.

The design doc is 
[https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit#|https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]

  was:
This is the umbrella issue for integrating new source operator with mailbox 
model in StreamTask.

The motivation is based on 
[FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface]
 which proposes to refactor the whole source API and the integration of 
task-level actions (including checkpoint, timer, async operator) with unified 
mailbox model on runtime side.
 * The benefits are simple unified processing logics because only one single 
thread handles all the actions without concurrent issue, and further getting 
rid of lock dependency which causes unfair lock concern in checkpoint process.
 * We still need to support the current legacy source in some releases which 
would probably be used for a while, especially for the scenario of performance 
concern.

The design doc is 
[https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit#|https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]|[https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit#|https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]


> Integrate new Source Operator with Mailbox Model in StreamTask
> --
>
> Key: FLINK-13753
> URL: https://issues.apache.org/jira/browse/FLINK-13753
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> This is the umbrella issue for integrating the new source operator with the 
> mailbox model in StreamTask.
> The motivation comes from 
> [FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface],
>  which proposes to refactor the whole source API and to integrate the 
> task-level actions (checkpoint, timer, async operator) with the unified 
> mailbox model on the runtime side.
>  * The benefit is simpler, unified processing logic: a single thread handles 
> all the actions without concurrency issues, and we get rid of the lock 
> dependency that causes unfair-lock concerns in the checkpoint process.
>  * We still need to support the current legacy source for some releases, as 
> it will probably be used for a while, especially where performance is a 
> concern.
> The design doc is 
> [https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13753) Integrate new Source Operator with Mailbox Model in StreamTask

2019-08-16 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13753:
-
Description: 
This is the umbrella issue for integrating new source operator with mailbox 
model in StreamTask.

The motivation is based on 
[FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface]
 which proposes to refactor the whole source API and the integration of 
task-level actions (including checkpoint, timer, async operator) with unified 
mailbox model on runtime side.
 * The benefits are simple unified processing logics because only one single 
thread handles all the actions without concurrent issue, and further getting 
rid of lock dependency which causes unfair lock concern in checkpoint process.
 * We still need to support the current legacy source in some releases which 
would probably be used for a while, especially for the scenario of performance 
concern.

The design doc is 
[https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit#|https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]|[https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit#|https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]

  was:
This is the umbrella issue for integrating new source operator with mailbox 
model in StreamTask.

The motivation is based on 
[FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface]
 which proposes to refactor the whole source API and the integration of 
task-level actions (including checkpoint, timer, async operator) with unified 
[mailbox 
model|[https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g]]
 on runtime side.
 * The benefits are simple unified processing logics because only one single 
thread handles all the actions without concurrent issue, and further getting 
rid of lock dependency which causes unfair lock concern in checkpoint process.
 * We still need to support the current legacy source in some releases which 
would probably be used for a while, especially for the scenario of performance 
concern.

The design doc is 
[https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit#|https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]|[https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit#|https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]]


> Integrate new Source Operator with Mailbox Model in StreamTask
> --
>
> Key: FLINK-13753
> URL: https://issues.apache.org/jira/browse/FLINK-13753
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> This is the umbrella issue for integrating new source operator with mailbox 
> model in StreamTask.
> The motivation is based on 
> [FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface]
>  which proposes to refactor the whole source API and the integration of 
> task-level actions (including checkpoint, timer, async operator) with unified 
> mailbox model on runtime side.
>  * The benefits are simple unified processing logics because only one single 
> thread handles all the actions without concurrent issue, and further getting 
> rid of lock dependency which causes unfair lock concern in checkpoint process.
>  * We still need to support the current legacy source in some releases which 
> would probably be used for a while, especially for the scenario of 
> performance concern.
> The design doc is 
> [https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit#|https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]|[https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit#|https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13753) Integrate new Source Operator with Mailbox Model in StreamTask

2019-08-16 Thread zhijiang (JIRA)
zhijiang created FLINK-13753:


 Summary: Integrate new Source Operator with Mailbox Model in 
StreamTask
 Key: FLINK-13753
 URL: https://issues.apache.org/jira/browse/FLINK-13753
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


This is the umbrella issue for integrating the new source operator with the 
mailbox model in StreamTask.

The motivation comes from 
[FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface],
 which proposes to refactor the whole source API and to integrate the 
task-level actions (checkpoint, timer, async operator) with the unified 
[mailbox model|https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g]
 on the runtime side.
 * The benefit is simpler, unified processing logic: a single thread handles 
all the actions without concurrency issues, and we get rid of the lock 
dependency that causes unfair-lock concerns in the checkpoint process.
 * We still need to support the current legacy source for some releases, as it 
will probably be used for a while, especially where performance is a concern.

The design doc is 
[https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]
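
As a conceptual illustration only, the mailbox idea boils down to a 
single-threaded loop that alternates between enqueued actions and a default 
action; this toy sketch is not the StreamTask mailbox implementation:

{code:java}
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Toy illustration of the mailbox idea: one thread alternates between the
 * default action (e.g. emitting a record from the source) and any enqueued
 * "letters" such as checkpoint triggers or timers, so no checkpoint lock is
 * needed.
 */
final class MailboxLoopSketch {

    private final LinkedBlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    /** Other threads (RPC, timer service) only enqueue work here. */
    void sendMail(Runnable letter) {
        mailbox.add(letter);
    }

    void stop() {
        running = false;
    }

    /** Runs exclusively in the single task thread. */
    void run(Runnable defaultAction) {
        while (running) {
            // Drain all pending mails first...
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();
            }
            // ...then perform one unit of the default action.
            defaultAction.run();
        }
    }
}
{code}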



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13753) Integrate new Source Operator with Mailbox Model in StreamTask

2019-08-16 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13753:
-
Description: 
This is the umbrella issue for integrating new source operator with mailbox 
model in StreamTask.

The motivation is based on 
[FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface]
 which proposes to refactor the whole source API and the integration of 
task-level actions (including checkpoint, timer, async operator) with unified 
[mailbox 
model|[https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g]]
 on runtime side.
 * The benefits are simple unified processing logics because only one single 
thread handles all the actions without concurrent issue, and further getting 
rid of lock dependency which causes unfair lock concern in checkpoint process.
 * We still need to support the current legacy source in some releases which 
would probably be used for a while, especially for the scenario of performance 
concern.

The design doc is 
[https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit#|https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]|[https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit#|https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]]

  was:
This is the umbrella issue for integrating new source operator with mailbox 
model in StreamTask.

The motivation is based on 
[FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface]
 which proposes to refactor the whole source API and the integration of 
task-level actions (including checkpoint, timer, async operator) with unified 
[mailbox model| 
[https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g]]
 on runtime side.
 * The benefits are simple unified processing logics because only one single 
thread handles all the actions without concurrent issue, and further getting 
rid of lock dependency which causes unfair lock concern in checkpoint process.
 * We still need to support the current legacy source in some releases which 
would probably be used for a while, especially for the scenario of performance 
concern.

The design doc is 
[https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit#|https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]|[https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit#|https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]]


> Integrate new Source Operator with Mailbox Model in StreamTask
> --
>
> Key: FLINK-13753
> URL: https://issues.apache.org/jira/browse/FLINK-13753
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> This is the umbrella issue for integrating the new source operator with the 
> mailbox model in StreamTask.
> The motivation comes from 
> [FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface],
>  which proposes to refactor the whole source API and to integrate the 
> task-level actions (checkpoint, timer, async operator) with the unified 
> [mailbox model|https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g]
>  on the runtime side.
>  * The benefit is simpler, unified processing logic: a single thread handles 
> all the actions without concurrency issues, and we get rid of the lock 
> dependency that causes unfair-lock concerns in the checkpoint process.
>  * We still need to support the current legacy source for some releases, as 
> it will probably be used for a while, especially where performance is a 
> concern.
> The design doc is 
> [https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Closed] (FLINK-12630) Refactor abstract InputGate to general interface

2019-08-07 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang closed FLINK-12630.

Resolution: Won't Fix

> Refactor abstract InputGate to general interface
> 
>
> Key: FLINK-12630
> URL: https://issues.apache.org/jira/browse/FLINK-12630
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> `InputGate` is currently defined as an abstract class that extracts the 
> common code for checking data availability for the subclasses 
> `SingleInputGate` and `UnionInputGate`, but this limits further extension of 
> `InputGate` implementations in the shuffle service architecture.
> `SingleInputGate` is created by the shuffle service, so it belongs to the 
> shuffle service scope, while `UnionInputGate` is a wrapper around several 
> `SingleInputGate`s and therefore belongs to the task/processor stack.
> In order to let a new `InputGate` implementation from another shuffle 
> service be plugged in directly, we should define a cleaner `InputGate` 
> interface that decouples the availability-checking logic from the interface. 
> Concretely, we could declare the `isAvailable` method in the `InputGate` 
> interface and extract the current implementation into a separate class 
> `FutureBasedAvailability` that can still be extended and reused by both 
> `SingleInputGate` and `UnionInputGate`.
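
A minimal sketch of the proposed shape (interface plus a reusable availability 
helper); the names and signatures are illustrative assumptions, and the issue 
was ultimately closed as Won't Fix:

{code:java}
import java.util.concurrent.CompletableFuture;

/** Slim gate interface: availability is part of the contract, not the base class. */
interface InputGateSketch {
    CompletableFuture<?> isAvailable();
    int getNumberOfInputChannels();
}

/** The common "availability future" logic both gate implementations could reuse. */
class FutureBasedAvailability {
    private CompletableFuture<?> availabilityFuture = new CompletableFuture<>();

    public CompletableFuture<?> isAvailable() {
        return availabilityFuture;
    }

    /** Called when data arrives. */
    public void notifyAvailable() {
        availabilityFuture.complete(null);
    }

    /** Called when the gate runs dry again. */
    public void resetUnavailable() {
        if (availabilityFuture.isDone()) {
            availabilityFuture = new CompletableFuture<>();
        }
    }
}
{code}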



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Closed] (FLINK-12576) inputQueueLength metric does not work for LocalInputChannels

2019-08-05 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang closed FLINK-12576.

Resolution: Fixed

Fix in master : 302fc1d5de38bb6db99ddc45470efcc9ebd782bc

Fix in release-1.9 : 2a6fb9af7bca5a55d1dc9c55b779eea38b43e1a2

> inputQueueLength metric does not work for LocalInputChannels
> 
>
> Key: FLINK-12576
> URL: https://issues.apache.org/jira/browse/FLINK-12576
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / Network
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Piotr Nowojski
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently {{inputQueueLength}} ignores LocalInputChannels 
> ({{SingleInputGate#getNumberOfQueuedBuffers}}). This can cause mistakes when 
> looking for the causes of back pressure (for example, if a task is back 
> pressuring the whole Flink job, but there is data skew and only local input 
> channels are being used).
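
Conceptually, the fix amounts to counting queued buffers over every input 
channel, local ones included; this sketch uses placeholder types rather than 
the real gate/channel API:

{code:java}
/** Sketch of an inputQueueLength gauge that does not skip local channels. */
final class InputQueueLengthGaugeSketch {

    interface InputChannel {
        /** Buffers currently queued in this channel (local or remote). */
        int getNumberOfQueuedBuffers();
    }

    private final InputChannel[] channels;

    InputQueueLengthGaugeSketch(InputChannel[] channels) {
        this.channels = channels;
    }

    /** Sum over all channels, regardless of channel type. */
    int getValue() {
        int total = 0;
        for (InputChannel channel : channels) {
            total += channel.getNumberOfQueuedBuffers();
        }
        return total;
    }
}
{code}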



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13435) Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per partition type

2019-08-02 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16898697#comment-16898697
 ] 

zhijiang commented on FLINK-13435:
--

Thanks for helping to merge this into 1.9; I had forgotten that. [~Zentol]

> Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per 
> partition type
> 
>
> Key: FLINK-13435
> URL: https://issues.apache.org/jira/browse/FLINK-13435
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In the long term we do not need auto-release semantics for blocking 
> (persistent) partitions. We expect them to always be released externally by 
> the JM and assume they can be consumed multiple times.
> Pipelined partitions always have exactly one consumer and one consumption 
> attempt; afterwards they can always be released automatically.
> ShuffleDescriptor.ReleaseType was introduced to make the release semantics 
> more flexible, but it is not needed in the long term.
> FORCE_PARTITION_RELEASE_ON_CONSUMPTION was introduced as a safety net to be 
> able to fall back to the 1.8 behaviour without the partition tracker and the 
> JM taking care of blocking partition release. We can make this option 
> specific to NettyShuffleEnvironment, which was the only existing shuffle 
> service before. If it is activated, the blocking partition is also 
> auto-released on a consumption attempt as it was before. Fine-grained 
> recovery will simply not find the partition after the job restart in this 
> case and will restart the producer.
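
A sketch of what fixed per-type semantics could look like, with an 
illustrative enum and flag standing in for the real partition types and the 
compatibility option:

{code:java}
/** Sketch: release-on-consumption is determined by the partition type. */
final class PartitionReleaseSemanticsSketch {

    enum PartitionType { PIPELINED, BLOCKING }

    /** True if the partition may be released as soon as its consumer is done. */
    static boolean releaseOnConsumption(PartitionType type, boolean forceReleaseOnConsumption) {
        switch (type) {
            case PIPELINED:
                return true;                        // exactly one consumer, one attempt
            case BLOCKING:
                return forceReleaseOnConsumption;   // only via the 1.8-compat safety-net option
            default:
                throw new IllegalArgumentException("Unknown type: " + type);
        }
    }
}
{code}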



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-13493) BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the readers are empty

2019-08-01 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16897998#comment-16897998
 ] 

zhijiang edited comment on FLINK-13493 at 8/1/19 12:10 PM:
---

Thanks for the clarification [~azagrebin]!

The set structure of readers in `BoundedBlockingSubpartition` made me think of 
this concern. I agree it should be no problem now because of two things:
 * We do not have multiple readers for one subpartition concurrently. The 
current semantics of consuming a subpartition multiple times only covers the 
failover case, meaning the next consumption attempt only comes after the 
previous attempt was canceled.
 * The current blocking subpartition release checks whether the reader set is 
empty. As I proposed in FLINK-13478, there may be no need to check that if 
the ResultPartition has already decided to release the whole partition, and 
this decision might also come from a JM RPC.

So the whole logic seems a bit odd: the subpartition notifies the result 
partition of consumption even though there are other unconsumed readers, and 
then, if the result partition triggers the whole release, that subpartition 
with unconsumed readers would actually not be released.


was (Author: zjwang):
Thanks for the clarification [~azagrebin]!

The set structure of readers in `BoundedBlockingSubpartition` make me think of 
this concern. I agree it should be no problem now because of two things:
 * We do not have multiple readers for one subpartition concurrently. The 
current semantic of consuming multiple times for one subpartition is only 
considering the failover case, that means next consumption attempt only comes 
after the previous attempt canceled.
 * The current blocking subpartition release would check whether reader set is 
empty. As I proposed in FLINK-13478, it might no need to check that if the 
ResultPartition already decides to release the whole partition, and this 
decision might also come from JM RPC.

So the whole logic seems a bit weird that the subpartition would notify result 
partition consumed even though there are other unconsumed readers, and then if 
the result partition triggers the whole release, the subpartition with 
unconsumed readers actually would not be released.

> BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the 
> readers are empty
> ---
>
> Key: FLINK-13493
> URL: https://issues.apache.org/jira/browse/FLINK-13493
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> In the previous implementation, the {{ResultPartition}} was always notified 
> of a consumed subpartition whenever any reader view was released. With the 
> reference-counter release strategy this can cause problems if one blocking 
> subpartition has multiple readers: the whole result partition might be 
> released while there are still live readers in some subpartitions.
> The default release strategy for blocking partitions is currently based on 
> JM/scheduler notification, but if we switch the option to consumption-based 
> notification it would cause problems. From the subpartition side, the 
> behavior should be correct regardless of which release strategy the upper 
> layer uses.
> In order to fix this bug, {{BoundedBlockingSubpartition}} only notifies 
> {{onConsumedSubpartition}} when the set of readers is empty.
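
A minimal sketch of the fixed notification rule, with placeholder types for 
the reader and the parent partition (not the real Flink classes):

{code:java}
import java.util.HashSet;
import java.util.Set;

/** Sketch: the subpartition reports itself as consumed only once its reader set is empty. */
final class BlockingSubpartitionSketch {

    interface Reader {}

    interface ParentPartition {
        void onConsumedSubpartition(int subpartitionIndex);
    }

    private final int index;
    private final ParentPartition parent;
    private final Set<Reader> readers = new HashSet<>();

    BlockingSubpartitionSketch(int index, ParentPartition parent) {
        this.index = index;
        this.parent = parent;
    }

    synchronized Reader createReader() {
        Reader reader = new Reader() {};
        readers.add(reader);
        return reader;
    }

    synchronized void releaseReader(Reader reader) {
        readers.remove(reader);
        // Old behaviour: notify on every reader release.
        // Fixed behaviour: notify only when no reader is left.
        if (readers.isEmpty()) {
            parent.onConsumedSubpartition(index);
        }
    }
}
{code}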



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-13493) BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the readers are empty

2019-08-01 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16897998#comment-16897998
 ] 

zhijiang edited comment on FLINK-13493 at 8/1/19 12:09 PM:
---

Thanks for the clarification [~azagrebin]!

The set structure of readers in `BoundedBlockingSubpartition` made me think of 
this concern. I agree it should be no problem now because of two things:
 * We do not have multiple readers for one subpartition concurrently. The 
current semantics of consuming a subpartition multiple times only covers the 
failover case, meaning the next consumption attempt only comes after the 
previous attempt was canceled.
 * The current blocking subpartition release checks whether the reader set is 
empty. As I proposed in FLINK-13478, there may be no need to check that if 
the ResultPartition has already decided to release the whole partition, and 
this decision might also come from a JM RPC.

So the whole logic seems a bit odd: the subpartition notifies the result 
partition of consumption even though there are other unconsumed readers, and 
then, if the result partition triggers the whole release, the subpartition 
with unconsumed readers would actually not be released.


was (Author: zjwang):
Thanks for the clarification [~azagrebin]!

The set structure of readers in `BoundedBlockingSubpartition` make me think of 
this concern. I agree it should be no problem now because of two things:
 * We do not have multiple readers for one subpartition concurrently. The 
current semantic of consuming multiple times for one subpartition is only 
considering the failover case, that means next consumption attempt only comes 
after the previous attempt canceled.
 * The current blocking subpartition release would check whether reader set is 
empty. As I proposed in FLINK-13478, it might no need to check that if the 
ResultPartition already decides to release the whole partition, and this 
decision might also come from JM RPC.

So the whole logic seems a bit wired that the subpartition would notify result 
partition consumed even though there are other unconsumed readers, and then if 
the result partition triggers the whole release, the subpartition with 
unconsumed readers actually would not be released.

> BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the 
> readers are empty
> ---
>
> Key: FLINK-13493
> URL: https://issues.apache.org/jira/browse/FLINK-13493
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> In the previous implementation, the {{ResultPartition}} was always notified 
> of a consumed subpartition whenever any reader view was released. With the 
> reference-counter release strategy this can cause problems if one blocking 
> subpartition has multiple readers: the whole result partition might be 
> released while there are still live readers in some subpartitions.
> The default release strategy for blocking partitions is currently based on 
> JM/scheduler notification, but if we switch the option to consumption-based 
> notification it would cause problems. From the subpartition side, the 
> behavior should be correct regardless of which release strategy the upper 
> layer uses.
> In order to fix this bug, {{BoundedBlockingSubpartition}} only notifies 
> {{onConsumedSubpartition}} when the set of readers is empty.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-13493) BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the readers are empty

2019-08-01 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16897998#comment-16897998
 ] 

zhijiang edited comment on FLINK-13493 at 8/1/19 12:07 PM:
---

Thanks for the clarification [~azagrebin]!

The set structure of readers in `BoundedBlockingSubpartition` made me think of 
this concern. I agree it should be no problem now because of two things:
 * We do not have multiple readers for one subpartition concurrently. The 
current semantics of consuming a subpartition multiple times only covers the 
failover case, meaning the next consumption attempt only comes after the 
previous attempt was canceled.
 * The current blocking subpartition release checks whether the reader set is 
empty. As I proposed in FLINK-13478, there may be no need to check that if 
the ResultPartition has already decided to release the whole partition, and 
this decision might also come from a JM RPC.

So the whole logic seems a bit odd: the subpartition notifies the result 
partition of consumption even though there are other unconsumed readers, and 
then, if the result partition triggers the whole release, the subpartition 
with unconsumed readers would actually not be released.


was (Author: zjwang):
Thanks for the clarification [~azagrebin]!

I agree it should be no problem now because we do not have multiple readers for 
one subpartition concurrently. The current semantic of consuming multiple times 
for one subpartition is only considering the failover case, that means next 
consumption attempt only comes after the previous attempt canceled. The set 
structure of readers in `BoundedBlockingSubpartition` make me think of this 
unnecessary concern.

> BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the 
> readers are empty
> ---
>
> Key: FLINK-13493
> URL: https://issues.apache.org/jira/browse/FLINK-13493
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> In the previous implementation, the {{ResultPartition}} was always notified 
> of a consumed subpartition whenever any reader view was released. With the 
> reference-counter release strategy this can cause problems if one blocking 
> subpartition has multiple readers: the whole result partition might be 
> released while there are still live readers in some subpartitions.
> The default release strategy for blocking partitions is currently based on 
> JM/scheduler notification, but if we switch the option to consumption-based 
> notification it would cause problems. From the subpartition side, the 
> behavior should be correct regardless of which release strategy the upper 
> layer uses.
> In order to fix this bug, {{BoundedBlockingSubpartition}} only notifies 
> {{onConsumedSubpartition}} when the set of readers is empty.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Closed] (FLINK-13435) Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per partition type

2019-08-01 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang closed FLINK-13435.

Resolution: Fixed

Fixed in master : 6ce4e2b7ec958570068838491bd8f56f880c

> Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per 
> partition type
> 
>
> Key: FLINK-13435
> URL: https://issues.apache.org/jira/browse/FLINK-13435
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In the long term we do not need auto-release semantics for blocking 
> (persistent) partitions. We expect them to always be released externally by 
> the JM and assume they can be consumed multiple times.
> Pipelined partitions always have exactly one consumer and one consumption 
> attempt; afterwards they can always be released automatically.
> ShuffleDescriptor.ReleaseType was introduced to make the release semantics 
> more flexible, but it is not needed in the long term.
> FORCE_PARTITION_RELEASE_ON_CONSUMPTION was introduced as a safety net to be 
> able to fall back to the 1.8 behaviour without the partition tracker and the 
> JM taking care of blocking partition release. We can make this option 
> specific to NettyShuffleEnvironment, which was the only existing shuffle 
> service before. If it is activated, the blocking partition is also 
> auto-released on a consumption attempt as it was before. Fine-grained 
> recovery will simply not find the partition after the job restart in this 
> case and will restart the producer.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13493) BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the readers are empty

2019-08-01 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16897998#comment-16897998
 ] 

zhijiang commented on FLINK-13493:
--

Thanks for the clarification [~azagrebin]!

I agree it should be no problem now because we do not have multiple readers 
for one subpartition concurrently. The current semantics of consuming a 
subpartition multiple times only covers the failover case, meaning the next 
consumption attempt only comes after the previous attempt was canceled. The 
set structure of readers in `BoundedBlockingSubpartition` made me think of 
this unnecessary concern.

> BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the 
> readers are empty
> ---
>
> Key: FLINK-13493
> URL: https://issues.apache.org/jira/browse/FLINK-13493
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> In the previous implementation, the {{ResultPartition}} was always notified 
> of a consumed subpartition whenever any reader view was released. With the 
> reference-counter release strategy this can cause problems if one blocking 
> subpartition has multiple readers: the whole result partition might be 
> released while there are still live readers in some subpartitions.
> The default release strategy for blocking partitions is currently based on 
> JM/scheduler notification, but if we switch the option to consumption-based 
> notification it would cause problems. From the subpartition side, the 
> behavior should be correct regardless of which release strategy the upper 
> layer uses.
> In order to fix this bug, {{BoundedBlockingSubpartition}} only notifies 
> {{onConsumedSubpartition}} when the set of readers is empty.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-10672) Task stuck while writing output to flink

2019-07-31 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16897491#comment-16897491
 ] 

zhijiang commented on FLINK-10672:
--

Could you also test release-1.8.1 or the current master branch to see whether 
this still happens? We have fixed some deadlock issues for both pipelined and 
blocking partitions, which could also cause problems similar to the ones you 
mentioned.

> Task stuck while writing output to flink
> 
>
> Key: FLINK-10672
> URL: https://issues.apache.org/jira/browse/FLINK-10672
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.5.4
> Environment: OS: Debuan rodente 4.17
> Flink version: 1.5.4
> ||Key||Value||
> |jobmanager.heap.mb|1024|
> |jobmanager.rpc.address|localhost|
> |jobmanager.rpc.port|6123|
> |metrics.reporter.jmx.class|org.apache.flink.metrics.jmx.JMXReporter|
> |metrics.reporter.jmx.port|9250-9260|
> |metrics.reporters|jmx|
> |parallelism.default|1|
> |rest.port|8081|
> |taskmanager.heap.mb|1024|
> |taskmanager.numberOfTaskSlots|1|
> |web.tmpdir|/tmp/flink-web-bdb73d6c-5b9e-47b5-9ebf-eed0a7c82c26|
>  
> h1. Overview
> ||Data Port||All Slots||Free Slots||CPU Cores||Physical Memory||JVM Heap 
> Size||Flink Managed Memory||
> |43501|1|0|12|62.9 GB|922 MB|642 MB|
> h1. Memory
> h2. JVM (Heap/Non-Heap)
> ||Type||Committed||Used||Maximum||
> |Heap|922 MB|575 MB|922 MB|
> |Non-Heap|68.8 MB|64.3 MB|-1 B|
> |Total|991 MB|639 MB|922 MB|
> h2. Outside JVM
> ||Type||Count||Used||Capacity||
> |Direct|3,292|105 MB|105 MB|
> |Mapped|0|0 B|0 B|
> h1. Network
> h2. Memory Segments
> ||Type||Count||
> |Available|3,194|
> |Total|3,278|
> h1. Garbage Collection
> ||Collector||Count||Time||
> |G1_Young_Generation|13|336|
> |G1_Old_Generation|1|21|
>Reporter: Ankur Goenka
>Priority: Major
>  Labels: beam
> Attachments: 1uruvakHxBu.png, 3aDKQ24WvKk.png, Po89UGDn58V.png, 
> jmx_dump.json, jmx_dump_detailed.json, jstack_129827.log, jstack_163822.log, 
> jstack_66985.log
>
>
> I am running a fairly complex pipeline with 200+ tasks.
> The pipeline works fine with small data (on the order of 10kb input) but 
> gets stuck with slightly larger data (300kb input).
>  
> The task gets stuck while writing its output to Flink, more specifically 
> while requesting a memory segment from the local buffer pool. The Task 
> Manager UI shows that it has enough memory and memory segments to work with.
> The relevant stack trace is 
> {quote}"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 
> tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9]
>  java.lang.Thread.State: TIMED_WAITING (on object monitor)
>  at (C/C++) 0x7fef201c7dae (Unknown Source)
>  at (C/C++) 0x7fef1f2aea07 (Unknown Source)
>  at (C/C++) 0x7fef1f241cd3 (Unknown Source)
>  at java.lang.Object.wait(Native Method)
>  - waiting on <0xf6d56450> (a java.util.ArrayDeque)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
>  - locked <0xf6d56450> (a java.util.ArrayDeque)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>  at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>  at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26)
>  at 
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>  at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230)
>  - locked <0xf6a60bd0> (a java.lang.Object)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
>  at 
> 

[jira] [Commented] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition

2019-07-31 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16897482#comment-16897482
 ] 

zhijiang commented on FLINK-13478:
--

[~azagrebin], thanks for the consideration.

I think there would be no implicit consequences for the consumer side. Just 
like the current behavior of `PipelinedSubpartition#release`, it only releases 
the view and does not trigger the connection close. The connection close is 
normally initiated by the consumer/client side.

> Decouple two different release strategies in BoundedBlockingSubpartition
> 
>
> Key: FLINK-13478
> URL: https://issues.apache.org/jira/browse/FLINK-13478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> We currently have two basic release strategies. One is based on consumption, 
> via a network notification from the consumer; the other is based on a 
> notification via RPC from the JM/scheduler.
> In the current implementation of {{BoundedBlockingSubpartition}}, however, 
> these two paths are coupled with each other. If the JM decides to release 
> the partition and sends the release RPC call, it has to wait until all the 
> reader views are really released before finally closing the data file. So 
> the JM-RPC-based release strategy still relies to some extent on the 
> consumption confirmation via the network.
> In order to make these two release strategies independent: if the release 
> call comes from the JM/scheduler RPC, we can immediately release all the 
> view readers and then close the data file as well. If the release is based 
> on consumption confirmation, then once all the view readers for one 
> subpartition are released, the subpartition can notify the parent 
> {{ResultPartition}}, which decides whether to release the whole partition.
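
A sketch of the two decoupled paths, again with simplified placeholder types 
rather than the actual subpartition API:

{code:java}
/** Sketch: JM-driven release closes immediately; consumption-driven release escalates at the end. */
final class SubpartitionReleaseSketch {

    interface ReaderView { void releaseAllResources(); }
    interface ParentPartition { void onConsumedSubpartition(); }
    interface DataFile { void close(); }

    private final java.util.Set<ReaderView> readers = new java.util.HashSet<>();
    private final ParentPartition parent;
    private final DataFile dataFile;

    SubpartitionReleaseSketch(ParentPartition parent, DataFile dataFile) {
        this.parent = parent;
        this.dataFile = dataFile;
    }

    /** Path 1: release triggered by the JM/scheduler RPC — do not wait for consumers. */
    synchronized void releaseFromJobMaster() {
        for (ReaderView reader : readers) {
            reader.releaseAllResources();
        }
        readers.clear();
        dataFile.close();
    }

    /** Path 2: release driven by consumption — escalate once the last reader is gone. */
    synchronized void onReaderReleased(ReaderView reader) {
        readers.remove(reader);
        if (readers.isEmpty()) {
            parent.onConsumedSubpartition();
        }
    }
}
{code}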



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Assigned] (FLINK-13489) Heavy deployment end-to-end test fails on Travis with TM heartbeat timeout

2019-07-31 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang reassigned FLINK-13489:


Assignee: Yingjie Cao  (was: zhijiang)

> Heavy deployment end-to-end test fails on Travis with TM heartbeat timeout
> --
>
> Key: FLINK-13489
> URL: https://issues.apache.org/jira/browse/FLINK-13489
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Yingjie Cao
>Priority: Blocker
> Fix For: 1.9.0
>
>
> https://api.travis-ci.org/v3/job/564925128/log.txt
> {code}
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: 1b4f1807cc749628cfc1bdf04647527a)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:250)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>   at 
> org.apache.flink.deployment.HeavyDeploymentStressTestProgram.main(HeavyDeploymentStressTestProgram.java:70)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:247)
>   ... 21 more
> Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager 
> with id ea456d6a590eca7598c19c4d35e56db9 timed out.
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1149)
>   at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:318)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>   at 

[jira] [Assigned] (FLINK-13487) TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall failed on Travis

2019-07-31 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang reassigned FLINK-13487:


Assignee: Yun Gao  (was: Yun Gao)

> TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall 
> failed on Travis
> 
>
> Key: FLINK-13487
> URL: https://issues.apache.org/jira/browse/FLINK-13487
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: error_log.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> https://api.travis-ci.org/v3/job/564925114/log.txt
> {code}
> 21:14:47.090 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time 
> elapsed: 5.754 s <<< FAILURE! - in 
> org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest
> 21:14:47.090 [ERROR] 
> testPartitionReleaseAfterReleaseCall(org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest)
>   Time elapsed: 0.136 s  <<< ERROR!
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: 
> Could not submit task because there is no JobManager associated for the job 
> 2a0ab40cb53241799b71ff6fd2f53d3d.
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionRelease(TaskExecutorPartitionLifecycleTest.java:331)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall(TaskExecutorPartitionLifecycleTest.java:201)
> Caused by: 
> org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: 
> Could not submit task because there is no JobManager associated for the job 
> 2a0ab40cb53241799b71ff6fd2f53d3d.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition

2019-07-30 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13478:
-
Description: 
We have two basic release strategies atm. One is based on consumption via 
network notification from consumer. The other is based on notification via RPC 
from JM/scheduler.

But in current implementation of {{BoundedBlockingSubpartition}}, these two 
ways are a bit coupled with each other. If the JM decides to release partition 
and send the release RPC call, it has to wait all the reader views really 
released before finally closing the data file. So the JM-RPC-based release 
strategy still relies on the consumption confirmation via network to some 
extent.

In order to make these two release strategies independent, if the release call 
is from JM/scheduler RPC, we could immediately release all the view readers and 
then close the data file as well. If the release is based on consumption 
notification, after all the view readers for one subpartition are released, the 
subpartition could further notify the parent {{ResultPartition}} which decides 
whether to release the whole partition or not.

  was:
We have two basic release strategies atm. One is based on consumption via 
network notification from consumer. The other is based on notification via RPC 
from JM/scheduler.

But in current implementation of {{BoundedBlockingSubpartition}}, these two 
ways are coupled with each other. In detail, the network consumption 
notification could only close data file after the release RPC was triggered 
from JM/scheduler. Also for the release RPC it has to wait all the reader views 
really released before closing the data file. So the release RPC still relies 
on network notification to some extent.

In order to make these two release strategies independent, if the release call 
is from JM/scheduler RPC, we could immediately release all the view readers and 
then close the data file as well. If the release is based on consumption 
notification, after all the view readers for one subpartition are released, the 
subpartition could further notify the parent {{ResultPartition}} which decides 
whether to release the whole partition or not.


> Decouple two different release strategies in BoundedBlockingSubpartition
> 
>
> Key: FLINK-13478
> URL: https://issues.apache.org/jira/browse/FLINK-13478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> We have two basic release strategies atm. One is based on consumption via 
> network notification from consumer. The other is based on notification via 
> RPC from JM/scheduler.
> But in current implementation of {{BoundedBlockingSubpartition}}, these two 
> ways are a bit coupled with each other. If the JM decides to release 
> partition and send the release RPC call, it has to wait all the reader views 
> really released before finally closing the data file. So the JM-RPC-based 
> release strategy still relies on the consumption confirmation via network to 
> some extent.
> In order to make these two release strategies independent, if the release 
> call is from JM/scheduler RPC, we could immediately release all the view 
> readers and then close the data file as well. If the release is based on 
> consumption notification, after all the view readers for one subpartition are 
> released, the subpartition could further notify the parent 
> {{ResultPartition}} which decides whether to release the whole partition or 
> not.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition

2019-07-30 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13478:
-
Description: 
We have two basic release strategies atm. One is based on consumption via 
network notification from consumer. The other is based on notification via RPC 
from JM/scheduler.

But in current implementation of {{BoundedBlockingSubpartition}}, these two 
ways are a bit coupled with each other. If the JM decides to release partition 
and send the release RPC call, it has to wait all the reader views really 
released before finally closing the data file. So the JM-RPC-based release 
strategy still relies on the consumption confirmation via network to some 
extent.

In order to make these two release strategies independent, if the release call 
is from JM/scheduler RPC, we could immediately release all the view readers and 
then close the data file as well. If the release is based on consumption 
confirmation, after all the view readers for one subpartition are released, the 
subpartition could further notify the parent {{ResultPartition}} which decides 
whether to release the whole partition or not.

  was:
We have two basic release strategies atm. One is based on consumption via 
network notification from consumer. The other is based on notification via RPC 
from JM/scheduler.

But in current implementation of {{BoundedBlockingSubpartition}}, these two 
ways are a bit coupled with each other. If the JM decides to release partition 
and send the release RPC call, it has to wait all the reader views really 
released before finally closing the data file. So the JM-RPC-based release 
strategy still relies on the consumption confirmation via network to some 
extent.

In order to make these two release strategies independent, if the release call 
is from JM/scheduler RPC, we could immediately release all the view readers and 
then close the data file as well. If the release is based on consumption 
notification, after all the view readers for one subpartition are released, the 
subpartition could further notify the parent {{ResultPartition}} which decides 
whether to release the whole partition or not.


> Decouple two different release strategies in BoundedBlockingSubpartition
> 
>
> Key: FLINK-13478
> URL: https://issues.apache.org/jira/browse/FLINK-13478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> We have two basic release strategies atm. One is based on consumption via 
> network notification from consumer. The other is based on notification via 
> RPC from JM/scheduler.
> But in current implementation of {{BoundedBlockingSubpartition}}, these two 
> ways are a bit coupled with each other. If the JM decides to release 
> partition and send the release RPC call, it has to wait all the reader views 
> really released before finally closing the data file. So the JM-RPC-based 
> release strategy still relies on the consumption confirmation via network to 
> some extent.
> In order to make these two release strategies independent, if the release 
> call is from JM/scheduler RPC, we could immediately release all the view 
> readers and then close the data file as well. If the release is based on 
> consumption confirmation, after all the view readers for one subpartition are 
> released, the subpartition could further notify the parent 
> {{ResultPartition}} which decides whether to release the whole partition or 
> not.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition

2019-07-30 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896163#comment-16896163
 ] 

zhijiang commented on FLINK-13478:
--

I think this is mostly a pure refactoring.

From the behavior aspect, this fix would only speed up the release of partition resources when the release call from the JM has already reached the TM side, since there would be no need to wait for the consumers' network confirmation. Releasing partitions slightly later is also acceptable, based on the assumption that a canceled/consumed partition would eventually be confirmed via the network once the connection is established.

The main issue is that the coupling makes the partition release strategy confusing to understand. If partition release is based on the JM's decision, which is always reliable, then it should not be blocked or delayed by network notification.

We could focus on it in the 1.10 release if necessary.
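To make the intended decoupling concrete, here is a minimal, self-contained sketch of the idea, assuming hypothetical names such as `BlockingSubpartitionSketch` and `ReaderView` (these are not the real Flink classes): a JM/scheduler-driven release closes readers and the data file immediately, while the consumption-driven path only notifies the parent once the last reader is gone.

{code:java}
import java.util.HashSet;
import java.util.Set;

/** Minimal sketch of a blocking subpartition with two independent release paths. */
class BlockingSubpartitionSketch {

    /** Parent that decides whether to release the whole partition. */
    interface Parent {
        void onConsumedSubpartition(BlockingSubpartitionSketch subpartition);
    }

    /** Reader view whose release feeds the consumption-based path. */
    static class ReaderView {
        private final BlockingSubpartitionSketch owner;
        ReaderView(BlockingSubpartitionSketch owner) { this.owner = owner; }
        void release() { owner.onReaderReleased(this); }
    }

    private final Parent parent;
    private final Set<ReaderView> readers = new HashSet<>();
    private boolean dataFileClosed;

    BlockingSubpartitionSketch(Parent parent) { this.parent = parent; }

    ReaderView createReadView() {
        ReaderView view = new ReaderView(this);
        readers.add(view);
        return view;
    }

    /** Path 1: release triggered by the JM/scheduler RPC - do not wait for consumers. */
    void releaseFromScheduler() {
        readers.clear();     // immediately drop all reader views
        closeDataFile();     // and close the backing data file right away
    }

    /** Path 2: consumption-based release - only act once the last reader is gone. */
    private void onReaderReleased(ReaderView view) {
        readers.remove(view);
        if (readers.isEmpty()) {
            // let the parent decide whether to release the whole partition
            parent.onConsumedSubpartition(this);
        }
    }

    private void closeDataFile() {
        if (!dataFileClosed) {
            dataFileClosed = true;
            System.out.println("data file closed");
        }
    }

    public static void main(String[] args) {
        Parent parent = sub -> System.out.println("parent notified: subpartition consumed");

        BlockingSubpartitionSketch viaConsumption = new BlockingSubpartitionSketch(parent);
        viaConsumption.createReadView().release();   // last reader gone -> parent notified

        BlockingSubpartitionSketch viaScheduler = new BlockingSubpartitionSketch(parent);
        viaScheduler.createReadView();
        viaScheduler.releaseFromScheduler();         // JM-driven: close immediately, no waiting
    }
}
{code}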

 

> Decouple two different release strategies in BoundedBlockingSubpartition
> 
>
> Key: FLINK-13478
> URL: https://issues.apache.org/jira/browse/FLINK-13478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> We have two basic release strategies atm. One is based on consumption via 
> network notification from consumer. The other is based on notification via 
> RPC from JM/scheduler.
> But in current implementation of {{BoundedBlockingSubpartition}}, these two 
> ways are coupled with each other. In detail, the network consumption 
> notification could only close data file after the release RPC was triggered 
> from JM/scheduler. Also for the release RPC it has to wait all the reader 
> views really released before closing the data file. So the release RPC still 
> relies on network notification to some extent.
> In order to make these two release strategies independent, if the release 
> call is from JM/scheduler RPC, we could immediately release all the view 
> readers and then close the data file as well. If the release is based on 
> consumption notification, after all the view readers for one subpartition are 
> released, the subpartition could further notify the parent 
> {{ResultPartition}} which decides whether to release the whole partition or 
> not.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Issue Comment Deleted] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition

2019-07-30 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13478:
-
Comment: was deleted

(was: I think this is mostly a pure refactoring.

From the behavior aspect, this fix would only speed up the release of partition resources when the release call from the JM has already reached the TM side, since there would be no need to wait for the consumers' network confirmation. Releasing partitions slightly later is also acceptable, based on the assumption that a canceled/consumed partition would eventually be confirmed via the network once the connection is established.

The main issue is that the coupling makes the partition release strategy confusing to understand. If partition release is based on the JM's decision, which is always reliable, then it should not be blocked or delayed by network notification.

We could focus on it in the 1.10 release if necessary.)

> Decouple two different release strategies in BoundedBlockingSubpartition
> 
>
> Key: FLINK-13478
> URL: https://issues.apache.org/jira/browse/FLINK-13478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> We have two basic release strategies atm. One is based on consumption via 
> network notification from consumer. The other is based on notification via 
> RPC from JM/scheduler.
> But in current implementation of {{BoundedBlockingSubpartition}}, these two 
> ways are coupled with each other. In detail, the network consumption 
> notification could only close data file after the release RPC was triggered 
> from JM/scheduler. Also for the release RPC it has to wait all the reader 
> views really released before closing the data file. So the release RPC still 
> relies on network notification to some extent.
> In order to make these two release strategies independent, if the release 
> call is from JM/scheduler RPC, we could immediately release all the view 
> readers and then close the data file as well. If the release is based on 
> consumption notification, after all the view readers for one subpartition are 
> released, the subpartition could further notify the parent 
> {{ResultPartition}} which decides whether to release the whole partition or 
> not.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition

2019-07-30 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896149#comment-16896149
 ] 

zhijiang commented on FLINK-13478:
--

I think this is mostly a pure refactoring.

From the behavior aspect, this fix would only speed up the release of partition resources when the release call from the JM has already reached the TM side, since there would be no need to wait for the consumers' network confirmation. Releasing partitions slightly later is also acceptable, based on the assumption that a canceled/consumed partition would eventually be confirmed via the network once the connection is established.

The main issue is that the coupling makes the partition release strategy confusing to understand. If partition release is based on the JM's decision, which is always reliable, then it should not be blocked or delayed by network notification.

We could focus on it in the 1.10 release if necessary.

 

> Decouple two different release strategies in BoundedBlockingSubpartition
> 
>
> Key: FLINK-13478
> URL: https://issues.apache.org/jira/browse/FLINK-13478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> We have two basic release strategies atm. One is based on consumption via 
> network notification from consumer. The other is based on notification via 
> RPC from JM/scheduler.
> But in current implementation of {{BoundedBlockingSubpartition}}, these two 
> ways are coupled with each other. In detail, the network consumption 
> notification could only close data file after the release RPC was triggered 
> from JM/scheduler. Also for the release RPC it has to wait all the reader 
> views really released before closing the data file. So the release RPC still 
> relies on network notification to some extent.
> In order to make these two release strategies independent, if the release 
> call is from JM/scheduler RPC, we could immediately release all the view 
> readers and then close the data file as well. If the release is based on 
> consumption notification, after all the view readers for one subpartition are 
> released, the subpartition could further notify the parent 
> {{ResultPartition}} which decides whether to release the whole partition or 
> not.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13487) TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall failed on Travis

2019-07-30 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896083#comment-16896083
 ] 

zhijiang commented on FLINK-13487:
--

Thanks to [~SleePy] and [~gaoyunhaii] for the cooperation in tracing the root cause. The failure is mainly caused by submitting tasks while the JobManager registration is still unfinished, and we could reproduce it by adjusting the code to sleep for a while during registration. We should make task submission happen only after the registration is done.

[~gaoyunhaii] will provide the fix.
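A minimal sketch of the idea, under the assumption that the test/driver side can observe a registration future (the class and method names below, e.g. `SubmitAfterRegistrationSketch`, are hypothetical and not Flink APIs): task submission blocks until the registration has completed.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Sketch: only submit a task once the JobManager registration has completed. */
class SubmitAfterRegistrationSketch {

    // Hypothetical future that completes once the JobManager has been registered.
    private final CompletableFuture<Void> jobManagerRegistered = new CompletableFuture<>();

    void onJobManagerRegistered() {
        jobManagerRegistered.complete(null);
    }

    void submitTask(Runnable task) throws Exception {
        // Block (with a timeout) until registration is done, then submit.
        jobManagerRegistered.get(30, TimeUnit.SECONDS);
        task.run();
    }

    public static void main(String[] args) throws Exception {
        SubmitAfterRegistrationSketch sketch = new SubmitAfterRegistrationSketch();
        sketch.onJobManagerRegistered(); // simulate the registration completing first
        sketch.submitTask(() -> System.out.println("task submitted after registration"));
    }
}
{code}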

> TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall 
> failed on Travis
> 
>
> Key: FLINK-13487
> URL: https://issues.apache.org/jira/browse/FLINK-13487
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> https://api.travis-ci.org/v3/job/564925114/log.txt
> {code}
> 21:14:47.090 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time 
> elapsed: 5.754 s <<< FAILURE! - in 
> org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest
> 21:14:47.090 [ERROR] 
> testPartitionReleaseAfterReleaseCall(org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest)
>   Time elapsed: 0.136 s  <<< ERROR!
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: 
> Could not submit task because there is no JobManager associated for the job 
> 2a0ab40cb53241799b71ff6fd2f53d3d.
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionRelease(TaskExecutorPartitionLifecycleTest.java:331)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall(TaskExecutorPartitionLifecycleTest.java:201)
> Caused by: 
> org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: 
> Could not submit task because there is no JobManager associated for the job 
> 2a0ab40cb53241799b71ff6fd2f53d3d.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Assigned] (FLINK-13487) TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall failed on Travis

2019-07-30 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang reassigned FLINK-13487:


Assignee: Yun Gao  (was: zhijiang)

> TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall 
> failed on Travis
> 
>
> Key: FLINK-13487
> URL: https://issues.apache.org/jira/browse/FLINK-13487
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Yun Gao
>Priority: Blocker
> Fix For: 1.9.0
>
>
> https://api.travis-ci.org/v3/job/564925114/log.txt
> {code}
> 21:14:47.090 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time 
> elapsed: 5.754 s <<< FAILURE! - in 
> org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest
> 21:14:47.090 [ERROR] 
> testPartitionReleaseAfterReleaseCall(org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest)
>   Time elapsed: 0.136 s  <<< ERROR!
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: 
> Could not submit task because there is no JobManager associated for the job 
> 2a0ab40cb53241799b71ff6fd2f53d3d.
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionRelease(TaskExecutorPartitionLifecycleTest.java:331)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall(TaskExecutorPartitionLifecycleTest.java:201)
> Caused by: 
> org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: 
> Could not submit task because there is no JobManager associated for the job 
> 2a0ab40cb53241799b71ff6fd2f53d3d.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader

2019-07-30 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16895932#comment-16895932
 ] 

zhijiang commented on FLINK-13442:
--

I changed the priority of this issue to non-blocker and created another ticket, FLINK-13493, to trace the possible issue of blocking partition release.

I know the default release strategy for blocking partitions is via notification from the JM/scheduler. But if we switched the option to consumption-based notification, it would cause problems.

> Remove unnecessary notifySubpartitionConsumed method from view reader 
> --
>
> Key: FLINK-13442
> URL: https://issues.apache.org/jira/browse/FLINK-13442
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods of 
> `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` 
> NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in 
> netty stack during releasing resources.
> As confirmed in FLINK-13245, in order to make this release logic simple and 
> clean, we could remove the redundant `notifySubpartitionConsumed` from 
> `NetworkSequenceViewReader` side, and also remove it from 
> `ResultSubpartitionView` side. In the implementation of 
> `ResultSubpartitionView#releaseAllResources` it would further notify the 
> parent subpartition of consumed state via 
> `ResultSubpartition#notifySubpartitionConsumed` which further feedback to 
> parent `ResultPartition` layer via `onConsumedSubpartition`. Finally 
> `ResultPartition` could decide whether to release itself or not.
> E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly 
> used for pipelined partition, it would release partition after reference 
> counter decreased to 0. For the case of `ResultPartition` which would be 
> generated for blocking partition by default, it would never be released after 
> notifying consumed. And the JM/scheduler would decide when to release 
> partition properly.
> In addition, `InputChannel#notifySubpartitionConsumed` could also be removed 
> as a result of above.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13493) BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the readers are empty

2019-07-30 Thread zhijiang (JIRA)
zhijiang created FLINK-13493:


 Summary: BoundedBlockingSubpartition only notifies 
onConsumedSubpartition when all the readers are empty
 Key: FLINK-13493
 URL: https://issues.apache.org/jira/browse/FLINK-13493
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


In the previous implementation, the subpartition would notify the {{ResultPartition}} of a consumed subpartition whenever any reader view was released. With the reference-counter release strategy this can cause problems if one blocking subpartition has multiple readers: the whole result partition might be released while there are still alive readers in some subpartitions.

Although the default release strategy for blocking partitions is currently based on JM/scheduler notification, switching the option to consumption-based notification would cause problems. From the subpartition side, the behavior should be correct regardless of which release strategy the upper layer uses.

In order to fix this bug, the {{BoundedBlockingSubpartition}} should only notify {{onConsumedSubpartition}} once all of its readers have been released.
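A minimal sketch of the fixed behavior, using hypothetical names ({{Partition}}, {{Subpartition}}) rather than the real Flink classes: with a reference-counted parent, a subpartition must decrement the counter only once, after its last reader is released, otherwise a subpartition with several readers would drain the counter on its own.

{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: notify the parent only once, after the last reader of a subpartition is gone. */
class ConsumedOnceSketch {

    static class Partition {
        final AtomicInteger unconsumedSubpartitions;
        Partition(int numSubpartitions) { unconsumedSubpartitions = new AtomicInteger(numSubpartitions); }
        void onConsumedSubpartition() {
            if (unconsumedSubpartitions.decrementAndGet() == 0) {
                System.out.println("whole partition released");
            }
        }
    }

    static class Subpartition {
        private final Partition parent;
        private final Set<Object> readers = new HashSet<>();
        Subpartition(Partition parent) { this.parent = parent; }
        Object createReader() { Object reader = new Object(); readers.add(reader); return reader; }
        void releaseReader(Object reader) {
            readers.remove(reader);
            if (readers.isEmpty()) {     // only the last reader triggers the notification
                parent.onConsumedSubpartition();
            }
        }
    }

    public static void main(String[] args) {
        Partition partition = new Partition(2);      // two subpartitions, counter starts at 2
        Subpartition sub0 = new Subpartition(partition);
        Subpartition sub1 = new Subpartition(partition);

        Object r0a = sub0.createReader();
        Object r0b = sub0.createReader();             // sub0 is read twice
        Object r1 = sub1.createReader();

        sub0.releaseReader(r0a);                      // no notification yet
        sub0.releaseReader(r0b);                      // sub0 consumed -> counter 1
        sub1.releaseReader(r1);                       // sub1 consumed -> partition released
    }
}
{code}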



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader

2019-07-30 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13442:
-
Fix Version/s: (was: 1.9.0)
   1.10

> Remove unnecessary notifySubpartitionConsumed method from view reader 
> --
>
> Key: FLINK-13442
> URL: https://issues.apache.org/jira/browse/FLINK-13442
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods of 
> `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` 
> NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in 
> netty stack during releasing resources.
> As confirmed in FLINK-13245, in order to make this release logic simple and 
> clean, we could remove the redundant `notifySubpartitionConsumed` from 
> `NetworkSequenceViewReader` side, and also remove it from 
> `ResultSubpartitionView` side. In the implementation of 
> `ResultSubpartitionView#releaseAllResources` it would further notify the 
> parent subpartition of consumed state via 
> `ResultSubpartition#notifySubpartitionConsumed` which further feedback to 
> parent `ResultPartition` layer via `onConsumedSubpartition`. Finally 
> `ResultPartition` could decide whether to release itself or not.
> E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly 
> used for pipelined partition, it would release partition after reference 
> counter decreased to 0. For the case of `ResultPartition` which would be 
> generated for blocking partition by default, it would never be released after 
> notifying consumed. And the JM/scheduler would decide when to release 
> partition properly.
> In addition, `InputChannel#notifySubpartitionConsumed` could also be removed 
> as a result of above.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader

2019-07-30 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13442:
-
Issue Type: Improvement  (was: Bug)

> Remove unnecessary notifySubpartitionConsumed method from view reader 
> --
>
> Key: FLINK-13442
> URL: https://issues.apache.org/jira/browse/FLINK-13442
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods of 
> `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` 
> NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in 
> netty stack during releasing resources.
> As confirmed in FLINK-13245, in order to make this release logic simple and 
> clean, we could remove the redundant `notifySubpartitionConsumed` from 
> `NetworkSequenceViewReader` side, and also remove it from 
> `ResultSubpartitionView` side. In the implementation of 
> `ResultSubpartitionView#releaseAllResources` it would further notify the 
> parent subpartition of consumed state via 
> `ResultSubpartition#notifySubpartitionConsumed` which further feedback to 
> parent `ResultPartition` layer via `onConsumedSubpartition`. Finally 
> `ResultPartition` could decide whether to release itself or not.
> E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly 
> used for pipelined partition, it would release partition after reference 
> counter decreased to 0. For the case of `ResultPartition` which would be 
> generated for blocking partition by default, it would never be released after 
> notifying consumed. And the JM/scheduler would decide when to release 
> partition properly.
> In addition, `InputChannel#notifySubpartitionConsumed` could also be removed 
> as a result of above.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader

2019-07-30 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13442:
-
Priority: Minor  (was: Blocker)

> Remove unnecessary notifySubpartitionConsumed method from view reader 
> --
>
> Key: FLINK-13442
> URL: https://issues.apache.org/jira/browse/FLINK-13442
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods of 
> `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` 
> NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in 
> netty stack during releasing resources.
> As confirmed in FLINK-13245, in order to make this release logic simple and 
> clean, we could remove the redundant `notifySubpartitionConsumed` from 
> `NetworkSequenceViewReader` side, and also remove it from 
> `ResultSubpartitionView` side. In the implementation of 
> `ResultSubpartitionView#releaseAllResources` it would further notify the 
> parent subpartition of consumed state via 
> `ResultSubpartition#notifySubpartitionConsumed` which further feedback to 
> parent `ResultPartition` layer via `onConsumedSubpartition`. Finally 
> `ResultPartition` could decide whether to release itself or not.
> E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly 
> used for pipelined partition, it would release partition after reference 
> counter decreased to 0. For the case of `ResultPartition` which would be 
> generated for blocking partition by default, it would never be released after 
> notifying consumed. And the JM/scheduler would decide when to release 
> partition properly.
> In addition, `InputChannel#notifySubpartitionConsumed` could also be removed 
> as a result of above.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader

2019-07-30 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13442:
-
Issue Type: Bug  (was: Improvement)

> Remove unnecessary notifySubpartitionConsumed method from view reader 
> --
>
> Key: FLINK-13442
> URL: https://issues.apache.org/jira/browse/FLINK-13442
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods of 
> `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` 
> NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in 
> netty stack during releasing resources.
> As confirmed in FLINK-13245, in order to make this release logic simple and 
> clean, we could remove the redundant `notifySubpartitionConsumed` from 
> `NetworkSequenceViewReader` side, and also remove it from 
> `ResultSubpartitionView` side. In the implementation of 
> `ResultSubpartitionView#releaseAllResources` it would further notify the 
> parent subpartition of consumed state via 
> `ResultSubpartition#notifySubpartitionConsumed` which further feedback to 
> parent `ResultPartition` layer via `onConsumedSubpartition`. Finally 
> `ResultPartition` could decide whether to release itself or not.
> E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly 
> used for pipelined partition, it would release partition after reference 
> counter decreased to 0. For the case of `ResultPartition` which would be 
> generated for blocking partition by default, it would never be released after 
> notifying consumed. And the JM/scheduler would decide when to release 
> partition properly.
> In addition, `InputChannel#notifySubpartitionConsumed` could also be removed 
> as a result of above.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader

2019-07-30 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13442:
-
Priority: Blocker  (was: Minor)

> Remove unnecessary notifySubpartitionConsumed method from view reader 
> --
>
> Key: FLINK-13442
> URL: https://issues.apache.org/jira/browse/FLINK-13442
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods of 
> `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` 
> NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in 
> netty stack during releasing resources.
> As confirmed in FLINK-13245, in order to make this release logic simple and 
> clean, we could remove the redundant `notifySubpartitionConsumed` from 
> `NetworkSequenceViewReader` side, and also remove it from 
> `ResultSubpartitionView` side. In the implementation of 
> `ResultSubpartitionView#releaseAllResources` it would further notify the 
> parent subpartition of consumed state via 
> `ResultSubpartition#notifySubpartitionConsumed` which further feedback to 
> parent `ResultPartition` layer via `onConsumedSubpartition`. Finally 
> `ResultPartition` could decide whether to release itself or not.
> E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly 
> used for pipelined partition, it would release partition after reference 
> counter decreased to 0. For the case of `ResultPartition` which would be 
> generated for blocking partition by default, it would never be released after 
> notifying consumed. And the JM/scheduler would decide when to release 
> partition properly.
> In addition, `InputChannel#notifySubpartitionConsumed` could also be removed 
> as a result of above.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition

2019-07-30 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16895832#comment-16895832
 ] 

zhijiang commented on FLINK-13478:
--

[~StephanEwen] [~pnowojski] [~azagrebin] What do you think this issue?

> Decouple two different release strategies in BoundedBlockingSubpartition
> 
>
> Key: FLINK-13478
> URL: https://issues.apache.org/jira/browse/FLINK-13478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> We have two basic release strategies atm. One is based on consumption via 
> network notification from consumer. The other is based on notification via 
> RPC from JM/scheduler.
> But in current implementation of {{BoundedBlockingSubpartition}}, these two 
> ways are coupled with each other. In detail, the network consumption 
> notification could only close data file after the release RPC was triggered 
> from JM/scheduler. Also for the release RPC it has to wait all the reader 
> views really released before closing the data file. So the release RPC still 
> relies on network notification to some extent.
> In order to make these two release strategies independent, if the release 
> call is from JM/scheduler RPC, we could immediately release all the view 
> readers and then close the data file as well. If the release is based on 
> consumption notification, after all the view readers for one subpartition are 
> released, the subpartition could further notify the parent 
> {{ResultPartition}} which decides whether to release the whole partition or 
> not.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader

2019-07-30 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13442:
-
Fix Version/s: 1.9.0

> Remove unnecessary notifySubpartitionConsumed method from view reader 
> --
>
> Key: FLINK-13442
> URL: https://issues.apache.org/jira/browse/FLINK-13442
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods of 
> `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` 
> NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in 
> netty stack during releasing resources.
> As confirmed in FLINK-13245, in order to make this release logic simple and 
> clean, we could remove the redundant `notifySubpartitionConsumed` from 
> `NetworkSequenceViewReader` side, and also remove it from 
> `ResultSubpartitionView` side. In the implementation of 
> `ResultSubpartitionView#releaseAllResources` it would further notify the 
> parent subpartition of consumed state via 
> `ResultSubpartition#notifySubpartitionConsumed` which further feedback to 
> parent `ResultPartition` layer via `onConsumedSubpartition`. Finally 
> `ResultPartition` could decide whether to release itself or not.
> E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly 
> used for pipelined partition, it would release partition after reference 
> counter decreased to 0. For the case of `ResultPartition` which would be 
> generated for blocking partition by default, it would never be released after 
> notifying consumed. And the JM/scheduler would decide when to release 
> partition properly.
> In addition, `InputChannel#notifySubpartitionConsumed` could also be removed 
> as a result of above.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition

2019-07-29 Thread zhijiang (JIRA)
zhijiang created FLINK-13478:


 Summary: Decouple two different release strategies in 
BoundedBlockingSubpartition
 Key: FLINK-13478
 URL: https://issues.apache.org/jira/browse/FLINK-13478
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


We have two basic release strategies atm. One is based on consumption via 
network notification from consumer. The other is based on notification via RPC 
from JM/scheduler.

But in current implementation of {{BoundedBlockingSubpartition}}, these two 
ways are coupled with each other. In detail, the network consumption 
notification could only close data file after the release RPC was triggered 
from JM/scheduler. Also for the release RPC it has to wait all the reader views 
really released before closing the data file. So the release RPC still relies 
on network notification to some extent.

In order to make these two release strategies independent, if the release call 
is from JM/scheduler RPC, we could immediately release all the view readers and 
then close the data file as well. If the release is based on consumption 
notification, after all the view readers for one subpartition are released, the 
subpartition could further notify the parent {{ResultPartition}} which decides 
whether to release the whole partition or not.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader

2019-07-29 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13442:
-
Description: 
Currently the methods of `NetworkSequenceViewReader#notifySubpartitionConsumed` 
and ` NetworkSequenceViewReader#releaseAllResources` would be called meanwhile 
in netty stack during releasing resources.

As confirmed in FLINK-13245, in order to make this release logic simple and 
clean, we could remove the redundant `notifySubpartitionConsumed` from 
`NetworkSequenceViewReader` side, and also remove it from 
`ResultSubpartitionView` side. In the implementation of 
`ResultSubpartitionView#releaseAllResources` it would further notify the parent 
subpartition of consumed state via 
`ResultSubpartition#notifySubpartitionConsumed` which further feedback to 
parent `ResultPartition` layer via `onConsumedSubpartition`. Finally 
`ResultPartition` could decide whether to release itself or not.

E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly used 
for pipelined partition, it would release partition after reference counter 
decreased to 0. For the case of `ResultPartition` which would be generated for 
blocking partition by default, it would never be released after notifying 
consumed. And the JM/scheduler would decide when to release partition properly.

In addition, `InputChannel#notifySubpartitionConsumed` could also be removed as 
a result of above.

  was:
Currently the methods of `NetworkSequenceViewReader#notifySubpartitionConsumed` 
and ` NetworkSequenceViewReader#releaseAllResources` would be called meanwhile 
in netty stack during releasing resources.

To make this release logic simple and clean, we could remove the redundant 
`notifySubpartitionConsumed` from `NetworkSequenceViewReader` side, and also 
remove it from `ResultSubpartitionView` side. In the implementation of 
`ResultSubpartitionView#releaseAllResources` it would further notify the parent 
subpartition of consumed state via 
`ResultSubpartition#notifySubpartitionConsumed` which further feedback to 
parent `ResultPartition` layer via `onConsumedSubpartition`. Finally 
`ResultPartition` could decide whether to release itself or not.

E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly used 
for pipelined partition, it would release partition after reference counter 
decreased to 0. For the case of `ResultPartition` which would be generated for 
blocking partition by default, it would never be released after notifying 
consumed. And the JM/scheduler would decide when to release partition properly.

In addition, `InputChannel#notifySubpartitionConsumed` could also be removed as 
a result of above.


> Remove unnecessary notifySubpartitionConsumed method from view reader 
> --
>
> Key: FLINK-13442
> URL: https://issues.apache.org/jira/browse/FLINK-13442
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> Currently the methods of 
> `NetworkSequenceViewReader#notifySubpartitionConsumed` and ` 
> NetworkSequenceViewReader#releaseAllResources` would be called meanwhile in 
> netty stack during releasing resources.
> As confirmed in FLINK-13245, in order to make this release logic simple and 
> clean, we could remove the redundant `notifySubpartitionConsumed` from 
> `NetworkSequenceViewReader` side, and also remove it from 
> `ResultSubpartitionView` side. In the implementation of 
> `ResultSubpartitionView#releaseAllResources` it would further notify the 
> parent subpartition of consumed state via 
> `ResultSubpartition#notifySubpartitionConsumed` which further feedback to 
> parent `ResultPartition` layer via `onConsumedSubpartition`. Finally 
> `ResultPartition` could decide whether to release itself or not.
> E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly 
> used for pipelined partition, it would release partition after reference 
> counter decreased to 0. For the case of `ResultPartition` which would be 
> generated for blocking partition by default, it would never be released after 
> notifying consumed. And the JM/scheduler would decide when to release 
> partition properly.
> In addition, `InputChannel#notifySubpartitionConsumed` could also be removed 
> as a result of above.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader

2019-07-26 Thread zhijiang (JIRA)
zhijiang created FLINK-13442:


 Summary: Remove unnecessary notifySubpartitionConsumed method from 
view reader 
 Key: FLINK-13442
 URL: https://issues.apache.org/jira/browse/FLINK-13442
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently the methods of `NetworkSequenceViewReader#notifySubpartitionConsumed` 
and ` NetworkSequenceViewReader#releaseAllResources` would be called meanwhile 
in netty stack during releasing resources.

To make this release logic simple and clean, we could remove the redundant 
`notifySubpartitionConsumed` from `NetworkSequenceViewReader` side, and also 
remove it from `ResultSubpartitionView` side. In the implementation of 
`ResultSubpartitionView#releaseAllResources` it would further notify the parent 
subpartition of consumed state via 
`ResultSubpartition#notifySubpartitionConsumed` which further feedback to 
parent `ResultPartition` layer via `onConsumedSubpartition`. Finally 
`ResultPartition` could decide whether to release itself or not.

E.g. for the case of `ReleaseOnConsumptionResultPartition` which is mainly used 
for pipelined partition, it would release partition after reference counter 
decreased to 0. For the case of `ResultPartition` which would be generated for 
blocking partition by default, it would never be released after notifying 
consumed. And the JM/scheduler would decide when to release partition properly.

In addition, `InputChannel#notifySubpartitionConsumed` could also be removed as 
a result of above.
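A minimal sketch of the release chain after this change, assuming hypothetical names (`ReleaseChainSketch`, `PartitionSketch`, etc.) rather than the real Flink classes: `releaseAllResources` is the single cleanup entry point and bubbles the consumed notification upwards; a pipelined-style partition releases itself when its reference count reaches 0, while a blocking-style partition ignores the notification and waits for the JM/scheduler.

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: a single releaseAllResources call replaces the separate consumed notification. */
class ReleaseChainSketch {

    interface PartitionSketch {
        void onConsumedSubpartition();
    }

    /** Pipelined-style partition: released once every subpartition was consumed. */
    static class ReleaseOnConsumptionPartitionSketch implements PartitionSketch {
        private final AtomicInteger pendingSubpartitions;
        ReleaseOnConsumptionPartitionSketch(int numSubpartitions) {
            pendingSubpartitions = new AtomicInteger(numSubpartitions);
        }
        @Override
        public void onConsumedSubpartition() {
            if (pendingSubpartitions.decrementAndGet() == 0) {
                System.out.println("pipelined partition released on consumption");
            }
        }
    }

    /** Blocking-style partition: consumption is ignored, the scheduler releases it later. */
    static class BlockingPartitionSketch implements PartitionSketch {
        @Override
        public void onConsumedSubpartition() {
            // intentionally a no-op: release is driven by the JM/scheduler
        }
    }

    /** View whose releaseAllResources() is the single entry point for cleanup. */
    static class SubpartitionViewSketch {
        private final PartitionSketch parent;
        SubpartitionViewSketch(PartitionSketch parent) { this.parent = parent; }
        void releaseAllResources() {
            // free local resources here, then bubble the consumed notification upwards
            parent.onConsumedSubpartition();
        }
    }

    public static void main(String[] args) {
        PartitionSketch pipelined = new ReleaseOnConsumptionPartitionSketch(1);
        new SubpartitionViewSketch(pipelined).releaseAllResources(); // prints the release message

        PartitionSketch blocking = new BlockingPartitionSketch();
        new SubpartitionViewSketch(blocking).releaseAllResources();  // nothing happens
    }
}
{code}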



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13325) Add test case for FLINK-13249

2019-07-26 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16893462#comment-16893462
 ] 

zhijiang commented on FLINK-13325:
--

Yes, I already reviewed this PR several days ago and am waiting for further feedback from [~srichter].

> Add test case for FLINK-13249
> -
>
> Key: FLINK-13325
> URL: https://issues.apache.org/jira/browse/FLINK-13325
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.9.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13245) Network stack is leaking files

2019-07-26 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16893427#comment-16893427
 ] 

zhijiang commented on FLINK-13245:
--

Thanks for joining this discussion and giving helpful suggestions, [~StephanEwen].

I agree with further refactoring the relevant partition release processes mentioned above in release-1.10.

Regarding the inconsistent situation mentioned above, it actually works correctly by default now, because the reference counter in `ReleaseOnConsumptionResultPartition` is only used for pipelined partitions by default. For blocking partitions, which may be consumed multiple times by creating multiple readers/views, the release should be controlled by the scheduler.

But I agree the above logic is fragile and could easily become inconsistent if not configured correctly. We could further refactor this later.

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} also are being released while the 
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is feasible that this issue has recently caused issues on Travis where the 
> build were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13245) Network stack is leaking files

2019-07-22 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16890630#comment-16890630
 ] 

zhijiang commented on FLINK-13245:
--

[~azagrebin] I totally agree with your point above. The first priority should be making the current fine-grained recovery work in release-1.9. As we confirmed before, the consumption notification via the network is just best-effort at the moment and not always reliable, especially when the network connection has not been established before the consumer failed.

I remembered that the JM would always release partitions while restarting producer tasks before; maybe I missed some parts while reviewing the relevant PRs of the partition lifecycle feature. I am sorry for not raising this potential issue from the network stack earlier.

We could solve the current issue in FLINK-13771 now, and further address the semantics of partition release and refactor the network behavior if necessary in release-1.10.

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} also are being released while the 
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is feasible that this issue has recently caused issues on Travis where the 
> build were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13245) Network stack is leaking files

2019-07-19 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889022#comment-16889022
 ] 

zhijiang commented on FLINK-13245:
--

After confirming the comments from [~Zentol] in the PR, I found that in the case of `SlotCountExceedingParallelismTest` no `ReleaseOnConsumptionResultPartition` is generated, because the partition is of blocking type. So the reference counter is not used in `ResultPartition`, and the files for the bounded blocking partition can finally be released by calling `TaskExecutorGateway#releasePartitions` based on the `RegionPartitionReleaseStrategy`.

The description of this JIRA ticket might not be accurate. When running this test locally on my Mac, there are no file leaks after it finishes. I am not sure why there are file leaks on Windows; I guess it might be related to the internal mmap mechanism on different systems. I will verify this test again on Windows.

My PR modifications only seem to affect the pipelined partition case, which uses `ReleaseOnConsumptionResultPartition`: there, the call to `notifySubpartitionConsumed` eventually makes the reference counter reach 0 and triggers the release. But for pipelined partitions there is no issue with persistent files.

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} also are being released while the 
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is feasible that this issue has recently caused issues on Travis where the 
> build were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13245) Network stack is leaking files

2019-07-19 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888788#comment-16888788
 ] 

zhijiang commented on FLINK-13245:
--

Thanks for these further thoughts, I think the discussion makes things much 
clearer. [~azagrebin]

I understand your concerns above. I agree that the current semantics of 
`CancelPartitionRequest/CloseRequest` are not very precise, because these messages 
can indicate either a successful consumption on the consumer side or a consumer 
task failing / any exception during consumption.

Regarding when to call `notifySubpartitionConsumed`: the current implementation is 
based on whether the producer receives a confirmable notification 
(Cancel/CloseRequest) from the consumer side. If it receives such a message, it 
calls `notifySubpartitionConsumed`, regardless of whether the consumer finished or 
failed. A channel exception is only observed locally on the consumer side, so it 
does not lead to a `notifySubpartitionConsumed` call. Likewise, when the channel 
becomes inactive, the producer cannot distinguish whether the consumer closed the 
connection deliberately or the TM was lost unexpectedly, so it does not call 
`notifySubpartitionConsumed` either.

For the first concern, we could introduce messages with clearer semantics for 
successful/failed consumption instead of the current `Cancel/ClosePartition`. For 
the second concern, we could also define a proper way of handling messages, 
exceptions and inactive channels. But as a precondition we should first agree on 
the exact semantics of releasing a partition based on consumption within the 
partition management feature.

Currently there are three strategies which can release a partition:
 * partition release based on the consumers' confirmation via the network
 * partition release based on a JM notification
 * partition release upon disconnection between TM and JM

For the first strategy (partition release based on the consumers' confirmation):
 * We could define the network message as `ReleasePartition` instead of the 
current `Cancel/ClosePartition`. Then it would not matter whether the consumer 
finished or failed during consumption. The precondition is a reliable network 
notification, but we have no ack mechanism for such messages in the application 
layer. Also, if a consumer task fails before it even establishes the connection to 
the producer, we still have to rely on the JM notification to release the 
producer's partitions.
 * As it stands, we cannot provide precise semantics; the current strategy is only 
coupled to the existing mechanism in the network stack.
 * The semantics could actually be defined clearly as "partition release after one 
successful consumption", and in terms of implementation this could be achieved 
through both the consumer notification and the JM notification.

In general I think we should define the different release strategies in terms of 
the specific semantics they provide, without worrying about implementations while 
thinking about the strategy. Any strategy can be implemented in multiple ways. 
E.g. the semantics might be divided into at-least-once consumption, exactly-once 
consumption and at-most-once consumption. Once the specific semantics are agreed 
on, we will know how to refactor the current network stack to implement a certain 
strategy.

It is probably worth re-architecting the partition release strategy in 
release-1.10, because it is hard to add another strategy, e.g. one for the 
interactive-queries feature, on top of the current architecture. In order not to 
block the current release, I think the existing modifications are enough to solve 
the file leak issue.

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} also are being released while the 
> ref-count 

[jira] [Commented] (FLINK-10672) Task stuck while writing output to flink

2019-07-17 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16887642#comment-16887642
 ] 

zhijiang commented on FLINK-10672:
--

[~ibzib] I think this is a case of back pressure. The producer is blocked in 
`requestMemorySegment` because the partition data it produces cannot be consumed 
fast enough by the downstream tasks. To trace the root cause, the key is to find 
the downstream task which triggers the back pressure.

E.g. for the topology A --> B --> C --> D: if we find that vertex A is blocked in 
`requestMemorySegment` for a long time, we can check the state of the vertices 
downstream of it. If vertex B is also blocked, we keep following the chain until 
we find the first vertex that is not blocked any more, say vertex C in this case. 
Then we check which specific parallel subtask of vertex C causes the blockage. 
Such a task typically has a very high input queue usage while its output queue is 
low or even empty, and the relevant metrics can help to spot it. Once the task is 
found, we can inspect its stack to confirm where it is stuck.

Maybe that leads you to the root cause; otherwise please share any further 
findings with us. 
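
A rough sketch of such a scan over the queue-usage numbers (the class and method 
names below are purely illustrative; in practice the values would come from 
Flink's buffer-usage metrics for each subtask):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper to spot the back-pressure source among the subtasks of one
// vertex: the suspicious subtask has a (nearly) full input queue but an empty
// output queue. The usage values are just passed in as plain numbers here.
public class BackPressureScan {

    static class SubtaskQueueUsage {
        final int subtaskIndex;
        final double inQueueUsage;   // 0.0 (empty) .. 1.0 (full)
        final double outQueueUsage;  // 0.0 (empty) .. 1.0 (full)

        SubtaskQueueUsage(int subtaskIndex, double inQueueUsage, double outQueueUsage) {
            this.subtaskIndex = subtaskIndex;
            this.inQueueUsage = inQueueUsage;
            this.outQueueUsage = outQueueUsage;
        }
    }

    /** Returns subtasks whose input queue is almost full while the output queue is almost empty. */
    static List<SubtaskQueueUsage> findSuspects(List<SubtaskQueueUsage> subtasks) {
        List<SubtaskQueueUsage> suspects = new ArrayList<>();
        for (SubtaskQueueUsage s : subtasks) {
            if (s.inQueueUsage > 0.9 && s.outQueueUsage < 0.1) {
                suspects.add(s);
            }
        }
        return suspects;
    }

    public static void main(String[] args) {
        List<SubtaskQueueUsage> vertexC = new ArrayList<>();
        vertexC.add(new SubtaskQueueUsage(0, 0.2, 0.3));
        vertexC.add(new SubtaskQueueUsage(1, 1.0, 0.0)); // the stuck subtask: check its stack next
        vertexC.add(new SubtaskQueueUsage(2, 0.1, 0.4));

        for (SubtaskQueueUsage s : findSuspects(vertexC)) {
            System.out.println("suspect subtask #" + s.subtaskIndex);
        }
    }
}
{code}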

 

> Task stuck while writing output to flink
> 
>
> Key: FLINK-10672
> URL: https://issues.apache.org/jira/browse/FLINK-10672
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.5.4
> Environment: OS: Debuan rodente 4.17
> Flink version: 1.5.4
> ||Key||Value||
> |jobmanager.heap.mb|1024|
> |jobmanager.rpc.address|localhost|
> |jobmanager.rpc.port|6123|
> |metrics.reporter.jmx.class|org.apache.flink.metrics.jmx.JMXReporter|
> |metrics.reporter.jmx.port|9250-9260|
> |metrics.reporters|jmx|
> |parallelism.default|1|
> |rest.port|8081|
> |taskmanager.heap.mb|1024|
> |taskmanager.numberOfTaskSlots|1|
> |web.tmpdir|/tmp/flink-web-bdb73d6c-5b9e-47b5-9ebf-eed0a7c82c26|
>  
> h1. Overview
> ||Data Port||All Slots||Free Slots||CPU Cores||Physical Memory||JVM Heap 
> Size||Flink Managed Memory||
> |43501|1|0|12|62.9 GB|922 MB|642 MB|
> h1. Memory
> h2. JVM (Heap/Non-Heap)
> ||Type||Committed||Used||Maximum||
> |Heap|922 MB|575 MB|922 MB|
> |Non-Heap|68.8 MB|64.3 MB|-1 B|
> |Total|991 MB|639 MB|922 MB|
> h2. Outside JVM
> ||Type||Count||Used||Capacity||
> |Direct|3,292|105 MB|105 MB|
> |Mapped|0|0 B|0 B|
> h1. Network
> h2. Memory Segments
> ||Type||Count||
> |Available|3,194|
> |Total|3,278|
> h1. Garbage Collection
> ||Collector||Count||Time||
> |G1_Young_Generation|13|336|
> |G1_Old_Generation|1|21|
>Reporter: Ankur Goenka
>Priority: Major
>  Labels: beam
> Attachments: 1uruvakHxBu.png, 3aDKQ24WvKk.png, Po89UGDn58V.png, 
> jmx_dump.json, jmx_dump_detailed.json, jstack_129827.log, jstack_163822.log, 
> jstack_66985.log
>
>
> I am running a fairly complex pipleline with 200+ task.
> The pipeline works fine with small data (order of 10kb input) but gets stuck 
> with a slightly larger data (300kb input).
>  
> The task gets stuck while writing the output toFlink, more specifically it 
> gets stuck while requesting memory segment in local buffer pool. The Task 
> manager UI shows that it has enough memory and memory segments to work with.
> The relevant stack trace is 
> {quote}"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 
> tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9]
>  java.lang.Thread.State: TIMED_WAITING (on object monitor)
>  at (C/C++) 0x7fef201c7dae (Unknown Source)
>  at (C/C++) 0x7fef1f2aea07 (Unknown Source)
>  at (C/C++) 0x7fef1f241cd3 (Unknown Source)
>  at java.lang.Object.wait(Native Method)
>  - waiting on <0xf6d56450> (a java.util.ArrayDeque)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
>  - locked <0xf6d56450> (a java.util.ArrayDeque)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>  at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>  at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42)
>  at 
> 

[jira] [Commented] (FLINK-10672) Task stuck while writing output to flink

2019-07-17 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16887201#comment-16887201
 ] 

zhijiang commented on FLINK-10672:
--

[~ibzib] Which fix do you mean? 

Also, which Flink version was used when the issue above occurred? I have not 
looked through this issue in detail yet; I will check the jstack further to 
confirm whether it has already been solved.

> Task stuck while writing output to flink
> 
>
> Key: FLINK-10672
> URL: https://issues.apache.org/jira/browse/FLINK-10672
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.5.4
> Environment: OS: Debuan rodente 4.17
> Flink version: 1.5.4
> ||Key||Value||
> |jobmanager.heap.mb|1024|
> |jobmanager.rpc.address|localhost|
> |jobmanager.rpc.port|6123|
> |metrics.reporter.jmx.class|org.apache.flink.metrics.jmx.JMXReporter|
> |metrics.reporter.jmx.port|9250-9260|
> |metrics.reporters|jmx|
> |parallelism.default|1|
> |rest.port|8081|
> |taskmanager.heap.mb|1024|
> |taskmanager.numberOfTaskSlots|1|
> |web.tmpdir|/tmp/flink-web-bdb73d6c-5b9e-47b5-9ebf-eed0a7c82c26|
>  
> h1. Overview
> ||Data Port||All Slots||Free Slots||CPU Cores||Physical Memory||JVM Heap 
> Size||Flink Managed Memory||
> |43501|1|0|12|62.9 GB|922 MB|642 MB|
> h1. Memory
> h2. JVM (Heap/Non-Heap)
> ||Type||Committed||Used||Maximum||
> |Heap|922 MB|575 MB|922 MB|
> |Non-Heap|68.8 MB|64.3 MB|-1 B|
> |Total|991 MB|639 MB|922 MB|
> h2. Outside JVM
> ||Type||Count||Used||Capacity||
> |Direct|3,292|105 MB|105 MB|
> |Mapped|0|0 B|0 B|
> h1. Network
> h2. Memory Segments
> ||Type||Count||
> |Available|3,194|
> |Total|3,278|
> h1. Garbage Collection
> ||Collector||Count||Time||
> |G1_Young_Generation|13|336|
> |G1_Old_Generation|1|21|
>Reporter: Ankur Goenka
>Priority: Major
>  Labels: beam
> Attachments: 1uruvakHxBu.png, 3aDKQ24WvKk.png, Po89UGDn58V.png, 
> jmx_dump.json, jmx_dump_detailed.json, jstack_129827.log, jstack_163822.log, 
> jstack_66985.log
>
>
> I am running a fairly complex pipleline with 200+ task.
> The pipeline works fine with small data (order of 10kb input) but gets stuck 
> with a slightly larger data (300kb input).
>  
> The task gets stuck while writing the output toFlink, more specifically it 
> gets stuck while requesting memory segment in local buffer pool. The Task 
> manager UI shows that it has enough memory and memory segments to work with.
> The relevant stack trace is 
> {quote}"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 
> tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9]
>  java.lang.Thread.State: TIMED_WAITING (on object monitor)
>  at (C/C++) 0x7fef201c7dae (Unknown Source)
>  at (C/C++) 0x7fef1f2aea07 (Unknown Source)
>  at (C/C++) 0x7fef1f241cd3 (Unknown Source)
>  at java.lang.Object.wait(Native Method)
>  - waiting on <0xf6d56450> (a java.util.ArrayDeque)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
>  - locked <0xf6d56450> (a java.util.ArrayDeque)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>  at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>  at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26)
>  at 
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>  at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230)
>  - locked <0xf6a60bd0> (a java.lang.Object)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
>  at 
> 

[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885832#comment-16885832
 ] 

zhijiang edited comment on FLINK-13245 at 7/16/19 4:16 AM:
---

We previously assumed that a ResultSubpartitionView could be released 
individually, but all subpartitions are actually released together via 
`ResultPartition/ResultPartitionManager`.

After thinking it through, it might be reasonable to keep both methods, 
{{ResultSubpartitionView#notifySubpartitionConsumed}} and 
{{ResultSubpartitionView#releaseAllResources}}, because they describe different 
semantics. 
 * `releaseAllResources` releases the resources from the ResultSubpartitionView's 
point of view. The view is created by the netty stack, which is also responsible 
for triggering its release. In detail there are two scenarios that trigger the 
release: one is the netty channel becoming inactive or hitting an exception, as 
currently done in {{PartitionRequestQueue#channelInactive}} and 
{{PartitionRequestQueue#exceptionCaught}}; the other is the ResultSubpartitionView 
actually being consumed, signalled via {{NettyMessage#CancelPartitionRequest}}.

 * `notifySubpartitionConsumed` only indicates that the ResultSubpartition/View 
has been consumed, via {{CancelPartitionRequest}}, so we should call this method 
while handling the cancel message. A channel exception or an inactive channel does 
not always imply consumption, so we should not call this method there as 
{{PartitionRequestQueue}} currently does. Whether to release the partition in the 
case of an inactive/failed channel is up to the {{JobMaster}}. For a streaming 
job, if the consumer fails, the {{JobMaster}} also cancels the producer task, 
which releases the whole {{ResultPartition}}. For a batch job with a blocking 
partition, if the consumer TM exits and the channel becomes inactive, the 
{{ResultPartition}} might not need to be released.

Overall, these two methods decouple the release of the {{ResultPartition}} from 
the release of the {{ResultSubpartitionView}}. So it makes sense to keep them as 
they are, as long as we handle the {{CancelPartitionRequest}} message correctly 
based on the modifications above. [~azagrebin]
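
Condensed into a sketch (hypothetical, simplified types; not the actual netty 
handler code), the producer-side call sites for the two methods would look roughly 
like this:

{code:java}
// Hypothetical sketch of the two release paths described above:
// - CancelPartitionRequest => the consumer confirmed consumption: notify + release
// - channel inactive / exception => only release the view's own resources; whether
//   the partition itself is released is left to the JobMaster.
public class ViewReleaseSemantics {

    interface SubpartitionView {
        void notifySubpartitionConsumed(); // consumption confirmed by the consumer
        void releaseAllResources();        // release resources held by this view
    }

    static void onCancelPartitionRequest(SubpartitionView view) {
        view.notifySubpartitionConsumed();
        view.releaseAllResources();
    }

    static void onChannelInactiveOrException(SubpartitionView view) {
        // Not necessarily a consumption: do not touch the consumption counter here.
        view.releaseAllResources();
    }

    public static void main(String[] args) {
        SubpartitionView view = new SubpartitionView() {
            @Override public void notifySubpartitionConsumed() { System.out.println("consumed"); }
            @Override public void releaseAllResources() { System.out.println("view released"); }
        };
        onCancelPartitionRequest(view);
        onChannelInactiveOrException(view);
    }
}
{code}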

 


was (Author: zjwang):
We have some previous assumptions that ResultSubpartitionView could be released 
individually, but all the subpartitions are released together via 
`ResultPartition/ResultPartitionManager`.

After thinking it through, it might be reasonable to have both methods as 
{{ResultSubpartitionView#notifySubpartitionConsumed}} and 
{{ResultSubpartitionView#releaseAllResources}}, because they describe the 
different semantics. 
 * `releaseAllResources` is used for releasing resources from 
ResultSubpartitionView aspect. The view is created by netty stack which is also 
responsible for triggering the release. In detail it has two scenarios to 
trigger release: One case is that netty channel inactive/exception as current 
{{PartitonRequestQueue#channelInactive}} and 
{{PartitionRequestQueue#exceptionCaught}} done. The other case is that when 
ResultSubpartitionView is actually consumed via 
{{NettyMessage#CannelPartitionRequest}}.

 * `notifySubpartitionConsumed` only indicates the ResultSubpartition/View 
consumed via {{CannelPartitionRequest}}, so we should call this method while 
handling the cancel message. For the case of channel exception/inactive, it 
does not always indicate the consumption semantic, so we should not call this 
method as current done in {{PartitionRequestQueue}}. It is up to {{JobMaster}} 
whether to release partition in the case of channel inactive/exception. For the 
streaming job if the consumer fails, the {{JobMaster}} would also cancel the 
producer task to release the whole {{ResultPartition}}. For the batch job of 
blocking partition, if the consumer TM exits to cause channel inactive, the 
{{ResultPartition}} might not need to be released.

Overall, these two methods seem to decouple the release between 
{{ResultPartition}} and {{ResultSubpartitionView}}. So it makes sense to keep 
them as now, as long as we could handle the {{CannelPartitionRequest}} message 
correctly based on above modifications.

 

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> 

[jira] [Commented] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885832#comment-16885832
 ] 

zhijiang commented on FLINK-13245:
--

We previously assumed that a ResultSubpartitionView could be released 
individually, but all subpartitions are actually released together via 
`ResultPartition/ResultPartitionManager`.

After thinking it through, it might be reasonable to keep both methods, 
{{ResultSubpartitionView#notifySubpartitionConsumed}} and 
{{ResultSubpartitionView#releaseAllResources}}, because they describe different 
semantics. 
 * `releaseAllResources` releases the resources from the ResultSubpartitionView's 
point of view. The view is created by the netty stack, which is also responsible 
for triggering its release. In detail there are two scenarios that trigger the 
release: one is the netty channel becoming inactive or hitting an exception, as 
currently done in {{PartitionRequestQueue#channelInactive}} and 
{{PartitionRequestQueue#exceptionCaught}}; the other is the ResultSubpartitionView 
actually being consumed, signalled via {{NettyMessage#CancelPartitionRequest}}.

 * `notifySubpartitionConsumed` only indicates that the ResultSubpartition/View 
has been consumed, via {{CancelPartitionRequest}}, so we should call this method 
while handling the cancel message. A channel exception or an inactive channel does 
not always imply consumption, so we should not call this method there as 
{{PartitionRequestQueue}} currently does. Whether to release the partition in the 
case of an inactive/failed channel is up to the {{JobMaster}}. For a streaming 
job, if the consumer fails, the {{JobMaster}} also cancels the producer task, 
which releases the whole {{ResultPartition}}. For a batch job with a blocking 
partition, if the consumer TM exits and the channel becomes inactive, the 
{{ResultPartition}} might not need to be released.

Overall, these two methods decouple the release of the {{ResultPartition}} from 
the release of the {{ResultSubpartitionView}}. So it makes sense to keep them as 
they are, as long as we handle the {{CancelPartitionRequest}} message correctly 
based on the modifications above.

 

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} also are being released while the 
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is feasible that this issue has recently caused issues on Travis where the 
> build were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Issue Comment Deleted] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13245:
-
Comment: was deleted

(was: [~azagrebin] I have not seen your latest comments before submitting above.

I agree with your above two points. It is not consistent to remove the reader 
from `allReaders` if it is not in `availableReaders` for canceled partition. If 
we do not remove it from `allReader`, it still could be released while handling 
{{CloseRequest}} from last {{RemoteInputChannel}}. I think the above 
modifications could solve this issue based on existing mechanism.

For the second problem I also ever found it seems a bit strange to release 
partition via circle call/dependency. That means network notification --> 
ResultPartition --> ResultPartitionManager --> ResultPartition --> 
{{ResultSubpartition}}. And I also considered integrating these methods of 
`releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we 
should do this refactoring at this point in release-1.9.  I think it might be 
safe and proper to do this refactoring in next version 1.10. 

At this point we could fix the potential file leak to reuse the previous way 
and make less work.)

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} also are being released while the 
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is feasible that this issue has recently caused issues on Travis where the 
> build were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885396#comment-16885396
 ] 

zhijiang edited comment on FLINK-13245 at 7/15/19 4:47 PM:
---

[~azagrebin] I had not seen your latest comments before submitting the above.

I agree with your two points. It is indeed inconsistent that, for a canceled 
partition, the reader is not removed from `allReaders` when it is not present in 
`availableReaders`. Even if it is not removed from `allReaders`, it can still be 
released while handling the {{CloseRequest}} from the last {{RemoteInputChannel}}. 
I think the modifications above can solve this issue based on the existing 
mechanism.

Regarding the second problem, I also found it a bit strange to release the 
partition through a circular call/dependency: network notification -> 
ResultPartition -> ResultPartitionManager -> ResultPartition -> 
{{ResultSubpartition}}. I also considered merging the methods 
`releaseAllResources` and `notifySubpartitionConsumed`, but I am not sure we 
should do this refactoring at this point in release-1.9. It seems safer and more 
appropriate to do it in the next version, 1.10.

For now we can fix the potential file leak by reusing the previous way, which 
keeps the change small.


was (Author: zjwang):
[~azagrebin] I have not seen your latest comments before submitting above.

I agree with your above two points. It is not consistent to remove the reader 
from `allReaders` if it is not in `availableReaders` for canceled partition. If 
we do not remove it from `allReader`, it still could be released while handling 
{{CloseRequest}} from last {{RemoteInputChannel}}. I think the above 
modifications could solve this issue based on existing mechanism.

For the second problem I also ever found it seems a bit strange to release 
partition via circle call/dependency. That means network notification -> 
ResultPartition -> {{ResultPartitionManager}} -> ResultPartition -> 
{{ResultSubpartition}}. And I also considered integrating these methods of 
`releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we 
should do this refactoring at this point in release-1.9.  I think it might be 
safe and proper to do this refactoring in next version 1.10. 

At this point we could fix the potential file leak to reuse the previous way 
and make less work.

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} also are being released while the 
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is feasible that this issue has recently caused issues on Travis where the 
> build were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885396#comment-16885396
 ] 

zhijiang edited comment on FLINK-13245 at 7/15/19 4:46 PM:
---

[~azagrebin] I have not seen your latest comments before submitting above.

I agree with your above two points. It is not consistent to remove the reader 
from `allReaders` if it is not in `availableReaders` for canceled partition. If 
we do not remove it from `allReader`, it still could be released while handling 
{{CloseRequest}} from last {{RemoteInputChannel}}. I think the above 
modifications could solve this issue based on existing mechanism.

For the second problem I also ever found it seems a bit strange to release 
partition via circle call/dependency. That means network notification -> 
ResultPartition -> {{ResultPartitionManager}} -> ResultPartition -> 
{{ResultSubpartition}}. And I also considered integrating these methods of 
`releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we 
should do this refactoring at this point in release-1.9.  I think it might be 
safe and proper to do this refactoring in next version 1.10. 

At this point we could fix the potential file leak to reuse the previous way 
and make less work.


was (Author: zjwang):
[~azagrebin] I have not seen your latest comments before submitting above.

I agree with your above two points. It is not consistent to remove the reader 
from `allReaders` if it is not in `availableReaders` for canceled partition. If 
we do not remove it from `allReader`, it still could be released while handling 
{{CloseRequest}} from last {{RemoteInputChannel}}. I think the above 
modifications could solve this issue based on existing mechanism.

For the second problem I also ever found it seems a bit strange to release 
partition via circle call/dependency. That means network notification -> 
{{ResultPartition}} -> {{ResultPartitionManager}} -> {{ResultPartition}} -> 
{{ResultSubpartition}}. And I also considered integrating these methods of 
`releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we 
should do this refactoring at this point in release-1.9.  I think it might be 
safe and proper to do this refactoring in next version 1.10. 

At this point we could fix the potential file leak to reuse the previous way 
and make less work.

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} also are being released while the 
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is feasible that this issue has recently caused issues on Travis where the 
> build were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885396#comment-16885396
 ] 

zhijiang edited comment on FLINK-13245 at 7/15/19 4:44 PM:
---

[~azagrebin] I have not seen your latest comments before submitting above.

I agree with your above two points. It is not consistent to remove the reader 
from `allReaders` if it is not in `availableReaders` for canceled partition. If 
we do not remove it from `allReader`, it still could be released while handling 
{{CloseRequest}} from last {{RemoteInputChannel}}. I think the above 
modifications could solve this issue based on existing mechanism.

For the second problem I also ever found it seems a bit strange to release 
partition via circle call/dependency. That means network notification -> 
{{ResultPartition}} -> {{ResultPartitionManager}} -> {{ResultPartition}} -> 
{{ResultSubpartition}}. And I also considered integrating these methods of 
`releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we 
should do this refactoring at this point in release-1.9.  I think it might be 
safe and proper to do this refactoring in next version 1.10. 

At this point we could fix the potential file leak to reuse the previous way 
and make less work.


was (Author: zjwang):
[~azagrebin] I have not seen your latest comments before submitting above.

I agree with your above two points. It is not consistent to remove the reader 
from `allReaders` if it is not in `availableReaders` for canceled partition. If 
we do not remove it from `allReader`, it still could be released while handling 
`CloseRequest` from `RemoteInputChannel`. I think the above modifications could 
solve this issue based on existing mechanism.

For the second problem I also ever found it seems a bit strange to release 
partition via circle call/dependency. That means network notification -> 
ResultPartition- > ResultPartitionManager -> ResultPartition -> 
ResultSubpartition. And I also considered integrating these methods of 
`releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we 
should do this refactoring at this point in release-1.9.  I think it might be 
safe and proper to do this refactoring in next version 1.10. 

At this point we could fix the potential file leak to reuse the previous way 
and make less work.

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} also are being released while the 
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is feasible that this issue has recently caused issues on Travis where the 
> build were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885396#comment-16885396
 ] 

zhijiang edited comment on FLINK-13245 at 7/15/19 4:43 PM:
---

[~azagrebin] I have not seen your latest comments before submitting above.

I agree with your above two points. It is not consistent to remove the reader 
from `allReaders` if it is not in `availableReaders` for canceled partition. If 
we do not remove it from `allReader`, it still could be released while handling 
`CloseRequest` from `RemoteInputChannel`. I think the above modifications could 
solve this issue based on existing mechanism.

For the second problem I also ever found it seems a bit strange to release 
partition via circle call/dependency. That means network notification -> 
ResultPartition- > ResultPartitionManager -> ResultPartition -> 
ResultSubpartition. And I also considered integrating these methods of 
`releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we 
should do this refactoring at this point in release-1.9.  I think it might be 
safe and proper to do this refactoring in next version 1.10. 

At this point we could fix the potential file leak to reuse the previous way 
and make less work.


was (Author: zjwang):
[~azagrebin] I have not seen your latest comments before I submitting above.

I agree with your above two points. It is not consistent to remove the reader 
from `allReaders` if it is not in `availableReaders` for canceled partition. If 
we do not remove it from `allReader`, it still could be released while handling 
`CloseRequest` from `RemoteInputChannel`. I think the above modifications could 
solve this issue based on existing mechanism.

For the second problem I also ever found it seems a bit strange to release 
partition via circle call/dependency. That means network notification -> 
ResultPartition -> ResultPartitionManager -> ResultPartition -> 
ResultSubpartition. And I also considered integrating these methods of 
`releaseAllResources` and `notifySubpartitionConsumed`. But I am not sure we 
should do this refactoring at this point in release-1.9.  I think it might be 
safe and proper to do this refactoring in next version 1.10. 

At this point we could fix the potential file leak to reuse the previous way 
and make less work.

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} also are being released while the 
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is feasible that this issue has recently caused issues on Travis where the 
> build were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885396#comment-16885396
 ] 

zhijiang commented on FLINK-13245:
--

[~azagrebin] I had not seen your latest comments before submitting the above.

I agree with your two points. It is indeed inconsistent that, for a canceled 
partition, the reader is not removed from `allReaders` when it is not present in 
`availableReaders`. Even if it is not removed from `allReaders`, it can still be 
released while handling the `CloseRequest` from the `RemoteInputChannel`. I think 
the modifications above can solve this issue based on the existing mechanism.

Regarding the second problem, I also found it a bit strange to release the 
partition through a circular call/dependency: network notification -> 
ResultPartition -> ResultPartitionManager -> ResultPartition -> 
ResultSubpartition. I also considered merging the methods `releaseAllResources` 
and `notifySubpartitionConsumed`, but I am not sure we should do this refactoring 
at this point in release-1.9. It seems safer and more appropriate to do it in the 
next version, 1.10.

For now we can fix the potential file leak by reusing the previous way, which 
keeps the change small.

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} also are being released while the 
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is feasible that this issue has recently caused issues on Travis where the 
> build were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885371#comment-16885371
 ] 

zhijiang commented on FLINK-13245:
--

Thanks for finding this potential issue and for the investigation! [~Zentol] 
[~azagrebin]

I think the idea of the modifications above makes sense: since `availableReaders` 
is not always equivalent to `allReaders`, it is more appropriate to look up the 
canceled view reader in `allReaders` instead.

This issue also existed in the previous {{SpillableSubpartition}}, but in 
{{SlotCountExceedingParallelismTest}} that subpartition actually stays in memory, 
so the potential bug never showed up there.

In detail, in the modifications above we should also call 
`toRelease.notifySubpartitionConsumed` before calling 
`toRelease.releaseAllResources`. Otherwise the reference counter in 
{{ReleaseOnConsumptionResultPartition}} would never reach zero and the partition 
would never really be released via {{ResultPartitionManager}}.

I will submit the PR and add some unit tests tomorrow.
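
A minimal sketch of that ordering, assuming a simplified reader type and map (not 
the actual {{PartitionRequestQueue}} code): on a cancel request the reader is 
looked up in `allReaders` rather than `availableReaders`, and consumption is 
notified before the resources are released, so that the partition's counter can 
reach zero.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the fix discussed above (simplified types).
public class CancelRequestHandling {

    interface ViewReader {
        void notifySubpartitionConsumed();
        void releaseAllResources();
    }

    private final Map<String, ViewReader> allReaders = new HashMap<>();

    void onCancelPartitionRequest(String receiverId) {
        ViewReader toRelease = allReaders.remove(receiverId);
        if (toRelease != null) {
            toRelease.notifySubpartitionConsumed(); // decrement the partition's counter first
            toRelease.releaseAllResources();        // then free the reader's resources
        }
    }

    public static void main(String[] args) {
        CancelRequestHandling queue = new CancelRequestHandling();
        queue.allReaders.put("reader-1", new ViewReader() {
            @Override public void notifySubpartitionConsumed() { System.out.println("consumed"); }
            @Override public void releaseAllResources() { System.out.println("released"); }
        });
        queue.onCancelPartitionRequest("reader-1");
    }
}
{code}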

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} also are being released while the 
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is feasible that this issue has recently caused issues on Travis where the 
> build were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885371#comment-16885371
 ] 

zhijiang edited comment on FLINK-13245 at 7/15/19 4:22 PM:
---

Thanks for finding this potential issue and for the investigation! [~Zentol] 
[~azagrebin]

I think the idea of the modifications above makes sense: since `availableReaders` 
is not always equivalent to `allReaders`, it is more appropriate to look up the 
canceled view reader in `allReaders` instead.

This issue also existed in the previous {{SpillableSubpartition}}, but in 
{{SlotCountExceedingParallelismTest}} that subpartition actually stays in memory, 
so the potential bug never showed up there.

In detail, in the modifications above we should also call 
`toRelease.notifySubpartitionConsumed` before calling 
`toRelease.releaseAllResources`. Otherwise the reference counter in 
{{ReleaseOnConsumptionResultPartition}} would never reach zero and the partition 
would never really be released via {{ResultPartitionManager}}.

I will submit the PR and add some unit tests tomorrow.


was (Author: zjwang):
Thanks for finding this potential issue and the investigation! [~Zentol]  
[~azagrebin]

I think the above idea of above modifications makes sense, because the 
`availableReader` is not always equivalent to `allReaders`, then it is proper 
to find the canceled view reader from `allReaders` instead.

This issue also exists in previous {{SpillableSubpartition}} which actually 
uses memory type in {{SlotCountExceedingParallelismTest,}} so we could not find 
this potential bug then.

In detail, we should also call `toRelease.notifySubpartitionConsumed` before 
calling `toRelease.releaseAllResources` in above modifications. Otherwise the 
reference counter in {{ReleaseOnConsumptionResultPartition}} would not decrease 
to zero and really release partition via {{ResultPartitionManager}}.

I would submit the PR and add some unite tests later tomorrow.

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} also are being released while the 
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is feasible that this issue has recently caused issues on Travis where the 
> build were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Issue Comment Deleted] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13245:
-
Comment: was deleted

(was: Thanks for finding this potential issue and the investigation! [~Zentol]  
[~azagrebin] 
After reviewing the relevant codes, it actually has two issues here:
 * Considering handling the {{CancelPartitionRequest}} issue, I guess we might 
have two assumptions before. One assumption is that `availableReaders` is 
always equivalent to `allReaders`, but now this assumption is not right because 
of credit. The other assumption is that we make the logic of 
{{CancelPartitionRequest}} as best-effort way, because the last 
{{RemoteInputChannel}} would send {{CloseRequest}} message, then the 
{{PartitionRequestQueue}} would always release all the view readers in 
`allReaders` in final. The details are in {{PartitionRequestQueue#close}}. If 
so, it seems no problem even though the previous {{CancelPartitionRequest}} 
does not work sometimes. But I think it would be more proper/strict if we 
handle the {{CancelPartitionRequest}} via `allReaders` instead.

 *  Another root problem is that the 
{{ReleaseOnConsumptionResultPartition#onConsumedSubpartition}} is not working 
for the implementation of {{BoundedBlockingSubpartition}}. 
{{onConsumedSubpartition}} would be triggered from consumer notification via 
network, but it is only feasible for {{RemoteInputChannel}}. For the case of 
{{LocalInputChannel}}, it would call 
{{ResultSubpartitionView#releaseAllResources}} directly.  So in the 
{{SlotCountExceedingParallelismTest}} there are some local channels which cause 
the reference counter in {{ReleaseOnConsumptionResultPartition}} would never 
decrease to 0, then it would never release the {{BoundedBlockingSubpartition}}. 
But in the process of {{BoundedBlockingSubpartition#releaseReaderReference}} it 
would check the `isReleased` tag before deleting the file. So it has the 
conflict here. In fact this issue already exists in previous 
{{SpillableSubpartition}} , but in the test the spillable subpartition would 
always use the memory way then it hides the potential problem.)

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} also are being released while the 
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is feasible that this issue has recently caused issues on Travis where the 
> build were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Issue Comment Deleted] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13245:
-
Comment: was deleted

(was: For the first issue, I think it is not very important currently, because 
we also has the {{PartitionRequestQueue#close}} to release all the view readers.

For the second issue, it needs to fixed and I think we should still stick to 
the way of releasing partition in whole via {{ResultPartitionManager}}, that 
means we could not release one subpartition if the reference counter is not 
becoming zero in {{ReleaseOnConsumptionResultPartition}}. In detail:
 * In the process of {{BoundedBlockingSubpartitionReader#releaseAllResources}}, 
we should also call the {{parent.onConsumedSubpartition()}} as 
{{PipelinedSubpartitionView}} done in order to notify the 
{{ReleaseOnConsumptionResultPartition}} to decrease the reference counter.

[~Zentol] [~azagrebin] [~StephanEwen])

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} also are being released while the 
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is feasible that this issue has recently caused issues on Travis where the 
> build were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885280#comment-16885280
 ] 

zhijiang commented on FLINK-13245:
--

For the first issue, I think it is not very critical at the moment, because we 
also have {{PartitionRequestQueue#close}} to release all the view readers.

The second issue needs to be fixed, and I think we should stick to releasing the 
partition as a whole via {{ResultPartitionManager}}; that means an individual 
subpartition cannot be released on its own, only the whole partition once the 
reference counter in {{ReleaseOnConsumptionResultPartition}} drops to zero. In 
detail:
 * In {{BoundedBlockingSubpartitionReader#releaseAllResources}} we should also 
call {{parent.onConsumedSubpartition()}}, as {{PipelinedSubpartitionView}} does, 
in order to notify {{ReleaseOnConsumptionResultPartition}} to decrease the 
reference counter.

[~Zentol] [~azagrebin] [~StephanEwen]
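
To make the proposed decrement path concrete, here is a minimal, self-contained
sketch of the ref-counting protocol described above. All class, field, and method
names are simplified stand-ins, not the actual Flink classes.

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for ReleaseOnConsumptionResultPartition: releases itself only once
// every subpartition has reported consumption.
class RefCountedPartition {
    private final AtomicInteger pendingSubpartitions;

    RefCountedPartition(int numberOfSubpartitions) {
        this.pendingSubpartitions = new AtomicInteger(numberOfSubpartitions);
    }

    // Must be called from both the remote (network) and local reader paths,
    // otherwise the counter never reaches zero and the backing files leak.
    void onConsumedSubpartition() {
        if (pendingSubpartitions.decrementAndGet() == 0) {
            release();
        }
    }

    void release() {
        System.out.println("partition fully consumed, backing files can be deleted");
    }
}

// Stand-in for a subpartition reader; releasing it notifies the parent, the same
// way PipelinedSubpartitionView does.
class SubpartitionReaderSketch {
    private final RefCountedPartition parent;
    private boolean released;

    SubpartitionReaderSketch(RefCountedPartition parent) {
        this.parent = parent;
    }

    void releaseAllResources() {
        if (!released) {
            released = true;
            parent.onConsumedSubpartition();
        }
    }
}
{code}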

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's a file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} is also being released while the
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is possible that this issue has recently caused issues on Travis where the
> builds were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885280#comment-16885280
 ] 

zhijiang edited comment on FLINK-13245 at 7/15/19 2:27 PM:
---

For the first issue, I think it is not very important currently, because we also
have {{PartitionRequestQueue#close}} to release all the view readers.

For the second issue, it needs to be fixed, and I think we should still stick to
releasing the partition as a whole via {{ResultPartitionManager}}; that means we
must not release a single subpartition while the reference counter has not yet
reached zero in {{ReleaseOnConsumptionResultPartition}}. In detail:
 * In {{BoundedBlockingSubpartitionReader#releaseAllResources}}, we should also
call {{parent.onConsumedSubpartition()}}, as {{PipelinedSubpartitionView}} does,
in order to notify the {{ReleaseOnConsumptionResultPartition}} to decrease the
reference counter.

[~Zentol] [~azagrebin] [~StephanEwen]


was (Author: zjwang):
For the first issue, I think it is not very important currently, because we also
have {{PartitionRequestQueue#close}} to release all the view readers.

For the second issue, it needs to be fixed, and I think we should still stick to
releasing the partition as a whole via {{ResultPartitionManager}}; that means we
must not release a single subpartition while the reference counter has not yet
reached zero in {{ReleaseOnConsumptionResultPartition}}. In detail:
 * In {{BoundedBlockingSubpartitionReader#releaseAllResources}}, we should also
call {{parent.onConsumedSubpartition()}}, as {{PipelinedSubpartitionView}} does,
in order to notify the {{ReleaseOnConsumptionResultPartition}} to decrease the
reference counter.

[~Zentol] [~azagrebin] [~StephanEwen]

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's a file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} is also being released while the
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is possible that this issue has recently caused issues on Travis where the
> builds were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885251#comment-16885251
 ] 

zhijiang commented on FLINK-13245:
--

Thanks for finding this potential issue and the investigation! [~Zentol]
[~azagrebin]
After reviewing the relevant code, there are actually two issues here:
 * Regarding the handling of the {{CancelPartitionRequest}} issue, I guess we made
two assumptions before. One assumption is that {{availableReaders}} is always
equivalent to {{allReaders}}, but this assumption no longer holds because of
credit-based flow control. The other assumption is that the
{{CancelPartitionRequest}} logic is best-effort, because the last
{{RemoteInputChannel}} would send a {{CloseRequest}} message, and then the
{{PartitionRequestQueue}} would always release all the view readers in
{{allReaders}} in the end. The details are in {{PartitionRequestQueue#close}}. If
so, there seems to be no problem even though the previous
{{CancelPartitionRequest}} does not always work. But I think it would be more
proper/strict if we handled the {{CancelPartitionRequest}} via {{allReaders}}
instead.

 * Another root problem is that
{{ReleaseOnConsumptionResultPartition#onConsumedSubpartition}} does not work for
the {{BoundedBlockingSubpartition}} implementation. {{onConsumedSubpartition}} is
triggered by the consumer notification via the network, but that only happens for
{{RemoteInputChannel}}. In the case of {{LocalInputChannel}},
{{ResultSubpartitionView#releaseAllResources}} is called directly. So in
{{SlotCountExceedingParallelismTest}} there are some local channels that cause the
reference counter in {{ReleaseOnConsumptionResultPartition}} to never decrease to
0, so the {{BoundedBlockingSubpartition}} is never released. But
{{BoundedBlockingSubpartition#releaseReaderReference}} checks the {{isReleased}}
tag before deleting the file, so there is a conflict here. In fact this issue
already exists in the previous {{SpillableSubpartition}}, but in the test the
spillable subpartition always uses the memory way, which hides the potential
problem.
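
As a concrete illustration of the first point, here is a small, self-contained
sketch of cancelling against the full reader set instead of only the readers that
currently hold credit. The names are simplified stand-ins for the
{{PartitionRequestQueue}} internals, not the actual Netty handler code.

{code:java}
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

class ReaderRegistrySketch {
    // readers that currently have both data and credit
    private final Queue<String> availableReaders = new ArrayDeque<>();
    // every registered reader, keyed by receiver id
    private final Map<String, AutoCloseable> allReaders = new HashMap<>();

    void register(String receiverId, AutoCloseable readerView) {
        allReaders.put(receiverId, readerView);
    }

    // A cancel request must consult allReaders: a reader without credit is not in
    // availableReaders, but its view still has to be released.
    void onCancelPartitionRequest(String receiverId) throws Exception {
        AutoCloseable readerView = allReaders.remove(receiverId);
        if (readerView != null) {
            availableReaders.remove(receiverId);
            readerView.close();
        }
    }
}
{code}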

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's a file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} is also being released while the
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is possible that this issue has recently caused issues on Travis where the
> builds were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Issue Comment Deleted] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13245:
-
Comment: was deleted

(was: Thanks for finding this potential issue and the investigation! [~Zentol]
[~azagrebin]
After reviewing the relevant code, there are actually two issues here:
 * Regarding the handling of the {{CancelPartitionRequest}} issue, I guess we made
two assumptions before. One assumption is that {{availableReaders}} is always
equivalent to {{allReaders}}, but this assumption no longer holds because of
credit-based flow control. The other assumption is that the
{{CancelPartitionRequest}} logic is best-effort, because the last
{{RemoteInputChannel}} would send a {{CloseRequest}} message, and then the
{{PartitionRequestQueue}} would always release all the view readers in
{{allReaders}} in the end. The details are in {{PartitionRequestQueue#close}}. If
so, there seems to be no problem even though the previous
{{CancelPartitionRequest}} does not always work. But I think it would be more
proper/strict if we handled the {{CancelPartitionRequest}} via {{allReaders}}
instead.

 * Another root problem is that
{{ReleaseOnConsumptionResultPartition#onConsumedSubpartition}} does not work for
the {{BoundedBlockingSubpartition}} implementation. {{onConsumedSubpartition}} is
triggered by the consumer notification via the network, but that only happens for
{{RemoteInputChannel}}. In the case of {{LocalInputChannel}},
{{ResultSubpartitionView#releaseAllResources}} is called directly. So in
{{SlotCountExceedingParallelismTest}} there are some local channels that cause the
reference counter in {{ReleaseOnConsumptionResultPartition}} to never decrease to
0, so the {{BoundedBlockingSubpartition}} is never released. But
{{BoundedBlockingSubpartition#releaseReaderReference}} checks the {{isReleased}}
tag before deleting the file, so there is a conflict here. In fact this issue
already exists in the previous {{SpillableSubpartition}}, but in the test the
spillable subpartition always uses the memory way, which hides the potential
problem.)

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's a file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} is also being released while the
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is possible that this issue has recently caused issues on Travis where the
> builds were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885241#comment-16885241
 ] 

zhijiang edited comment on FLINK-13245 at 7/15/19 1:46 PM:
---

Thanks for finding this potential issue and the investigation! [~Zentol]
[~azagrebin]
After reviewing the relevant code, there are actually two issues here:
 * Regarding the handling of the {{CancelPartitionRequest}} issue, I guess we made
two assumptions before. One assumption is that {{availableReaders}} is always
equivalent to {{allReaders}}, but this assumption no longer holds because of
credit-based flow control. The other assumption is that the
{{CancelPartitionRequest}} logic is best-effort, because the last
{{RemoteInputChannel}} would send a {{CloseRequest}} message, and then the
{{PartitionRequestQueue}} would always release all the view readers in
{{allReaders}} in the end. The details are in {{PartitionRequestQueue#close}}. If
so, there seems to be no problem even though the previous
{{CancelPartitionRequest}} does not always work. But I think it would be more
proper/strict if we handled the {{CancelPartitionRequest}} via {{allReaders}}
instead.

 * Another root problem is that
{{ReleaseOnConsumptionResultPartition#onConsumedSubpartition}} does not work for
the {{BoundedBlockingSubpartition}} implementation. {{onConsumedSubpartition}} is
triggered by the consumer notification via the network, but that only happens for
{{RemoteInputChannel}}. In the case of {{LocalInputChannel}},
{{ResultSubpartitionView#releaseAllResources}} is called directly. So in
{{SlotCountExceedingParallelismTest}} there are some local channels that cause the
reference counter in {{ReleaseOnConsumptionResultPartition}} to never decrease to
0, so the {{BoundedBlockingSubpartition}} is never released. But
{{BoundedBlockingSubpartition#releaseReaderReference}} checks the {{isReleased}}
tag before deleting the file, so there is a conflict here. In fact this issue
already exists in the previous {{SpillableSubpartition}}, but in the test the
spillable subpartition always uses the memory way, which hides the potential
problem.


was (Author: zjwang):
Thanks for finding this potential issue and the investigation! [~Zentol]
[~azagrebin]
After reviewing the relevant code, there are actually two issues here:
 * Regarding the handling of the {{CancelPartitionRequest}} issue, I guess we made
two assumptions before. One assumption is that {{availableReaders}} is always
equivalent to {{allReaders}}, but this assumption no longer holds because of
credit-based flow control. The other assumption is that the
{{CancelPartitionRequest}} logic is best-effort, because the last
{{RemoteInputChannel}} would send a {{CloseRequest}} message, and then the
{{PartitionRequestQueue}} would always release all the view readers in
{{allReaders}} in the end. The details are in {{PartitionRequestQueue#close}}. If
so, there seems to be no problem even though the previous
{{CancelPartitionRequest}} does not always work. But I think it would be more
proper/strict if we handled the {{CancelPartitionRequest}} via {{allReaders}}
instead.

 * Another root problem is that
{{ReleaseOnConsumptionResultPartition#onConsumedSubpartition}} does not work for
the {{BoundedBlockingSubpartition}} implementation. {{onConsumedSubpartition}} is
triggered by the consumer notification via the network, but that only happens for
{{RemoteInputChannel}}. In the case of {{LocalInputChannel}},
{{ResultSubpartitionView#releaseAllResources}} is called directly. So in
{{SlotCountExceedingParallelismTest}} there are some local channels that cause the
reference counter in {{ReleaseOnConsumptionResultPartition}} to never decrease to
0, so the {{BoundedBlockingSubpartition}} is never released. But
{{BoundedBlockingSubpartition#releaseReaderReference}} checks the {{isReleased}}
tag before deleting the file, so there is a conflict here. In fact this issue
already exists in the previous {{SpillableSubpartition}}, but in the test the
spillable subpartition always uses the memory way, which hides the potential
problem.

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's a file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've 

[jira] [Commented] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885241#comment-16885241
 ] 

zhijiang commented on FLINK-13245:
--

Thanks for finding this potential issue and the investigation! [~Zentol]
[~azagrebin]
After reviewing the relevant code, there are actually two issues here:
 * Regarding the handling of the {{CancelPartitionRequest}} issue, I guess we made
two assumptions before. One assumption is that {{availableReaders}} is always
equivalent to {{allReaders}}, but this assumption no longer holds because of
credit-based flow control. The other assumption is that the
{{CancelPartitionRequest}} logic is best-effort, because the last
{{RemoteInputChannel}} would send a {{CloseRequest}} message, and then the
{{PartitionRequestQueue}} would always release all the view readers in
{{allReaders}} in the end. The details are in {{PartitionRequestQueue#close}}. If
so, there seems to be no problem even though the previous
{{CancelPartitionRequest}} does not always work. But I think it would be more
proper/strict if we handled the {{CancelPartitionRequest}} via {{allReaders}}
instead.

 * Another root problem is that
{{ReleaseOnConsumptionResultPartition#onConsumedSubpartition}} does not work for
the {{BoundedBlockingSubpartition}} implementation. {{onConsumedSubpartition}} is
triggered by the consumer notification via the network, but that only happens for
{{RemoteInputChannel}}. In the case of {{LocalInputChannel}},
{{ResultSubpartitionView#releaseAllResources}} is called directly. So in
{{SlotCountExceedingParallelismTest}} there are some local channels that cause the
reference counter in {{ReleaseOnConsumptionResultPartition}} to never decrease to
0, so the {{BoundedBlockingSubpartition}} is never released. But
{{BoundedBlockingSubpartition#releaseReaderReference}} checks the {{isReleased}}
tag before deleting the file, so there is a conflict here. In fact this issue
already exists in the previous {{SpillableSubpartition}}, but in the test the
spillable subpartition always uses the memory way, which hides the potential
problem.

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's a file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} is also being released while the
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is possible that this issue has recently caused issues on Travis where the
> builds were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Issue Comment Deleted] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13245:
-
Comment: was deleted

(was: Thanks for finding this potential issue and the investigation! [~Zentol]
[~azagrebin]

After reviewing the relevant code, there are actually two issues here:
 * Regarding the handling of the `CancelPartitionRequest` issue, I guess we made
two assumptions before. One assumption is that `availableReaders` is always
equivalent to `allReaders`, but this assumption no longer holds because of
credit-based flow control. The other assumption is that the
`CancelPartitionRequest` logic is best-effort, because the last
`RemoteInputChannel` would send a `CloseRequest` message, and then the
`PartitionRequestQueue` would always release all the view readers in `allReaders`
in the end. The details are in `PartitionRequestQueue#close`. If so, there seems
to be no problem even though the previous `CancelPartitionRequest` does not work.
But I think it would be more proper/strict if we handled the
`CancelPartitionRequest` via `allReaders` instead.

 * Another critical problem is that the consumer-side notification for releasing a
subpartition does not actually take effect for the `BoundedBlockingSubpartition`
implementation, because it checks the `isReleased` tag during
`releaseReaderReference`, and this tag can only be changed by a producer-side
call. In other words, for the blocking case, when to release the partition is
determined by the JobMaster based on the PartitionReleaseStrategy. So even if we
solve the first issue above, the file for a blocking partition still could not be
deleted after the release notification from the network.

In addition, this issue is actually hidden in the previous `SpillableSubpartition`
because in this test the partition only uses the memory type, not a spilled
file.)

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's a file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} is also being released while the
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is possible that this issue has recently caused issues on Travis where the
> builds were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885076#comment-16885076
 ] 

zhijiang edited comment on FLINK-13245 at 7/15/19 10:46 AM:


Thanks for finding this potential issue and the investigation! [~Zentol]
[~azagrebin]

After reviewing the relevant code, there are actually two issues here:
 * Regarding the handling of the `CancelPartitionRequest` issue, I guess we made
two assumptions before. One assumption is that `availableReaders` is always
equivalent to `allReaders`, but this assumption no longer holds because of
credit-based flow control. The other assumption is that the
`CancelPartitionRequest` logic is best-effort, because the last
`RemoteInputChannel` would send a `CloseRequest` message, and then the
`PartitionRequestQueue` would always release all the view readers in `allReaders`
in the end. The details are in `PartitionRequestQueue#close`. If so, there seems
to be no problem even though the previous `CancelPartitionRequest` does not work.
But I think it would be more proper/strict if we handled the
`CancelPartitionRequest` via `allReaders` instead.

 * Another critical problem is that the consumer-side notification for releasing a
subpartition does not actually take effect for the `BoundedBlockingSubpartition`
implementation, because it checks the `isReleased` tag during
`releaseReaderReference`, and this tag can only be changed by a producer-side
call. In other words, for the blocking case, when to release the partition is
determined by the JobMaster based on the PartitionReleaseStrategy. So even if we
solve the first issue above, the file for a blocking partition still could not be
deleted after the release notification from the network.

In addition, this issue is actually hidden in the previous `SpillableSubpartition`
because in this test the partition only uses the memory type, not a spilled
file.


was (Author: zjwang):
Thanks for finding this potential issue and the investigation! [~Zentol]
[~azagrebin]

After reviewing the relevant code, there are actually two issues here:
 * Regarding the handling of the `CancelPartitionRequest` issue, I guess we made
two assumptions before. One assumption is that `availableReaders` is always
equivalent to `allReaders`, but this assumption no longer holds because of
credit-based flow control. The other assumption is that the
`CancelPartitionRequest` logic is best-effort, because the last
`RemoteInputChannel` would send a `CloseRequest` message, and then the
`PartitionRequestQueue` would also release all the view readers in `allReaders`.
The details are in `PartitionRequestQueue#close`. If so, there seems to be no
problem even though the previous `CancelPartitionRequest` does not work. But I
think it would be more proper/strict if we handled the `CancelPartitionRequest`
via `allReaders` instead.

 * Another critical problem is that the consumer-side notification for releasing a
subpartition does not actually take effect for the `BoundedBlockingSubpartition`
implementation, because it checks the `isReleased` tag during
`releaseReaderReference`, and this tag can only be changed by a producer-side
call. In other words, for the blocking case, when to release the partition is
determined by the JobMaster based on the PartitionReleaseStrategy. So even if we
solve the first issue above, the file for a blocking partition still could not be
deleted after the release notification from the network.

In addition, this issue is actually hidden in the previous `SpillableSubpartition`
because in this test the partition only uses the memory type, not a spilled
file.

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's a file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} is also being released while the
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is feasible that this issue has recently 

[jira] [Commented] (FLINK-13245) Network stack is leaking files

2019-07-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885076#comment-16885076
 ] 

zhijiang commented on FLINK-13245:
--

Thanks for finding this potential issue and the investigation! [~Zentol]
[~azagrebin]

After reviewing the relevant code, there are actually two issues here:
 * Regarding the handling of the `CancelPartitionRequest` issue, I guess we made
two assumptions before. One assumption is that `availableReaders` is always
equivalent to `allReaders`, but this assumption no longer holds because of
credit-based flow control. The other assumption is that the
`CancelPartitionRequest` logic is best-effort, because the last
`RemoteInputChannel` would send a `CloseRequest` message, and then the
`PartitionRequestQueue` would also release all the view readers in `allReaders`.
The details are in `PartitionRequestQueue#close`. If so, there seems to be no
problem even though the previous `CancelPartitionRequest` does not work. But I
think it would be more proper/strict if we handled the `CancelPartitionRequest`
via `allReaders` instead.

 * Another critical problem is that the consumer-side notification for releasing a
subpartition does not actually take effect for the `BoundedBlockingSubpartition`
implementation, because it checks the `isReleased` tag during
`releaseReaderReference`, and this tag can only be changed by a producer-side
call. In other words, for the blocking case, when to release the partition is
determined by the JobMaster based on the PartitionReleaseStrategy. So even if we
solve the first issue above, the file for a blocking partition still could not be
deleted after the release notification from the network.

In addition, this issue is actually hidden in the previous `SpillableSubpartition`
because in this test the partition only uses the memory type, not a spilled
file.
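
For reference, a minimal, self-contained sketch of the conflict described in the
second point: the reader-side release only deletes the file once the producer-side
release flag is set, so a consumer notification alone never removes it. The names
are simplified stand-ins, not the actual Flink classes.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class BlockingSubpartitionSketch {
    private final Path dataFile;
    private boolean isReleased;   // only set by the producer-side release() call
    private int activeReaders;

    BlockingSubpartitionSketch(Path dataFile) {
        this.dataFile = dataFile;
    }

    synchronized void onReaderCreated() {
        activeReaders++;
    }

    // Producer-side release; for blocking partitions this is driven by the
    // JobMaster's partition release strategy, not by consumer notifications.
    synchronized void release() throws IOException {
        isReleased = true;
        maybeDeleteFile();
    }

    // Reader-side release; without the producer flag the file survives.
    synchronized void releaseReaderReference() throws IOException {
        activeReaders--;
        maybeDeleteFile();
    }

    private void maybeDeleteFile() throws IOException {
        if (isReleased && activeReaders == 0) {
            Files.deleteIfExists(dataFile);
        }
    }
}
{code}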

> Network stack is leaking files
> --
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> There's a file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} is also being released while the
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is possible that this issue has recently caused issues on Travis where the
> builds were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Closed] (FLINK-13100) Fix the bug of throwing IOException while FileBufferReader#nextBuffer

2019-07-12 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang closed FLINK-13100.

Resolution: Resolved

Fixed in 2c8ee78f714af4995f618c768661f5b638cd3025

> Fix the bug of throwing IOException while FileBufferReader#nextBuffer
> -
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Blocker
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next
> memory segment to always be available, based on the assumption that the
> nextBuffer call can only happen after the previous buffer has been recycled.
> Otherwise an IOException is thrown in the current implementation.
> In fact, the above assumption does not hold given the credit-based and
> zero-copy features in the network stack. The detailed process is as follows:
>  * The netty thread finishes calling channel.writeAndFlush() in
> PartitionRequestQueue and adds a listener to handle the ChannelFuture later.
> Before the future is done, the corresponding buffer is not recycled because of
> the zero-copy improvement.
>  * Before the previous future is done, the netty thread can trigger the next
> writeAndFlush while processing an addCredit message; then
> FileBufferReader#nextBuffer throws an exception because the previous buffer has
> not been recycled.
> We thought of several ways to solve this potential bug:
>  * Do not trigger the next writeAndFlush before the previous future is done. To
> do so we would have to maintain the future state and check it in the relevant
> actions. I worry this might cause a performance regression in network
> throughput and add extra state management.
>  * Adjust the implementation of the current FileBufferReader. We previously
> regarded the blocking partition view as always available based on the
> next-buffer read-ahead, so it would always be added into the available queue in
> PartitionRequestQueue. Actually this read-ahead only simplifies the process of
> BoundedBlockingSubpartitionReader#notifyDataAvailable. The view availability
> could instead be judged based on the available buffers in FileBufferReader.
> When a buffer is recycled back to the FileBufferReader after writeAndFlush is
> done, it could call notifyDataAvailable to add this view into the available
> queue in PartitionRequestQueue.
> I prefer the second way because it would not bring any negative impacts.
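
To illustrate the second (preferred) way, here is a minimal, self-contained sketch
in which availability is derived from the pool of free buffers inside the reader,
and recycling a buffer after a completed writeAndFlush re-announces availability.
The names are simplified stand-ins for FileBufferReader and its surroundings, not
the actual implementation.

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

class FileBufferReaderSketch {
    private final Queue<byte[]> freeBuffers = new ArrayDeque<>();
    private final Runnable dataAvailableListener;

    FileBufferReaderSketch(int numBuffers, int bufferSize, Runnable dataAvailableListener) {
        for (int i = 0; i < numBuffers; i++) {
            freeBuffers.add(new byte[bufferSize]);
        }
        this.dataAvailableListener = dataAvailableListener;
    }

    // The view counts as available only while a free segment exists, instead of
    // relying on the read-ahead buffer, so nextBuffer() is never called without a
    // segment to fill.
    synchronized boolean isAvailable() {
        return !freeBuffers.isEmpty();
    }

    synchronized byte[] nextBuffer() {
        return freeBuffers.poll(); // returns null instead of failing when exhausted
    }

    // Called from the writeAndFlush completion path: returning the buffer makes the
    // view available again and re-enqueues it in the request queue.
    synchronized void recycle(byte[] buffer) {
        boolean wasUnavailable = freeBuffers.isEmpty();
        freeBuffers.add(buffer);
        if (wasUnavailable) {
            dataAvailableListener.run(); // notifyDataAvailable equivalent
        }
    }
}
{code}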



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13235) Change the Netty default transport mode to auto

2019-07-11 Thread zhijiang (JIRA)
zhijiang created FLINK-13235:


 Summary: Change the Netty default transport mode to auto
 Key: FLINK-13235
 URL: https://issues.apache.org/jira/browse/FLINK-13235
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


The current default config for "taskmanager.net.transport" in
NettyShuffleEnvironmentOptions is "NIO". In order to use the "EPOLL" mode, which
has better performance and is recommended when available, we could change the
default config to "AUTO". Then the "NIO" mode is used as a fallback.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Closed] (FLINK-13141) Remove getBufferSize method from BufferPoolFactory

2019-07-11 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang closed FLINK-13141.

Resolution: Resolved

Fixed in b6563e385263beb556ce19855ca6dfdbb7f6c853

> Remove getBufferSize method from BufferPoolFactory
> --
>
> Key: FLINK-13141
> URL: https://issues.apache.org/jira/browse/FLINK-13141
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This is just a refactoring to make the interface of BufferPoolFactory simpler
> and cleaner.
> BufferPoolFactory#getBufferSize is only used for creating subpartitions in
> ResultPartitionFactory. We could pass the network buffer size from
> NettyShuffleEnvironmentConfiguration while constructing the
> ResultPartitionFactory, then the interface method getBufferSize could be
> removed from BufferPoolFactory.
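
A minimal sketch of the refactoring described above, with the network buffer size
injected at construction time instead of queried from the factory; the class
shapes are simplified stand-ins for the actual Flink classes.

{code:java}
interface BufferPoolFactorySketch {
    Object createBufferPool(int numRequiredBuffers, int maxUsedBuffers);
    // int getBufferSize();  // removed: the size is passed in directly below
}

class ResultPartitionFactorySketch {
    private final BufferPoolFactorySketch bufferPoolFactory;
    private final int networkBufferSize; // taken from the shuffle environment configuration

    ResultPartitionFactorySketch(
            BufferPoolFactorySketch bufferPoolFactory, int networkBufferSize) {
        this.bufferPoolFactory = bufferPoolFactory;
        this.networkBufferSize = networkBufferSize;
    }

    void createSubpartitions() {
        // subpartitions can size their write buffers from the injected value
        byte[] writeBuffer = new byte[networkBufferSize];
    }
}
{code}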



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13100) Fix the bug of throwing IOException while FileBufferReader#nextBuffer

2019-07-10 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13100:
-
Summary: Fix the bug of throwing IOException while 
FileBufferReader#nextBuffer  (was: Fix the bug of throwing IOException during 
FileBufferReader#nextBuffer)

> Fix the bug of throwing IOException while FileBufferReader#nextBuffer
> -
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Blocker
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next
> memory segment to always be available, based on the assumption that the
> nextBuffer call can only happen after the previous buffer has been recycled.
> Otherwise an IOException is thrown in the current implementation.
> In fact, the above assumption does not hold given the credit-based and
> zero-copy features in the network stack. The detailed process is as follows:
>  * The netty thread finishes calling channel.writeAndFlush() in
> PartitionRequestQueue and adds a listener to handle the ChannelFuture later.
> Before the future is done, the corresponding buffer is not recycled because of
> the zero-copy improvement.
>  * Before the previous future is done, the netty thread can trigger the next
> writeAndFlush while processing an addCredit message; then
> FileBufferReader#nextBuffer throws an exception because the previous buffer has
> not been recycled.
> We thought of several ways to solve this potential bug:
>  * Do not trigger the next writeAndFlush before the previous future is done. To
> do so we would have to maintain the future state and check it in the relevant
> actions. I worry this might cause a performance regression in network
> throughput and add extra state management.
>  * Adjust the implementation of the current FileBufferReader. We previously
> regarded the blocking partition view as always available based on the
> next-buffer read-ahead, so it would always be added into the available queue in
> PartitionRequestQueue. Actually this read-ahead only simplifies the process of
> BoundedBlockingSubpartitionReader#notifyDataAvailable. The view availability
> could instead be judged based on the available buffers in FileBufferReader.
> When a buffer is recycled back to the FileBufferReader after writeAndFlush is
> done, it could call notifyDataAvailable to add this view into the available
> queue in PartitionRequestQueue.
> I prefer the second way because it would not bring any negative impacts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-13100) Fix the bug of throwing IOException during FileBufferReader#nextBuffer

2019-07-10 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13100:
-
Summary: Fix the bug of throwing IOException during 
FileBufferReader#nextBuffer  (was: Fix the unexpected IOException during 
FileBufferReader#nextBuffer)

> Fix the bug of throwing IOException during FileBufferReader#nextBuffer
> --
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Blocker
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next
> memory segment to always be available, based on the assumption that the
> nextBuffer call can only happen after the previous buffer has been recycled.
> Otherwise an IOException is thrown in the current implementation.
> In fact, the above assumption does not hold given the credit-based and
> zero-copy features in the network stack. The detailed process is as follows:
>  * The netty thread finishes calling channel.writeAndFlush() in
> PartitionRequestQueue and adds a listener to handle the ChannelFuture later.
> Before the future is done, the corresponding buffer is not recycled because of
> the zero-copy improvement.
>  * Before the previous future is done, the netty thread can trigger the next
> writeAndFlush while processing an addCredit message; then
> FileBufferReader#nextBuffer throws an exception because the previous buffer has
> not been recycled.
> We thought of several ways to solve this potential bug:
>  * Do not trigger the next writeAndFlush before the previous future is done. To
> do so we would have to maintain the future state and check it in the relevant
> actions. I worry this might cause a performance regression in network
> throughput and add extra state management.
>  * Adjust the implementation of the current FileBufferReader. We previously
> regarded the blocking partition view as always available based on the
> next-buffer read-ahead, so it would always be added into the available queue in
> PartitionRequestQueue. Actually this read-ahead only simplifies the process of
> BoundedBlockingSubpartitionReader#notifyDataAvailable. The view availability
> could instead be judged based on the available buffers in FileBufferReader.
> When a buffer is recycled back to the FileBufferReader after writeAndFlush is
> done, it could call notifyDataAvailable to add this view into the available
> queue in PartitionRequestQueue.
> I prefer the second way because it would not bring any negative impacts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-09 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang reassigned FLINK-13100:


Assignee: zhijiang

> Fix the unexpected IOException during FileBufferReader#nextBuffer
> -
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Blocker
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next
> memory segment to always be available, based on the assumption that the
> nextBuffer call can only happen after the previous buffer has been recycled.
> Otherwise an IOException is thrown in the current implementation.
> In fact, the above assumption does not hold given the credit-based and
> zero-copy features in the network stack. The detailed process is as follows:
>  * The netty thread finishes calling channel.writeAndFlush() in
> PartitionRequestQueue and adds a listener to handle the ChannelFuture later.
> Before the future is done, the corresponding buffer is not recycled because of
> the zero-copy improvement.
>  * Before the previous future is done, the netty thread can trigger the next
> writeAndFlush while processing an addCredit message; then
> FileBufferReader#nextBuffer throws an exception because the previous buffer has
> not been recycled.
> We thought of several ways to solve this potential bug:
>  * Do not trigger the next writeAndFlush before the previous future is done. To
> do so we would have to maintain the future state and check it in the relevant
> actions. I worry this might cause a performance regression in network
> throughput and add extra state management.
>  * Adjust the implementation of the current FileBufferReader. We previously
> regarded the blocking partition view as always available based on the
> next-buffer read-ahead, so it would always be added into the available queue in
> PartitionRequestQueue. Actually this read-ahead only simplifies the process of
> BoundedBlockingSubpartitionReader#notifyDataAvailable. The view availability
> could instead be judged based on the available buffers in FileBufferReader.
> When a buffer is recycled back to the FileBufferReader after writeAndFlush is
> done, it could call notifyDataAvailable to add this view into the available
> queue in PartitionRequestQueue.
> I prefer the second way because it would not bring any negative impacts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-09 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881029#comment-16881029
 ] 

zhijiang commented on FLINK-13100:
--

Yes, the ITCase is also necessary, and I think there should already be IT cases
for blocking partitions. The key problem is that we have two different
implementations for blocking partitions, both of which are used in practice. But
the selection between them is made automatically based on whether the system is
32-bit or 64-bit, so the 32-bit path might never have been touched when executing
the IT cases before. So I think we might need to make the file-file way
configurable in the existing IT cases.

> Fix the unexpected IOException during FileBufferReader#nextBuffer
> -
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Priority: Blocker
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next
> memory segment to always be available, based on the assumption that the
> nextBuffer call can only happen after the previous buffer has been recycled.
> Otherwise an IOException is thrown in the current implementation.
> In fact, the above assumption does not hold given the credit-based and
> zero-copy features in the network stack. The detailed process is as follows:
>  * The netty thread finishes calling channel.writeAndFlush() in
> PartitionRequestQueue and adds a listener to handle the ChannelFuture later.
> Before the future is done, the corresponding buffer is not recycled because of
> the zero-copy improvement.
>  * Before the previous future is done, the netty thread can trigger the next
> writeAndFlush while processing an addCredit message; then
> FileBufferReader#nextBuffer throws an exception because the previous buffer has
> not been recycled.
> We thought of several ways to solve this potential bug:
>  * Do not trigger the next writeAndFlush before the previous future is done. To
> do so we would have to maintain the future state and check it in the relevant
> actions. I worry this might cause a performance regression in network
> throughput and add extra state management.
>  * Adjust the implementation of the current FileBufferReader. We previously
> regarded the blocking partition view as always available based on the
> next-buffer read-ahead, so it would always be added into the available queue in
> PartitionRequestQueue. Actually this read-ahead only simplifies the process of
> BoundedBlockingSubpartitionReader#notifyDataAvailable. The view availability
> could instead be judged based on the available buffers in FileBufferReader.
> When a buffer is recycled back to the FileBufferReader after writeAndFlush is
> done, it could call notifyDataAvailable to add this view into the available
> queue in PartitionRequestQueue.
> I prefer the second way because it would not bring any negative impacts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-09 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881003#comment-16881003
 ] 

zhijiang commented on FLINK-13100:
--

[~pnowojski] the previous deadlock has actually happened on our side before, and
we fixed it internally at Alibaba. Since the new BoundedBlockingSubpartition only
focused on the mmap way in its first version, we did not worry about this issue
then. Now the 32-bit system can hit the file-file way, so it is very easy to
trigger this problem when the flush is pending, as I described above.

> Fix the unexpected IOException during FileBufferReader#nextBuffer
> -
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Priority: Blocker
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next
> memory segment to always be available, based on the assumption that the
> nextBuffer call can only happen after the previous buffer has been recycled.
> Otherwise an IOException is thrown in the current implementation.
> In fact, the above assumption does not hold given the credit-based and
> zero-copy features in the network stack. The detailed process is as follows:
>  * The netty thread finishes calling channel.writeAndFlush() in
> PartitionRequestQueue and adds a listener to handle the ChannelFuture later.
> Before the future is done, the corresponding buffer is not recycled because of
> the zero-copy improvement.
>  * Before the previous future is done, the netty thread can trigger the next
> writeAndFlush while processing an addCredit message; then
> FileBufferReader#nextBuffer throws an exception because the previous buffer has
> not been recycled.
> We thought of several ways to solve this potential bug:
>  * Do not trigger the next writeAndFlush before the previous future is done. To
> do so we would have to maintain the future state and check it in the relevant
> actions. I worry this might cause a performance regression in network
> throughput and add extra state management.
>  * Adjust the implementation of the current FileBufferReader. We previously
> regarded the blocking partition view as always available based on the
> next-buffer read-ahead, so it would always be added into the available queue in
> PartitionRequestQueue. Actually this read-ahead only simplifies the process of
> BoundedBlockingSubpartitionReader#notifyDataAvailable. The view availability
> could instead be judged based on the available buffers in FileBufferReader.
> When a buffer is recycled back to the FileBufferReader after writeAndFlush is
> done, it could call notifyDataAvailable to add this view into the available
> queue in PartitionRequestQueue.
> I prefer the second way because it would not bring any negative impacts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-09 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880999#comment-16880999
 ] 

zhijiang commented on FLINK-13100:
--

In netty's internal implementation of writeAndFlush(), the write operation can
always be completed while the flink buffer is not yet released. The flush
operation only completes right after the write operation when the channel's
interest ops include SelectionKey.OP_WRITE, which means the socket buffer has
enough space to hold the flushed message; otherwise the flush operation is
completed together with a following writeAndFlush().

In summary, the completion of the writeAndFlush() channel future only indicates
that the write is done; the flush is not always done. But the flink buffer is
only recycled after the flush is done, because of the zero-copy improvement in
the netty stack. So this causes the issue.
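
For reference, a minimal sketch of the writeAndFlush/listener pattern referred to
above, using the standard Netty channel API; the recycle callback is an
illustrative stand-in for whatever returns the flink buffer to its reader once
netty has finished with the underlying memory.

{code:java}
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;

class WriteAndFlushSketch {

    void writeNext(Channel channel, Object bufferResponse, Runnable recycleBuffer) {
        ChannelFuture future = channel.writeAndFlush(bufferResponse);
        future.addListener((ChannelFutureListener) completedFuture -> {
            // The flink buffer must not be reused before this point: with the
            // zero-copy path, netty still references its memory until it has
            // finished with this outgoing message.
            recycleBuffer.run();
        });
    }
}
{code}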

> Fix the unexpected IOException during FileBufferReader#nextBuffer
> -
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Priority: Blocker
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next
> memory segment to always be available, based on the assumption that the
> nextBuffer call can only happen after the previous buffer has been recycled.
> Otherwise an IOException is thrown in the current implementation.
> In fact, the above assumption does not hold given the credit-based and
> zero-copy features in the network stack. The detailed process is as follows:
>  * The netty thread finishes calling channel.writeAndFlush() in
> PartitionRequestQueue and adds a listener to handle the ChannelFuture later.
> Before the future is done, the corresponding buffer is not recycled because of
> the zero-copy improvement.
>  * Before the previous future is done, the netty thread can trigger the next
> writeAndFlush while processing an addCredit message; then
> FileBufferReader#nextBuffer throws an exception because the previous buffer has
> not been recycled.
> We thought of several ways to solve this potential bug:
>  * Do not trigger the next writeAndFlush before the previous future is done. To
> do so we would have to maintain the future state and check it in the relevant
> actions. I worry this might cause a performance regression in network
> throughput and add extra state management.
>  * Adjust the implementation of the current FileBufferReader. We previously
> regarded the blocking partition view as always available based on the
> next-buffer read-ahead, so it would always be added into the available queue in
> PartitionRequestQueue. Actually this read-ahead only simplifies the process of
> BoundedBlockingSubpartitionReader#notifyDataAvailable. The view availability
> could instead be judged based on the available buffers in FileBufferReader.
> When a buffer is recycled back to the FileBufferReader after writeAndFlush is
> done, it could call notifyDataAvailable to add this view into the available
> queue in PartitionRequestQueue.
> I prefer the second way because it would not bring any negative impacts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11082) Fix the calculation of backlog in PipelinedSubpartition

2019-07-08 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880499#comment-16880499
 ] 

zhijiang commented on FLINK-11082:
--

Yes, I also thought it was not an issue.

> Fix the calculation of backlog in PipelinedSubpartition
> ---
>
> Key: FLINK-11082
> URL: https://issues.apache.org/jira/browse/FLINK-11082
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.5.6, 1.7.1
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The backlog of a subpartition should indicate how many buffers are consumable,
> so that the consumer can feed back the corresponding credits for transporting
> these buffers. But in the current PipelinedSubpartition implementation, the
> backlog is increased by 1 when a BufferConsumer is added into
> PipelinedSubpartition, and decreased by 1 when a BufferConsumer is removed
> from PipelinedSubpartition. So the backlog only reflects how many buffers are
> retained in PipelinedSubpartition, which is not always equivalent to the
> number of consumable buffers.
> The backlog inconsistency might result in a misdistribution of floating buffers
> on the consumer side, because the consumer requests floating buffers based on
> the backlog value, so a floating buffer might sit unused in a
> RemoteInputChannel for a long time after being requested.
> Considering the solution, the last buffer in PipelinedSubpartition can only be
> consumable if a flush was triggered or the partition is finished. So we could
> calculate the backlog precisely based on the partition flushed/finished
> conditions.
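
A minimal, self-contained sketch of the precise accounting described above: the
last buffer in the queue only counts towards the backlog once a flush was
triggered or the partition is finished. This is a simplified stand-in for
PipelinedSubpartition, not the real implementation.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

class PipelinedSubpartitionSketch {
    private final Deque<byte[]> buffers = new ArrayDeque<>();
    private boolean flushRequested;
    private boolean isFinished;

    synchronized void add(byte[] bufferConsumer) {
        buffers.add(bufferConsumer);
    }

    synchronized void flush() {
        flushRequested = !buffers.isEmpty();
    }

    synchronized void finish() {
        isFinished = true;
    }

    // Backlog = number of consumable buffers, not the number of retained buffers:
    // the last buffer is excluded unless a flush was requested or the partition
    // finished.
    synchronized int getBuffersInBacklog() {
        if (flushRequested || isFinished) {
            return buffers.size();
        }
        return Math.max(buffers.size() - 1, 0);
    }
}
{code}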



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-08 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880465#comment-16880465
 ] 

zhijiang edited comment on FLINK-13100 at 7/8/19 3:30 PM:
--

Yes, the previous SpilledSubpartitionView really had the deadlock issue based on 
its two buffers. And now we have a similar issue as before, but it throws an 
IOException instead.

Your understanding of the above two parts is right. But there exists another case:
 * When the view is created, it is enqueued into the netty thread loop for the 
first time, because at that point it has both data and credit, as you mentioned.

 * Then a buffer is fetched from the view and writeAndFlush() is called. If the 
view is still in the queue, it must have credit, because the next buffer is 
always available thanks to the read-ahead. This case is no problem, because we 
only trigger the next writeAndFlush() after the previous one is done, as long as 
the current queue is not empty.

 * But if the view is not available (no credit) when writeAndFlush() is called, 
then the queue in the netty thread loop is empty. Before the future of 
writeAndFlush() is done, the netty thread could still process an AddCredit 
message, which makes the view available again, so it is added into the queue and 
triggers the next writeAndFlush(). That means the next writeAndFlush() is 
triggered although the previous buffer has not been recycled yet, which causes 
the problem. The process might look like this: first writeAndFlush() pending -> 
addCredit (triggers the second writeAndFlush) -> first writeAndFlush() finishes 
and recycles the buffer.

 * I think it might be caused by the improvement of reusing the flink buffer in 
the netty stack since release-1.5. writeAndFlush() can be broken down into two 
processes, write and flush. Previously, when the write process finished, the 
flink buffer was copied into netty's internal ByteBuf and could be recycled 
right away, so it caused no problem even if the second writeAndFlush was 
triggered before the first one had completed. But now the write process still 
references the flink buffer in the netty stack until the flush is done.

I will try to mock this process in the relevant unit tests to verify it, and I 
might submit that test tomorrow to make it easier to understand (a rough mock of 
the sequence is sketched below).
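
A hedged, self-contained mock of that ordering (invented names, no actual Netty or Flink dependencies), only to illustrate the timeline, not the real unit test against PartitionRequestQueue: the AddCredit path asks for the next buffer before the first flush callback has recycled the previous one.

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

public final class WriteFlushRaceMock {

    public static void main(String[] args) {
        // The single in-flight buffer of the reader; 'true' means it has been recycled.
        boolean[] bufferRecycled = {true};
        // Callbacks that run only when the "flush" future completes.
        Queue<Runnable> pendingFlushCallbacks = new ArrayDeque<>();

        // 1) First writeAndFlush(): the buffer is handed to the network layer and,
        //    with zero-copy, stays un-recycled until the flush callback runs.
        bufferRecycled[0] = false;
        pendingFlushCallbacks.add(() -> bufferRecycled[0] = true);

        // 2) An AddCredit message is processed on the same netty thread before that
        //    callback: the view looks available again and the second writeAndFlush()
        //    asks for the next buffer while the previous one is still in flight.
        if (!bufferRecycled[0]) {
            System.out.println("nextBuffer() called before recycle -> the unexpected IOException");
        }

        // 3) Only now does the first future complete and recycle the buffer.
        pendingFlushCallbacks.poll().run();
        System.out.println("buffer recycled after the race: " + bufferRecycled[0]);
    }
}
{code}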


was (Author: zjwang):
Yes, the previous SpilledSubpartitionView really has the deadlock issue based 
on two buffers. And now we have the similar issue as before, but throw the 
IOException instead.

Your understanding of above two parts is right. But there exists another case:
 * When the view is created, it would be enqueued into the netty thread loop at 
first time, because it has both data and credit now as you mentioned.

 * Then a buffer is fetched from view and writeAndFlush() is called. If it is 
still available in the queue, that means it must has credit because next buffer 
is always available for reading ahead. For this case it has no problem, because 
we only wait the previous writeAndFlush() done to trigger the next one if 
current queue is not empty.

 * But if it is not available (no credit) when writeAndFlush() is called, then 
the queue is empty in netty thread loop. Before the future of writeAndFlush() 
is done, the netty thread could still process the AddCredit message which would 
make the view become available again, then it would be added into queue to 
trigger the next writeAndFlush(). That means the previous buffer is not 
recycled but the next write is also triggered to cause the problem. The process 
might be like this : first writeAndFlush() pending -> addCredit(trigger second 
writeAndFlush) -> finish the first writeAndFlush() to recycle buffer.

 * I think it might be caused by the improvement of reusing flink buffer in 
netty stack from release-1.5. We could break writeAndFlush()  into write and 
flush two processes. In the before when the write process finishes, the flink 
buffer is copied into netty internal ByteBuffer to be recycled then, so it 
would not cause problem even though the second writeAndFlush is triggered 
before first pending done. But now the write process would still reference the 
flink buffer in netty stack until the flush is done.

I would try to mock this process in relevant unit tests to verify and I might 
submit this test tomorrow for understanding it easily.

> Fix the unexpected IOException during FileBufferReader#nextBuffer
> -
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Priority: Blocker
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next 
> memory segment to always be available, based on the assumption that the 
> nextBuffer call can only happen after the previous buffer was recycled 

[jira] [Comment Edited] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-08 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880465#comment-16880465
 ] 

zhijiang edited comment on FLINK-13100 at 7/8/19 3:29 PM:
--

Yes, the previous SpilledSubpartitionView really has the deadlock issue based 
on two buffers. And now we have the similar issue as before, but throw the 
IOException instead.

Your understanding of above two parts is right. But there exists another case:
 * When the view is created, it would be enqueued into the netty thread loop at 
first time, because it has both data and credit now as you mentioned.

 * Then a buffer is fetched from view and writeAndFlush() is called. If it is 
still available in the queue, that means it must has credit because next buffer 
is always available for reading ahead. For this case it has no problem, because 
we only wait the previous writeAndFlush() done to trigger the next one if 
current queue is not empty.

 * But if it is not available (no credit) when writeAndFlush() is called, then 
the queue is empty in netty thread loop. Before the future of writeAndFlush() 
is done, the netty thread could still process the AddCredit message which would 
make the view become available again, then it would be added into queue to 
trigger the next writeAndFlush(). That means the previous buffer is not 
recycled but the next write is also triggered to cause the problem. The process 
might be like this : first writeAndFlush() pending -> addCredit(trigger second 
writeAndFlush) -> finish the first writeAndFlush() to recycle buffer.

 * I think it might be caused by the improvement of reusing flink buffer in 
netty stack from release-1.5. We could break writeAndFlush()  into write and 
flush two processes. In the before when the write process finishes, the flink 
buffer is copied into netty internal ByteBuffer to be recycled then, so it 
would not cause problem even though the second writeAndFlush is triggered 
before first pending done. But now the write process would still reference the 
flink buffer in netty stack until the flush is done.

I would try to mock this process in relevant unit tests to verify and I might 
submit this test tomorrow for understanding it easily.


was (Author: zjwang):
Yes, the previous SpilledSubpartitionView really has the deadlock issue based 
on two buffers. And now we have the similar issue as before, but throw the 
IOException instead.

Your understanding of above two parts is right. But there exists another case:
 * When the view is created, it would be enqueued into the netty thread loop at 
first time, because it has both data and credit now as you mentioned.
 * Then a buffer is fetched from view and writeAndFlush() is called. If it is 
still available in the queue, that means it must has credit because next buffer 
is always available for reading ahead. For this case it has no problem, because 
we only wait the previous writeAndFlush() done to trigger the next one if 
current queue is not empty.
 * But if it is not available (no credit) when writeAndFlush() is called, then 
the queue is empty in netty thread loop. Before the future of writeAndFlush() 
is done, the netty thread could still process the AddCredit message which would 
make the view become available again, then it would be added into queue to 
trigger the next writeAndFlush(). That means the previous buffer is not 
recycled but the next write is also triggered to cause the problem. The process 
might be like this : first writeAndFlush() pending -> addCredit(trigger second 
writeAndFlush) -> finish the first writeAndFlush() to recycle buffer.
 * I think it might be caused by the improvement of reusing flink buffer in 
netty stack from release-1.5. We could break writeAndFlush()  into write and 
flush two processes. In the before when the write process finishes, the flink 
buffer is copied into netty internal ByteBuffer to be recycled then, so it 
would not cause problem even though the second writeAndFlush is triggered 
before first pending done. But now the write process would still reference the 
flink buffer in netty stack until the flush is done.

I would try to mock this process in relevant unit tests to verify and I might 
submit this test tomorrow for understanding it easily.

> Fix the unexpected IOException during FileBufferReader#nextBuffer
> -
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Priority: Blocker
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next 
> memory segment always available based on the assumption that the nextBuffer 
> call could only happen when the previous buffer was recycled before. 
> 

[jira] [Comment Edited] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-08 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880465#comment-16880465
 ] 

zhijiang edited comment on FLINK-13100 at 7/8/19 3:28 PM:
--

Yes, the previous SpilledSubpartitionView really has the deadlock issue based 
on two buffers. And now we have the similar issue as before, but throw the 
IOException instead.

Your understanding of above two parts is right. But there exists another case:
 * When the view is created, it would be enqueued into the netty thread loop at 
first time, because it has both data and credit now as you mentioned.
 * Then a buffer is fetched from view and writeAndFlush() is called. If it is 
still available in the queue, that means it must has credit because next buffer 
is always available for reading ahead. For this case it has no problem, because 
we only wait the previous writeAndFlush() done to trigger the next one if 
current queue is not empty.
 * But if it is not available (no credit) when writeAndFlush() is called, then 
the queue is empty in netty thread loop. Before the future of writeAndFlush() 
is done, the netty thread could still process the AddCredit message which would 
make the view become available again, then it would be added into queue to 
trigger the next writeAndFlush(). That means the previous buffer is not 
recycled but the next write is also triggered to cause the problem. The process 
might be like this : first writeAndFlush() pending -> addCredit(trigger second 
writeAndFlush) -> finish the first writeAndFlush() to recycle buffer.
 * I think it might be caused by the improvement of reusing flink buffer in 
netty stack from release-1.5. We could break writeAndFlush()  into write and 
flush two processes. In the before when the write process finishes, the flink 
buffer is copied into netty internal ByteBuffer to be recycled then, so it 
would not cause problem even though the second writeAndFlush is triggered 
before first pending done. But now the write process would still reference the 
flink buffer in netty stack until the flush is done.

I would try to mock this process in relevant unit tests to verify and I might 
submit this test tomorrow for understanding it easily.


was (Author: zjwang):
Yes, the previous SpilledSubpartitionView really has the deadlock issue based 
on two buffers. And now we have the similar issue as before, but throw the 
IOException instead.

Your understanding of above two parts is right. But there exists another case:
 * When the view is created, it would be enqueued into the netty thread loop at 
first time, because it has both data and credit now as you mentioned.
 * Then a buffer is fetched from view and writeAndFlush() is called. If it is 
still available in the queue, that means it must has credit because next buffer 
is always available for reading ahead. For this case it has no problem, because 
we only wait the previous writeAndFlush() done to trigger the next one.
 * But if it is not available (no credit) when writeAndFlush() is called, then 
the queue is empty in netty thread loop. Before the future of writeAndFlush() 
is done, the netty thread could still process the AddCredit message which would 
make the view become available again, then it would be added into queue to 
trigger the next writeAndFlush(). That means the previous buffer is not 
recycled but the next write is also triggered to cause the problem. The process 
might be like this : first writeAndFlush() pending -> addCredit(trigger second 
writeAndFlush) -> finish the first writeAndFlush() to recycle buffer.
 * I think it might be caused by the improvement of reusing flink buffer in 
netty stack from release-1.5. We could break writeAndFlush()  into write and 
flush two processes. In the before when the write process finishes, the flink 
buffer is copied into netty internal ByteBuffer to be recycled then, so it 
would not cause problem even though the second writeAndFlush is triggered 
before first pending done. But now the write process would still reference the 
flink buffer in netty stack until the flush is done.

I would try to mock this process in relevant unit tests to verify and I might 
submit this test tomorrow for understanding it easily.

> Fix the unexpected IOException during FileBufferReader#nextBuffer
> -
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Priority: Blocker
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next 
> memory segment always available based on the assumption that the nextBuffer 
> call could only happen when the previous buffer was recycled before. 
> Otherwise it would throw an 
