[jira] [Updated] (FLINK-14234) All partition consumable events should be notified to SchedulingStrategy (SchedulerNG)

2020-04-07 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14234:

Description: 
{{SchedulingStrategy}} requires partition consumable notifications to make 
scheduling decisions.
According to the {{SchedulingStrategy#onPartitionConsumable}} definition, all 
partition consumable events should be notified to {{SchedulingStrategy}}, 
including those from TMs (pipelined partitions becoming consumable once data is 
produced) and those from within the JM (blocking partitions becoming consumable 
once their producers finish).

This way, the LazyFromSourcesSchedulingStrategy no longer needs to maintain 
the result partition status itself, and the InputDependencyConstraintChecker 
can be simplified a lot.

It would also simplify the input checking for pipelined region scheduling.

More details can be found 
[here|https://github.com/apache/flink/pull/9663#discussion_r326540913].
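
As a minimal sketch of the intended flow (simplified placeholder types, not 
Flink's actual classes), both notification paths would funnel into the single 
{{SchedulingStrategy}} callback, so the strategy no longer has to track 
partition state itself:
{code:java}
import java.util.HashSet;
import java.util.Set;

// Hedged sketch, not Flink's real interface: one callback for all sources.
interface SchedulingStrategy {
    void onPartitionConsumable(String partitionId);
}

class SchedulerSketch {
    private final SchedulingStrategy strategy;
    private final Set<String> notifiedPartitions = new HashSet<>();

    SchedulerSketch(SchedulingStrategy strategy) {
        this.strategy = strategy;
    }

    // Path 1: a TM reports that a PIPELINED partition has produced data.
    void onTaskManagerPartitionUpdate(String partitionId) {
        notifyOnce(partitionId);
    }

    // Path 2: the JM observes a producer finishing, which makes its
    // BLOCKING partitions consumable without any TM notification.
    void onProducerFinished(Iterable<String> producedBlockingPartitions) {
        producedBlockingPartitions.forEach(this::notifyOnce);
    }

    private void notifyOnce(String partitionId) {
        if (notifiedPartitions.add(partitionId)) {
            strategy.onPartitionConsumable(partitionId);
        }
    }
}
{code}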



  was:
{{SchedulingStrategy}} requires partition consumable notifications to make 
scheduling decisions.
According to the {{SchedulingStrategy#onPartitionConsumable}} definition, all 
partition consumable events should be notified to {{SchedulingStrategy}}, 
including those from TMs (pipelined partitions becoming consumable once data is 
produced) and those from within the JM (blocking partitions becoming consumable 
once their producers finish).

More details can be found 
[here|https://github.com/apache/flink/pull/9663#discussion_r326540913].

This can be helpful for more flexible scheduling.


> All partition consumable events should be notified to SchedulingStrategy 
> (SchedulerNG)
> --
>
> Key: FLINK-14234
> URL: https://issues.apache.org/jira/browse/FLINK-14234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
> Fix For: 1.11.0
>
>
> {{SchedulingStrategy}} requires partition consumable notifications to make 
> scheduling decisions.
> According to the {{SchedulingStrategy#onPartitionConsumable}} definition, all 
> partition consumable events should be notified to {{SchedulingStrategy}}, 
> including those from TMs (pipelined partitions becoming consumable once data 
> is produced) and those from within the JM (blocking partitions becoming 
> consumable once their producers finish).
> This way, the LazyFromSourcesSchedulingStrategy no longer needs to maintain 
> the result partition status itself, and the InputDependencyConstraintChecker 
> can be simplified a lot.
> It would also simplify the input checking for pipelined region scheduling.
> More details can be found 
> [here|https://github.com/apache/flink/pull/9663#discussion_r326540913].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17046) SavepointWriterITCase failed on travis

2020-04-08 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17046:
---

 Summary: SavepointWriterITCase failed on travis
 Key: FLINK-17046
 URL: https://issues.apache.org/jira/browse/FLINK-17046
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.11.0
Reporter: Zhu Zhu
 Fix For: 1.11.0


https://api.travis-ci.com/v3/job/316732861/log.txt

[ERROR] Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 7.67 s 
<<< FAILURE! - in org.apache.flink.state.api.SavepointWriterITCase
[ERROR] testStateBootstrapAndModification[Savepoint Writer: MemoryStateBackend 
(data in heap memory / checkpoints to JobManager) (checkpoints: 'null', 
savepoints: 'null', asynchronous: UNDEFINED, maxStateSize: 
5242880)](org.apache.flink.state.api.SavepointWriterITCase)  Time elapsed: 
1.736 s  <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.state.api.SavepointWriterITCase.bootstrapState(SavepointWriterITCase.java:147)
at 
org.apache.flink.state.api.SavepointWriterITCase.testStateBootstrapAndModification(SavepointWriterITCase.java:114)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
Caused by: java.lang.UnsupportedOperationException: This method should never be 
called
[ERROR] testStateBootstrapAndModification[Savepoint Writer: 
RocksDBStateBackend{checkpointStreamBackend=MemoryStateBackend (data in heap 
memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
asynchronous: UNDEFINED, maxStateSize: 5242880), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=UNDEFINED, numberOfTransferThreads=-1, 
writeBatchSize=-1}](org.apache.flink.state.api.SavepointWriterITCase)  Time 
elapsed: 0.486 s  <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.state.api.SavepointWriterITCase.bootstrapState(SavepointWriterITCase.java:147)
at 
org.apache.flink.state.api.SavepointWriterITCase.testStateBootstrapAndModification(SavepointWriterITCase.java:114)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
Caused by: java.lang.UnsupportedOperationException: This method should never be 
called



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-17046) SavepointWriterITCase failed on travis

2020-04-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-17046.
---
Resolution: Duplicate

> SavepointWriterITCase failed on travis
> --
>
> Key: FLINK-17046
> URL: https://issues.apache.org/jira/browse/FLINK-17046
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.0
>Reporter: Zhu Zhu
>Priority: Critical
> Fix For: 1.11.0
>
>
> https://api.travis-ci.com/v3/job/316732861/log.txt
> [ERROR] Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 7.67 
> s <<< FAILURE! - in org.apache.flink.state.api.SavepointWriterITCase
> [ERROR] testStateBootstrapAndModification[Savepoint Writer: 
> MemoryStateBackend (data in heap memory / checkpoints to JobManager) 
> (checkpoints: 'null', savepoints: 'null', asynchronous: UNDEFINED, 
> maxStateSize: 5242880)](org.apache.flink.state.api.SavepointWriterITCase)  
> Time elapsed: 1.736 s  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.state.api.SavepointWriterITCase.bootstrapState(SavepointWriterITCase.java:147)
>   at 
> org.apache.flink.state.api.SavepointWriterITCase.testStateBootstrapAndModification(SavepointWriterITCase.java:114)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> Caused by: java.lang.UnsupportedOperationException: This method should never 
> be called
> [ERROR] testStateBootstrapAndModification[Savepoint Writer: 
> RocksDBStateBackend{checkpointStreamBackend=MemoryStateBackend (data in heap 
> memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
> asynchronous: UNDEFINED, maxStateSize: 5242880), 
> localRocksDbDirectories=null, enableIncrementalCheckpointing=UNDEFINED, 
> numberOfTransferThreads=-1, 
> writeBatchSize=-1}](org.apache.flink.state.api.SavepointWriterITCase)  Time 
> elapsed: 0.486 s  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.state.api.SavepointWriterITCase.bootstrapState(SavepointWriterITCase.java:147)
>   at 
> org.apache.flink.state.api.SavepointWriterITCase.testStateBootstrapAndModification(SavepointWriterITCase.java:114)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> Caused by: java.lang.UnsupportedOperationException: This method should never 
> be called



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17046) SavepointWriterITCase failed on travis

2020-04-08 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17078005#comment-17078005
 ] 

Zhu Zhu commented on FLINK-17046:
-

Yes, it should be the same issue. I encountered it after rebasing onto master 
this morning.

> SavepointWriterITCase failed on travis
> --
>
> Key: FLINK-17046
> URL: https://issues.apache.org/jira/browse/FLINK-17046
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.0
>Reporter: Zhu Zhu
>Priority: Critical
>
> https://api.travis-ci.com/v3/job/316732861/log.txt
> [ERROR] Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 7.67 
> s <<< FAILURE! - in org.apache.flink.state.api.SavepointWriterITCase
> [ERROR] testStateBootstrapAndModification[Savepoint Writer: 
> MemoryStateBackend (data in heap memory / checkpoints to JobManager) 
> (checkpoints: 'null', savepoints: 'null', asynchronous: UNDEFINED, 
> maxStateSize: 5242880)](org.apache.flink.state.api.SavepointWriterITCase)  
> Time elapsed: 1.736 s  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.state.api.SavepointWriterITCase.bootstrapState(SavepointWriterITCase.java:147)
>   at 
> org.apache.flink.state.api.SavepointWriterITCase.testStateBootstrapAndModification(SavepointWriterITCase.java:114)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> Caused by: java.lang.UnsupportedOperationException: This method should never 
> be called
> [ERROR] testStateBootstrapAndModification[Savepoint Writer: 
> RocksDBStateBackend{checkpointStreamBackend=MemoryStateBackend (data in heap 
> memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
> asynchronous: UNDEFINED, maxStateSize: 5242880), 
> localRocksDbDirectories=null, enableIncrementalCheckpointing=UNDEFINED, 
> numberOfTransferThreads=-1, 
> writeBatchSize=-1}](org.apache.flink.state.api.SavepointWriterITCase)  Time 
> elapsed: 0.486 s  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.state.api.SavepointWriterITCase.bootstrapState(SavepointWriterITCase.java:147)
>   at 
> org.apache.flink.state.api.SavepointWriterITCase.testStateBootstrapAndModification(SavepointWriterITCase.java:114)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> Caused by: java.lang.UnsupportedOperationException: This method should never 
> be called



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17046) SavepointWriterITCase failed on travis

2020-04-08 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17078011#comment-17078011
 ] 

Zhu Zhu commented on FLINK-17046:
-

I tried the fix locally and it worked.

> SavepointWriterITCase failed on travis
> --
>
> Key: FLINK-17046
> URL: https://issues.apache.org/jira/browse/FLINK-17046
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.0
>Reporter: Zhu Zhu
>Priority: Critical
>
> https://api.travis-ci.com/v3/job/316732861/log.txt
> [ERROR] Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 7.67 
> s <<< FAILURE! - in org.apache.flink.state.api.SavepointWriterITCase
> [ERROR] testStateBootstrapAndModification[Savepoint Writer: 
> MemoryStateBackend (data in heap memory / checkpoints to JobManager) 
> (checkpoints: 'null', savepoints: 'null', asynchronous: UNDEFINED, 
> maxStateSize: 5242880)](org.apache.flink.state.api.SavepointWriterITCase)  
> Time elapsed: 1.736 s  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.state.api.SavepointWriterITCase.bootstrapState(SavepointWriterITCase.java:147)
>   at 
> org.apache.flink.state.api.SavepointWriterITCase.testStateBootstrapAndModification(SavepointWriterITCase.java:114)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> Caused by: java.lang.UnsupportedOperationException: This method should never 
> be called
> [ERROR] testStateBootstrapAndModification[Savepoint Writer: 
> RocksDBStateBackend{checkpointStreamBackend=MemoryStateBackend (data in heap 
> memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
> asynchronous: UNDEFINED, maxStateSize: 5242880), 
> localRocksDbDirectories=null, enableIncrementalCheckpointing=UNDEFINED, 
> numberOfTransferThreads=-1, 
> writeBatchSize=-1}](org.apache.flink.state.api.SavepointWriterITCase)  Time 
> elapsed: 0.486 s  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.state.api.SavepointWriterITCase.bootstrapState(SavepointWriterITCase.java:147)
>   at 
> org.apache.flink.state.api.SavepointWriterITCase.testStateBootstrapAndModification(SavepointWriterITCase.java:114)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> Caused by: java.lang.UnsupportedOperationException: This method should never 
> be called



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17047) Simplify SchedulingStrategy#onPartitionConsumable(...) parameters

2020-04-08 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17047:
---

 Summary: Simplify SchedulingStrategy#onPartitionConsumable(...) 
parameters
 Key: FLINK-17047
 URL: https://issues.apache.org/jira/browse/FLINK-17047
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.11.0
Reporter: Zhu Zhu
 Fix For: 1.11.0


I'd propose to simplify the SchedulingStrategy#onPartitionConsumable(...) 
parameters as below (see the sketch after this list):
1. Take an IntermediateResultPartitionID instead of a ResultPartitionID.
A ResultPartitionID is a composition of an IntermediateResultPartitionID and an 
ExecutionAttemptID. SchedulingStrategy is not aware of ExecutionAttemptID, so 
there is no need to expose it.
2. Drop the executionVertexId param. executionVertexId does not provide extra 
information: the check in LazyFromSourcesSchedulingStrategy does not make much 
sense since the executionVertexId is just looked up from the partitionId at an 
earlier stage. It also makes things more complex, since a blocking result 
partition can become consumable when a vertex that is not its producer 
finishes.

This simplification also eases the work of FLINK-14234.
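
A hedged sketch of the proposed change (the ID classes below are simplified 
stand-ins for Flink's types, not the real ones):
{code:java}
// Simplified stand-ins for Flink's ID classes.
final class IntermediateResultPartitionID {}
final class ExecutionAttemptID {}
final class ExecutionVertexID {}

// A ResultPartitionID composes the partition ID with the producer attempt ID.
final class ResultPartitionID {
    final IntermediateResultPartitionID partitionId;
    final ExecutionAttemptID producerAttemptId;

    ResultPartitionID(IntermediateResultPartitionID partitionId,
                      ExecutionAttemptID producerAttemptId) {
        this.partitionId = partitionId;
        this.producerAttemptId = producerAttemptId;
    }
}

// Before: the strategy receives context it does not need.
interface SchedulingStrategyBefore {
    void onPartitionConsumable(ExecutionVertexID producerVertexId,
                               ResultPartitionID resultPartitionId);
}

// After: only the piece the strategy actually acts on.
interface SchedulingStrategyAfter {
    void onPartitionConsumable(IntermediateResultPartitionID partitionId);
}
{code}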



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-14162) Unify SchedulerOperations#allocateSlotsAndDeploy implementation for all scheduling strategies

2020-04-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-14162.
---
Resolution: Fixed

Implemented via: c2c9297f9fc45c052bfb245bbca2d91d8da28c7f

> Unify SchedulerOperations#allocateSlotsAndDeploy implementation for all 
> scheduling strategies
> -
>
> Key: FLINK-14162
> URL: https://issues.apache.org/jira/browse/FLINK-14162
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In scheduler NG, scheduling strategies invoke 
> {{SchedulerOperations#allocateSlotsAndDeploy(Collection)}}
>  to trigger the scheduling of tasks.
> However, {{EagerSchedulingStrategy}} and 
> {{LazyFromSourcesSchedulingStrategy}} both invoke it by passing a batch of 
> tasks, but require the scheduling process to be conducted in 2 different 
> ways:
>  * {{EagerSchedulingStrategy}} requires the batch of tasks to be deployed 
> only after all of them have acquired slots. This is essential to avoid 
> partition update RPCs in streaming job scheduling.
>  * {{LazyFromSourcesSchedulingStrategy}} requires tasks in the batch to 
> allocate slots and get deployed individually, so that it can deploy some 
> tasks even if there are not enough slots for all tasks in the batch. This is 
> helpful for batch job scheduling.
> The scheduler then has to decide the scheduling pattern based on whether the 
> scheduling strategy is a {{LazyFromSourcesSchedulingStrategy}}. This is not 
> good, as there can be more strategies in the future, and even customized 
> scheduling strategies.
> I think it's better to define 
> {{SchedulerOperations#allocateSlotsAndDeploy(Collection)}}
>  such that all tasks in the batch are assigned slots and deployed together, 
> like what we do for {{EagerSchedulingStrategy}}.
> All scheduling strategies need to follow this rule. If tasks should be 
> scheduled individually, the strategy should invoke {{allocateSlotsAndDeploy}} 
> multiple times, once for each task. As a result, 
> {{LazyFromSourcesSchedulingStrategy}} needs to be adjusted for that (see the 
> sketch below).
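
A hedged sketch of the unified contract (simplified types, not Flink's actual 
API): every call treats its argument as one atomic batch that is deployed only 
after all of its slots are acquired, so a lazy strategy simply calls it once 
per task:
{code:java}
import java.util.Collection;
import java.util.Collections;

// Hedged sketch, not Flink's real interface.
interface SchedulerOperations {
    void allocateSlotsAndDeploy(Collection<String> verticesToSchedule);
}

final class StrategySketch {
    private final SchedulerOperations operations;

    StrategySketch(SchedulerOperations operations) {
        this.operations = operations;
    }

    // Eager style: one batch, deployed together once all slots are acquired
    // (avoids partition-update RPCs in streaming jobs).
    void scheduleEagerly(Collection<String> allVertices) {
        operations.allocateSlotsAndDeploy(allVertices);
    }

    // Lazy style: one call per vertex, so some tasks can still run even when
    // there are not enough slots for the whole set.
    void scheduleLazily(Collection<String> readyVertices) {
        for (String vertex : readyVertices) {
            operations.allocateSlotsAndDeploy(Collections.singletonList(vertex));
        }
    }
}
{code}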



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14234) All partition consumable events should be notified to SchedulingStrategy (SchedulerNG)

2020-04-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14234:

Description: 
{{SchedulingStrategy}} requires partition consumable notifications to make 
scheduling decisions.
According to the {{SchedulingStrategy#onPartitionConsumable}} definition, all 
partition consumable events should be notified to {{SchedulingStrategy}}, 
including those from TMs (pipelined partitions becoming consumable once data is 
produced) and those from within the JM (blocking partitions becoming consumable 
once their producers finish).

This way, the LazyFromSourcesSchedulingStrategy no longer needs to maintain 
the result partition status itself, and the InputDependencyConstraintChecker 
can be simplified a lot.
Besides that, this way the LazyFromSourcesSchedulingStrategy does not need to 
be aware of result partition types (PIPELINED/BLOCKING).

It would also simplify the input checking for pipelined region scheduling.

More details can be found 
[here|https://github.com/apache/flink/pull/9663#discussion_r326540913].



  was:
{{SchedulingStrategy}} requires partition consumable notifications to make 
scheduling decisions.
According to the {{SchedulingStrategy#onPartitionConsumable}} definition, all 
partition consumable events should be notified to {{SchedulingStrategy}}, 
including those from TMs (pipelined partitions becoming consumable once data is 
produced) and those from within the JM (blocking partitions becoming consumable 
once their producers finish).

This way, the LazyFromSourcesSchedulingStrategy no longer needs to maintain 
the result partition status itself, and the InputDependencyConstraintChecker 
can be simplified a lot.

It would also simplify the input checking for pipelined region scheduling.

More details can be found 
[here|https://github.com/apache/flink/pull/9663#discussion_r326540913].




> All partition consumable events should be notified to SchedulingStrategy 
> (SchedulerNG)
> --
>
> Key: FLINK-14234
> URL: https://issues.apache.org/jira/browse/FLINK-14234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
> Fix For: 1.11.0
>
>
> {{SchedulingStrategy}} requires partition consumable notifications to make 
> scheduling decisions.
> According to the {{SchedulingStrategy#onPartitionConsumable}} definition, all 
> partition consumable events should be notified to {{SchedulingStrategy}}, 
> including those from TMs (pipelined partitions becoming consumable once data 
> is produced) and those from within the JM (blocking partitions becoming 
> consumable once their producers finish).
> This way, the LazyFromSourcesSchedulingStrategy no longer needs to maintain 
> the result partition status itself, and the InputDependencyConstraintChecker 
> can be simplified a lot.
> Besides that, this way the LazyFromSourcesSchedulingStrategy does not need to 
> be aware of result partition types (PIPELINED/BLOCKING).
> It would also simplify the input checking for pipelined region scheduling.
> More details can be found 
> [here|https://github.com/apache/flink/pull/9663#discussion_r326540913].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-17047) Simplify SchedulingStrategy#onPartitionConsumable(...) parameters

2020-04-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-17047:
---

Assignee: Zhu Zhu

> Simplify SchedulingStrategy#onPartitionConsumable(...) parameters
> -
>
> Key: FLINK-17047
> URL: https://issues.apache.org/jira/browse/FLINK-17047
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> I'd propose to simplify the SchedulingStrategy#onPartitionConsumable(...) 
> parameters as below:
> 1. Take an IntermediateResultPartitionID instead of a ResultPartitionID.
> A ResultPartitionID is a composition of an IntermediateResultPartitionID and 
> an ExecutionAttemptID. SchedulingStrategy is not aware of ExecutionAttemptID, 
> so there is no need to expose it.
> 2. Drop the executionVertexId param. executionVertexId does not provide 
> extra information: the check in LazyFromSourcesSchedulingStrategy does not 
> make much sense since the executionVertexId is just looked up from the 
> partitionId at an earlier stage. It also makes things more complex, since a 
> blocking result partition can become consumable when a vertex that is not 
> its producer finishes.
> This simplification also eases the work of FLINK-14234.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15307) Subclasses of FailoverStrategy are easily confused with implementation classes of RestartStrategy

2019-12-19 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17000660#comment-17000660
 ] 

Zhu Zhu commented on FLINK-15307:
-

Agreed that we can change the class names to avoid confusion for *developers*. 
However, I would prefer the names to be
 * RestartAllFailoverStrategy
 * RestartIndividualFailoverStrategy
 * RestartPipelinedRegionFailoverStrategy
so that they better reveal what each failover strategy does to recover from a 
failure.

Given that the strategy *class* names are transparent to users and some of the 
strategies (AdaptedPipelinedRegionFailoverStrategyNG, 
FailoverIndividualStrategy) would be removed in 1.11, I think we can do this 
later in 1.11 after we have removed the legacy failover strategies.


> Subclasses of FailoverStrategy are easily confused with implementation 
> classes of RestartStrategy
> -
>
> Key: FLINK-15307
> URL: https://issues.apache.org/jira/browse/FLINK-15307
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.9.0, 1.9.1, 1.10.0
>Reporter: andrew.D.lin
>Priority: Minor
> Attachments: image-2019-12-18-14-59-03-181.png
>
>
> Subclasses of RestartStrategy
>  * FailingRestartStrategy
>  * FailureRateRestartStrategy
>  * FixedDelayRestartStrategy
>  * InfiniteDelayRestartStrategy
> Implementation class of FailoverStrategy
>  * AdaptedRestartPipelinedRegionStrategyNG
>  * RestartAllStrategy
>  * RestartIndividualStrategy
>  * RestartPipelinedRegionStrategy
>  
> FailoverStrategy describes how the job computation recovers from task 
> failures.
> I think the following names may be easier to understand and easier to 
> distinguish:
> Implementation class of FailoverStrategy
>  * AdaptedPipelinedRegionFailoverStrategyNG
>  * FailoverAllStrategy
>  * FailoverIndividualStrategy
>  * FailoverPipelinedRegionStrategy
> FailoverStrategy is currently generated by configuration. If we change the 
> names of the implementation classes, it will not affect compatibility.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15307) Subclasses of FailoverStrategy are easily confused with implementation classes of RestartStrategy

2019-12-19 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17000664#comment-17000664
 ] 

Zhu Zhu commented on FLINK-15307:
-

Sure, I can ping him here once the prerequisites are ready.

> Subclasses of FailoverStrategy are easily confused with implementation 
> classes of RestartStrategy
> -
>
> Key: FLINK-15307
> URL: https://issues.apache.org/jira/browse/FLINK-15307
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: andrew.D.lin
>Priority: Minor
> Fix For: 1.11.0
>
> Attachments: image-2019-12-18-14-59-03-181.png
>
>
> Subclasses of RestartStrategy
>  * FailingRestartStrategy
>  * FailureRateRestartStrategy
>  * FixedDelayRestartStrategy
>  * InfiniteDelayRestartStrategy
> Implementation class of FailoverStrategy
>  * AdaptedRestartPipelinedRegionStrategyNG
>  * RestartAllStrategy
>  * RestartIndividualStrategy
>  * RestartPipelinedRegionStrategy
>  
> FailoverStrategy describes how the job computation recovers from task 
> failures.
> I think the following names may be easier to understand and easier to 
> distinguish:
> Implementation class of FailoverStrategy
>  * AdaptedPipelinedRegionFailoverStrategyNG
>  * FailoverAllStrategy
>  * FailoverIndividualStrategy
>  * FailoverPipelinedRegionStrategy
> FailoverStrategy is currently generated by configuration. If we change the 
> names of the implementation classes, it will not affect compatibility.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15249) Improve PipelinedRegions calculation with Union Set

2019-12-19 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17000686#comment-17000686
 ] 

Zhu Zhu commented on FLINK-15249:
-

Hi [~nppoly], sorry but I'm still busy with some 1.10 release matters.
I may get back to you later once the 1.10 issues have calmed down a bit.
It would be great if we could improve the region building performance without 
introducing regressions for any other cases/topologies.
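
For reference, a minimal union-find (disjoint-set) sketch of the proposed 
technique, with path compression and union by rank so that merges run in 
near-O(1) amortized time (this is illustrative only, not the code from the 
attached patch):
{code:java}
final class UnionFind {
    private final int[] parent;
    private final int[] rank;

    UnionFind(int n) {
        parent = new int[n];
        rank = new int[n];
        for (int i = 0; i < n; i++) {
            parent[i] = i; // every element starts in its own set
        }
    }

    int find(int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]]; // path halving
            x = parent[x];
        }
        return x;
    }

    void union(int a, int b) {
        int ra = find(a);
        int rb = find(b);
        if (ra == rb) {
            return;
        }
        if (rank[ra] < rank[rb]) { // attach the shallower tree to the deeper
            int tmp = ra;
            ra = rb;
            rb = tmp;
        }
        parent[rb] = ra;
        if (rank[ra] == rank[rb]) {
            rank[ra]++;
        }
    }
}
{code}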

> Improve PipelinedRegions calculation with Union Set
> ---
>
> Key: FLINK-15249
> URL: https://issues.apache.org/jira/browse/FLINK-15249
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Chongchen Chen
>Priority: Major
>  Labels: pull-request-available
> Attachments: PipelinedRegionComputeUtil.diff
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> A union (disjoint) set can merge two sets in near-O(1) amortized time, while 
> the current implementation is O(N). The attached patch implements this.
> [Disjoint Set Data 
> Structure|https://en.wikipedia.org/wiki/Disjoint-set_data_structure]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15320) JobManager crashes in the standalone model when cancelling job which subtask' status is scheduled

2019-12-20 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17000718#comment-17000718
 ] 

Zhu Zhu commented on FLINK-15320:
-

The PR is opened. [~trohrmann]
I checked the code and found that the only possible entry points which cancel 
vertices outside of {{DefaultScheduler#restartTasksWithDelay()}} (the method 
that increments vertex versions) are 3 methods in SchedulerBase: 
{{#failJob()}}, {{#cancel()}} and {{#suspend()}}.
So I added {{SchedulerBase#incrementVersionsOfAllVertices()}} and used it in 
those 3 cases. This way, outdated deployments can always be identified by 
checking the vertex versions, and this unexpected issue can no longer happen.
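
A hedged sketch of that versioning guard (simplified types; not Flink's actual 
ExecutionVertexVersioner API): every cancellation path bumps the vertex 
version, so a slot assignment completing afterwards can detect that its 
deployment is outdated and must be dropped:
{code:java}
import java.util.HashMap;
import java.util.Map;

final class VertexVersionerSketch {
    private final Map<String, Long> versions = new HashMap<>();

    // Record a modification of one vertex and return its new version,
    // which the scheduler remembers at scheduling time.
    long recordModification(String vertexId) {
        return versions.merge(vertexId, 1L, Long::sum);
    }

    // Called from failJob() / cancel() / suspend(): invalidate everything,
    // so no in-flight deployment can slip through afterwards.
    void incrementVersionsOfAllVertices() {
        versions.replaceAll((id, version) -> version + 1);
    }

    // A deployment is outdated if the vertex was modified since scheduling.
    boolean isOutdated(String vertexId, long versionAtScheduling) {
        return versions.getOrDefault(vertexId, 0L) != versionAtScheduling;
    }
}
{code}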

> JobManager crashes in the standalone model when cancelling job which subtask' 
> status is scheduled
> -
>
> Key: FLINK-15320
> URL: https://issues.apache.org/jira/browse/FLINK-15320
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: lining
>Assignee: Zhu Zhu
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Use start-cluster.sh to start a standalone cluster, then submit the 
> TopSpeedWindowing job from the streaming examples with a parallelism of 20. 
> Wait for one minute and cancel the job; the jobmanager will crash. The 
> exception stack is:
> 2019-12-19 10:12:11,060 ERROR 
> org.apache.flink.runtime.util.FatalExitExceptionHandler       - FATAL: Thread 
> 'flink-akka.actor.default-dispatcher-2' produced an uncaught exception. 
> Stopping the process...2019-12-19 10:12:11,060 ERROR 
> org.apache.flink.runtime.util.FatalExitExceptionHandler       - FATAL: Thread 
> 'flink-akka.actor.default-dispatcher-2' produced an uncaught exception. 
> Stopping the process...java.util.concurrent.CompletionException: 
> java.util.concurrent.CompletionException: java.lang.IllegalStateException: 
> Could not assign resource 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@583585c6 to 
> current execution Attempt #0 (Window(GlobalWindows(), DeltaTrigger, 
> TimeEvictor, ComparableAggregator, PassThroughWindowFunction) -> Sink: Print 
> to Std. Out (19/20)) @ (unassigned) - [CANCELED]. at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.propagateIfNonNull(DefaultScheduler.java:387)
>  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$deployAll$4(DefaultScheduler.java:372)
>  at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:705)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:170)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.tryFulfillSlotRequestOrMakeAvailable(SlotPoolImpl.java:534)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseSingleSlot(SlotPoolImpl.java:479)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseSlot(SlotPoolImpl.java:390)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:557)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.releaseChild(SlotSharingManager.java:607)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.access$700(SlotSharingManager.java:352)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:716)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.Sche

[jira] [Commented] (FLINK-15345) CurrentExecution and priorExecutions are inconsistent in the test case

2019-12-20 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17000740#comment-17000740
 ] 

Zhu Zhu commented on FLINK-15345:
-

>> When the length of priorExecutions does not exceed the value of the 
>> configuration item jobmanager.execution.attempts-history-size, the attempt 
>> should be equal to the length of priorExecutions

Not sure if there is any logic that relies on this assumption. Is there any 
issue caused by it?

> CurrentExecution and priorExecutions are inconsistent in the test case
> --
>
> Key: FLINK-15345
> URL: https://issues.apache.org/jira/browse/FLINK-15345
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Tests
>Reporter: lining
>Priority: Minor
>
> When the length of priorExecutions does not exceed the value of the 
> configuration item jobmanager.execution.attempts-history-size, the attempt 
> should be equal to the length of priorExecutions. So [code in 
> JobExceptionsHandlerTest|https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java#L104-L121]
>  and [code in 
> SubtaskCurrentAttemptDetailsHandlerTest|https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java#L94-L111]
>  are wrong; we need to update the attempt to 0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15325) Input location preference which affects task distribution may make certain job performance worse

2019-12-20 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17000940#comment-17000940
 ] 

Zhu Zhu commented on FLINK-15325:
-

Yes, if the config is enabled, 
{{ExecutionVertex#getPreferredLocationsBasedOnInputs()}} can simply return an 
empty set to achieve the goal (see the sketch below).

It looks to me like it could be a problem only if
1. there is a 1-to-N pattern in the topology (or k-to-N where k is a small 
number <= 8 while N is much larger), and
2. the number of available slots in the JM SlotPool is much larger than N 
(currently this only happens in batch jobs that run more tasks in earlier 
stages, so that the later stages see more slots than needed), and
3. task loads are heavy, so that performance degrades when too many tasks run 
on the same machine.

So yes, it is not a very common case and at the moment is not a problem for 
streaming jobs, so I think this improvement is not very urgent.
But given that users may have no other way to solve this problem when it 
happens, and the config is easy to understand and simple to implement, I think 
it's still valid to introduce such a config.
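
A minimal sketch of that idea (the flag and types are hypothetical, not an 
existing Flink option): when the preference is disabled, the input-based 
location hints are simply suppressed, so the scheduler may spread tasks freely:
{code:java}
import java.util.Collection;
import java.util.Collections;

final class InputLocationPreferenceSketch {
    // Hypothetical switch; not a real Flink configuration key.
    private final boolean inputLocationPreferenceEnabled;

    InputLocationPreferenceSketch(boolean inputLocationPreferenceEnabled) {
        this.inputLocationPreferenceEnabled = inputLocationPreferenceEnabled;
    }

    // Stands in for ExecutionVertex#getPreferredLocationsBasedOnInputs().
    Collection<String> getPreferredLocationsBasedOnInputs(
            Collection<String> inputTaskHosts) {
        // An empty result means "no preference": the slot pool can place the
        // task anywhere instead of packing it next to its input tasks.
        return inputLocationPreferenceEnabled
                ? inputTaskHosts
                : Collections.emptySet();
    }
}
{code}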



> Input location preference which affects task distribution may make certain 
> job performance worse 
> -
>
> Key: FLINK-15325
> URL: https://issues.apache.org/jira/browse/FLINK-15325
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Major
> Attachments: D58ADB03-7187-46B1-B077-91E5005FD463.png
>
>
> When running TPC-DS jobs in a session cluster, we observed that sometimes 
> tasks are not evenly distributed across TMs. The root cause turned out to be 
> that downstream tasks tend to be TM- or host-local with their input tasks, 
> which helps to reduce network shuffle. 
> However, in certain cases, like the topology presented in the attached image, 
> jamming the input task's TM and machine with downstream tasks hurts 
> performance. In such cases, respecting input location preferences causes 
> more trouble than it brings benefits.
> So I'm wondering whether we should introduce a config so that users can 
> disable input location preferences?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15354) Start and stop minikube only in kubernetes related e2e tests

2019-12-23 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-15354:
---

Assignee: Zhu Zhu

> Start and stop minikube only in kubernetes related e2e tests
> 
>
> Key: FLINK-15354
> URL: https://issues.apache.org/jira/browse/FLINK-15354
> Project: Flink
>  Issue Type: Bug
>Reporter: Yang Wang
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, we start minikube in {{nightly.sh}} for every e2e test, and it is 
> never stopped. This is unnecessary and occupies some resources on travis. I 
> think minikube should only be started in the kubernetes related test 
> ({{test_kubernetes_embedded_job.sh}}) and should be stopped once the test 
> finishes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15354) Start and stop minikube only in kubernetes related e2e tests

2019-12-23 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-15354:
---

Assignee: Yang Wang  (was: Zhu Zhu)

> Start and stop minikube only in kubernetes related e2e tests
> 
>
> Key: FLINK-15354
> URL: https://issues.apache.org/jira/browse/FLINK-15354
> Project: Flink
>  Issue Type: Bug
>Reporter: Yang Wang
>Assignee: Yang Wang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, we start minikube in {{nightly.sh}} for every e2e test, and it is 
> never stopped. This is unnecessary and occupies some resources on travis. I 
> think minikube should only be started in the kubernetes related test 
> ({{test_kubernetes_embedded_job.sh}}) and should be stopped once the test 
> finishes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15354) Start and stop minikube only in kubernetes related e2e tests

2019-12-23 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17002590#comment-17002590
 ] 

Zhu Zhu commented on FLINK-15354:
-

Thanks [~fly_in_gis] for reporting this issue. I have assigned the ticket to 
you.

> Start and stop minikube only in kubernetes related e2e tests
> 
>
> Key: FLINK-15354
> URL: https://issues.apache.org/jira/browse/FLINK-15354
> Project: Flink
>  Issue Type: Bug
>Reporter: Yang Wang
>Assignee: Yang Wang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, we start minikube in {{nightly.sh}} for every e2e test, and it is 
> never stopped. This is unnecessary and occupies some resources on travis. I 
> think minikube should only be started in the kubernetes related test 
> ({{test_kubernetes_embedded_job.sh}}) and should be stopped once the test 
> finishes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15364) Introduce streaming task using heap backend e2e tests for Mesos

2019-12-23 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-15364:
---

Assignee: Yangze Guo

> Introduce streaming task using heap backend e2e tests for Mesos
> ---
>
> Key: FLINK-15364
> URL: https://issues.apache.org/jira/browse/FLINK-15364
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Yangze Guo
>Assignee: Yangze Guo
>Priority: Major
> Fix For: 1.11.0
>
>
> As discussed, we need to test streaming tasks using the heap backend. The 
> test should explicitly set "taskmanager.memory.managed.size" to zero to 
> check for potential unexpected usage of off-heap memory.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15367) Handle backwards compatibility of "taskmanager.heap.size" differently for standalone / active setups

2019-12-23 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-15367:
---

Assignee: Xintong Song

> Handle backwards compatibility of "taskmanager.heap.size" differently for 
> standalone / active setups
> 
>
> Key: FLINK-15367
> URL: https://issues.apache.org/jira/browse/FLINK-15367
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xintong Song
>Assignee: Xintong Song
>Priority: Blocker
> Fix For: 1.10.0
>
>
> Previously, "taskmanager.heap.size" were used differently for calculating TM 
> memory sizes on standalone / active setups. To fully align with the previous 
> behaviors, we need to map this deprecated key to 
> "taskmanager.memory.flink.size" for standalone setups and 
> "taskmanager.memory.process.size" for active setups.
> Detailed discussion can be found in this [ML 
> thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-feedback-after-trying-out-the-new-FLIP-49-memory-configurations-td36129.html].
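
A hedged sketch of that mapping (a hypothetical helper, not Flink's actual 
option-fallback machinery; only the two target keys come from the description 
above):
{code:java}
// Illustrative only: the deprecated key resolves to a different new key
// depending on whether the setup is standalone or active (e.g. Yarn/Mesos).
final class LegacyHeapSizeMapping {
    enum SetupType { STANDALONE, ACTIVE }

    static String targetKeyFor(SetupType setup) {
        return setup == SetupType.STANDALONE
                ? "taskmanager.memory.flink.size"    // total Flink memory
                : "taskmanager.memory.process.size"; // total process memory
    }
}
{code}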



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15365) Introduce streaming task using rocksDB backend e2e tests for Mesos

2019-12-23 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-15365:
---

Assignee: Yangze Guo

> Introduce streaming task using rocksDB backend e2e tests for Mesos
> --
>
> Key: FLINK-15365
> URL: https://issues.apache.org/jira/browse/FLINK-15365
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Yangze Guo
>Assignee: Yangze Guo
>Priority: Major
> Fix For: 1.11.0
>
>
> As discussed, we need to test streaming tasks using the RocksDB backend. 
> This covers the scenario of using off-heap memory.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15369) MiniCluster use fixed network / managed memory sizes by default

2019-12-23 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-15369:
---

Assignee: Xintong Song

> MiniCluster use fixed network / managed memory sizes by default
> ---
>
> Key: FLINK-15369
> URL: https://issues.apache.org/jira/browse/FLINK-15369
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xintong Song
>Assignee: Xintong Song
>Priority: Blocker
> Fix For: 1.10.0
>
>
> Currently, Mini Cluster may allocate off-heap memory (managed & network) 
> according to the JVM free heap size and configured off-heap fractions. This 
> could lead to unnecessarily large off-heap memory usage and unpredictable / 
> hard-to-understand behaviors.
> We believe a fixed value for managed / network memory would be enough for 
> such a setup that runs Flink as a library.
> Detailed discussion can be found in this [ML 
> thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-feedback-after-trying-out-the-new-FLIP-49-memory-configurations-td36129.html].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15373) Update descriptions for framework / task off-heap memory config options

2019-12-23 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-15373:
---

Assignee: Xintong Song

> Update descriptions for framework / task off-heap memory config options
> ---
>
> Key: FLINK-15373
> URL: https://issues.apache.org/jira/browse/FLINK-15373
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xintong Song
>Assignee: Xintong Song
>Priority: Blocker
> Fix For: 1.10.0
>
>
> Update descriptions for "taskmanager.memory.framework.off-heap.size" and 
> "taskmanager.memory.task.off-heap.size" to explicitly state that:
> * Both direct and native memory are accounted
> * Will be fully counted into MaxDirectMemorySize
> Detailed discussion can be found in this [ML 
> thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-feedback-after-trying-out-the-new-FLIP-49-memory-configurations-td36129.html].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15372) Use shorter config keys for FLIP-49 total memory config options

2019-12-23 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-15372:
---

Assignee: Xintong Song

> Use shorter config keys for FLIP-49 total memory config options
> ---
>
> Key: FLINK-15372
> URL: https://issues.apache.org/jira/browse/FLINK-15372
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xintong Song
>Assignee: Xintong Song
>Priority: Blocker
> Fix For: 1.10.0
>
>
> We propose to use shorter keys for total flink / process memory config 
> options, to make it less clumsy without loss of expressiveness.
> To be specific, we propose to:
> * Change the config option key "taskmanager.memory.total-flink.size" to 
> "taskmanager.memory.flink.size"
> * Change the config option key "taskmanager.memory.total-process.size" to 
> "taskmanager.memory.process.size"
> Detailed discussion can be found in this [ML 
> thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-feedback-after-trying-out-the-new-FLIP-49-memory-configurations-td36129.html].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15371) Change FLIP-49 memory configurations to use the new memory type config options

2019-12-23 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-15371:
---

Assignee: Xintong Song

> Change FLIP-49 memory configurations to use the new memory type config options
> --
>
> Key: FLINK-15371
> URL: https://issues.apache.org/jira/browse/FLINK-15371
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xintong Song
>Assignee: Xintong Song
>Priority: Blocker
> Fix For: 1.10.0
>
>
> FLIP-49 memory configurations can leverage the new strongly typed 
> ConfigOption to make validation automatic and avoid breaking the options 
> later.
> Detailed discussion can be found in this [ML 
> thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-feedback-after-trying-out-the-new-FLIP-49-memory-configurations-td36129.html].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15374) Update descriptions for jvm overhead config options

2019-12-23 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-15374:
---

Assignee: Xintong Song

> Update descriptions for jvm overhead config options
> ---
>
> Key: FLINK-15374
> URL: https://issues.apache.org/jira/browse/FLINK-15374
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xintong Song
>Assignee: Xintong Song
>Priority: Blocker
> Fix For: 1.10.0
>
>
> Update descriptions for "taskmanager.memory.jvm-overhead.[min|max|fraction]" 
> to remove "I/O direct memory" and explicitly state that it's not counted into 
> MaxDirectMemorySize.
> Detailed discussion can be found in this [ML 
> thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-feedback-after-trying-out-the-new-FLIP-49-memory-configurations-td36129.html].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15375) Improve MemorySize to print / parse with better readability.

2019-12-23 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-15375:
---

Assignee: Xintong Song

> Improve MemorySize to print / parse with better readability.
> 
>
> Key: FLINK-15375
> URL: https://issues.apache.org/jira/browse/FLINK-15375
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xintong Song
>Assignee: Xintong Song
>Priority: Critical
> Fix For: 1.10.0
>
>
> * Print MemorySize with a proper unit rather than a tremendous number of 
> bytes.
> * Parse memory sizes from numbers instead of {{parse(xxx + "m")}}
> Detailed discussion can be found in this [ML 
> thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-feedback-after-trying-out-the-new-FLIP-49-memory-configurations-td36129.html].
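
An illustrative sketch of the printing improvement (not Flink's actual 
MemorySize implementation): render a byte count with the largest unit that 
keeps the value exact:
{code:java}
final class MemorySizePrinterSketch {
    private static final String[] UNITS = {"bytes", "kb", "mb", "gb", "tb"};

    static String print(long bytes) {
        int unit = 0;
        long value = bytes;
        // Step up a unit only while the value stays exact,
        // e.g. 1073741824 -> "1 gb", but 1536 -> "1536 bytes".
        while (unit < UNITS.length - 1 && value != 0 && value % 1024 == 0) {
            value /= 1024;
            unit++;
        }
        return value + " " + UNITS[unit];
    }
}
{code}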



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-13662) FlinkKinesisProducerTest.testBackpressure failed on Travis

2019-12-23 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-13662.
---
Resolution: Fixed

Fixed via

master:
a62641a0918aaedbac6312293cf8826e4d11f300
20041cafbfe500ed386e11da5d09f116e7a45b81

1.10.0:
a57da33111ba6f9155fef0dde8635ae54e641507
b9b422e2f282e374aba6f207ffbe280bd5f91c9e

> FlinkKinesisProducerTest.testBackpressure failed on Travis
> --
>
> Key: FLINK-13662
> URL: https://issues.apache.org/jira/browse/FLINK-13662
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis, Tests
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The {{FlinkKinesisProducerTest.testBackpressure}} failed on Travis with
> {code}
> 14:45:50.489 [ERROR] Failures: 
> 14:45:50.489 [ERROR]   FlinkKinesisProducerTest.testBackpressure:298 Flush 
> triggered before reaching queue limit
> {code}
> https://api.travis-ci.org/v3/job/569262823/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15377) Mesos WordCount test fails on travis

2019-12-24 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-15377:
---

Assignee: Yangze Guo

> Mesos WordCount test fails on travis
> 
>
> Key: FLINK-15377
> URL: https://issues.apache.org/jira/browse/FLINK-15377
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Mesos
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Assignee: Yangze Guo
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The "Run Mesos WordCount test" fails nightly run on travis with below error:
> {code}
> rm: cannot remove 
> '/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/test-data/log/mesos-sl/mesos-slave.INFO':
>  Permission denied
> rm: cannot remove 
> '/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/test-data/log/mesos-sl/mesos-fetcher.INFO':
>  Permission denied
> rm: cannot remove 
> '/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/test-data/log/mesos-sl/mesos-slave.4a4fda410c57.invalid-user.log.INFO.20191224-031307.1':
>  Permission denied
> ...
> [FAIL] 'Run Mesos WordCount test' failed after 5 minutes and 26 seconds! Test 
> exited with exit code 0 but the logs contained errors, exceptions or 
> non-empty .out files
> {code}
> https://api.travis-ci.org/v3/job/628795106/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-11524) LeaderChangeClusterComponentsTest.testReelectionOfJobMaster failed on Travis

2019-12-24 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-11524.
---
Resolution: Later

#10553 is merged to make the error more descriptive for troubleshooting.
master:
5e5cdd84eb802e193e37f5a27bf439112401c8f5
985787abcf917e6ea0aa0df3e24806015b3ddbad

release-1.10:
a22bcb7e28715e462b870692e6830619fb29cbe8
f78b781d999e19e7adfc8c14485a42b10a1db820

Closing this issue for now; let's re-open it if it happens again.

> LeaderChangeClusterComponentsTest.testReelectionOfJobMaster failed on Travis
> 
>
> Key: FLINK-11524
> URL: https://issues.apache.org/jira/browse/FLINK-11524
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.8.0, 1.10.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> The {{LeaderChangeClusterComponentsTest.testReelectionOfJobMaster}} failed on 
> Travis: https://api.travis-ci.org/v3/job/488578456/log.txt
> It looks as if the {{JobMaster}} cannot reconnect to the {{ResourceManager}}. 
> Maybe this is caused by a wrong interleaving of the revoke and grant 
> leadership calls to the {{TestingEmbeddedHaServices}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15377) Mesos WordCount test fails on travis

2019-12-24 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17003085#comment-17003085
 ] 

Zhu Zhu commented on FLINK-15377:
-

Fixed via

master:
3036dd97eef4d1451baf6984885ad3784333a617

1.10.0:
d3eb451c2551ffa928bb04c8445c0967af69da60

> Mesos WordCount test fails on travis
> 
>
> Key: FLINK-15377
> URL: https://issues.apache.org/jira/browse/FLINK-15377
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Mesos
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Assignee: Yangze Guo
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The "Run Mesos WordCount test" fails nightly run on travis with below error:
> {code}
> rm: cannot remove 
> '/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/test-data/log/mesos-sl/mesos-slave.INFO':
>  Permission denied
> rm: cannot remove 
> '/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/test-data/log/mesos-sl/mesos-fetcher.INFO':
>  Permission denied
> rm: cannot remove 
> '/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/test-data/log/mesos-sl/mesos-slave.4a4fda410c57.invalid-user.log.INFO.20191224-031307.1':
>  Permission denied
> ...
> [FAIL] 'Run Mesos WordCount test' failed after 5 minutes and 26 seconds! Test 
> exited with exit code 0 but the logs contained errors, exceptions or 
> non-empty .out files
> {code}
> https://api.travis-ci.org/v3/job/628795106/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15377) Mesos WordCount test fails on travis

2019-12-24 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-15377.
---
Resolution: Fixed

> Mesos WordCount test fails on travis
> 
>
> Key: FLINK-15377
> URL: https://issues.apache.org/jira/browse/FLINK-15377
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Mesos
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Assignee: Yangze Guo
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The "Run Mesos WordCount test" fails nightly run on travis with below error:
> {code}
> rm: cannot remove 
> '/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/test-data/log/mesos-sl/mesos-slave.INFO':
>  Permission denied
> rm: cannot remove 
> '/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/test-data/log/mesos-sl/mesos-fetcher.INFO':
>  Permission denied
> rm: cannot remove 
> '/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/test-data/log/mesos-sl/mesos-slave.4a4fda410c57.invalid-user.log.INFO.20191224-031307.1':
>  Permission denied
> ...
> [FAIL] 'Run Mesos WordCount test' failed after 5 minutes and 26 seconds! Test 
> exited with exit code 0 but the logs contained errors, exceptions or 
> non-empty .out files
> {code}
> https://api.travis-ci.org/v3/job/628795106/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15374) Update descriptions for jvm overhead config options

2019-12-24 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-15374.
---
Resolution: Fixed

Fixed via

master:
e9c8282becb947ec230893b3e212153118ffc7b1

release-1.10:
d9d75d8b0f561338e71774fb9c51ca96536d7834

> Update descriptions for jvm overhead config options
> ---
>
> Key: FLINK-15374
> URL: https://issues.apache.org/jira/browse/FLINK-15374
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xintong Song
>Assignee: Xintong Song
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Update descriptions for "taskmanager.memory.jvm-overhead.[min|max|fraction]" 
> to remove "I/O direct memory" and explicitly state that it's not counted into 
> MaxDirectMemorySize.
> Detailed discussion can be found in this [ML 
> thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-feedback-after-trying-out-the-new-FLIP-49-memory-configurations-td36129.html].
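> For reference, the affected options in flink-conf.yaml form (the values here
> are only illustrative, not recommendations):
> {code}
> taskmanager.memory.jvm-overhead.min: 192m
> taskmanager.memory.jvm-overhead.max: 1g
> taskmanager.memory.jvm-overhead.fraction: 0.1
> {code}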



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15373) Update descriptions for framework / task off-heap memory config options

2019-12-25 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-15373.
---
Resolution: Fixed

Fixed via:

master:
768be4f9bc31bcd38053c896af92f4af7a212144
66da8eb710b2700f46747b7775400ead1ca0ceae

release-1.10:
55760b1f90e72fb6a35a464ee0824bba4d388e8a
807bbe8210ba1ce4f3004c67cb6bb78853e97dde

> Update descriptions for framework / task off-heap memory config options
> ---
>
> Key: FLINK-15373
> URL: https://issues.apache.org/jira/browse/FLINK-15373
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xintong Song
>Assignee: Xintong Song
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Update descriptions for "taskmanager.memory.framework.off-heap.size" and 
> "taskmanager.memory.task.off-heap.size" to explicitly state that:
> * Both direct and native memory are accounted for
> * Both will be fully counted into MaxDirectMemorySize
> Detailed discussion can be found in this [ML 
> thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-feedback-after-trying-out-the-new-FLIP-49-memory-configurations-td36129.html].
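> As a sketch of the clarified semantics in flink-conf.yaml form (values are
> only illustrative):
> {code}
> # both options account for direct and native memory, and both are fully
> # counted into the JVM's -XX:MaxDirectMemorySize
> taskmanager.memory.framework.off-heap.size: 128m
> taskmanager.memory.task.off-heap.size: 64m
> {code}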



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15372) Use shorter config keys for FLIP-49 total memory config options

2019-12-25 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-15372.
---
Resolution: Fixed

Fixed via:

master:
82619f260af0b24ff8244a14f8fc84816edae083
0ba4a2b4cc2b48886d5a7948d631ea7da0068a0e

release-1.10:
ac1661cc4c7abe70b99543cfb6c275ca8effb184
c69110d0341987d7895ec648332543535f4ad906

> Use shorter config keys for FLIP-49 total memory config options
> ---
>
> Key: FLINK-15372
> URL: https://issues.apache.org/jira/browse/FLINK-15372
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xintong Song
>Assignee: Xintong Song
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> We propose to use shorter keys for the total flink / process memory config 
> options, to make them less clumsy without loss of expressiveness.
> To be specific, we propose to:
> * Change the config option key "taskmanager.memory.total-flink.size" to 
> "taskmanager.memory.flink.size"
> * Change the config option key "taskmanager.memory.total-process.size" to 
> "taskmanager.memory.process.size"
> Detailed discussion can be found in this [ML 
> thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-feedback-after-trying-out-the-new-FLIP-49-memory-configurations-td36129.html].
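> For example, in flink-conf.yaml terms (the value is only illustrative):
> {code}
> # before
> taskmanager.memory.total-flink.size: 1280m
> # after
> taskmanager.memory.flink.size: 1280m
> {code}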



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15394) in yarn mode, jobmanager fails to start in case of rest.port conflict.

2019-12-25 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17003426#comment-17003426
 ] 

Zhu Zhu commented on FLINK-15394:
-

Hi [~watters_fish], I think this is not a bug. If a user has configured the 
port for the jobmanager and the jobmanager fails to take that port, it is 
expected that the job fails rather than taking another port.
You can specify a port range with the config "rest.bind-port" so that 
jobmanagers on the same machine do not conflict over a single configured rest 
port.
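For example, a flink-conf.yaml entry like the following (the range is only 
illustrative) lets each jobmanager pick a free port from the range:
{code}
rest.bind-port: 50100-50200
{code}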

> in yarn mode, jobmanager fails to start in case of rest.port conflict.
> --
>
> Key: FLINK-15394
> URL: https://issues.apache.org/jira/browse/FLINK-15394
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.9.0
> Environment: a small yarn cluster with three nodes.
>Reporter: watters.wang
>Priority: Major
>
> In flink 1.9, if rest.port is set in flink.yaml, the jobmanager will fail to 
> start in case of a rest.port conflict in a small cluster. 
> In 1.7, that was ok.
>  
> {code:java}
> // code placeholder
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
> initialize the cluster entrypoint YarnJobClusterEntrypoint.
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:182)
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
>   at 
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)
> Caused by: org.apache.flink.util.FlinkException: Could not create the 
> DispatcherResourceManagerComponent.
>   at 
> org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:257)
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:210)
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
>   ... 2 more
> Caused by: java.net.BindException: Could not start rest endpoint on any port 
> in port range 8081
>   at 
> org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:219)
>   at 
> org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:161)
>   ... 9 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14163) Execution#producedPartitions is possibly not assigned when deploying tasks with DefaultScheduler

2019-12-26 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14163:

Summary: Execution#producedPartitions is possibly not assigned when 
deploying tasks with DefaultScheduler  (was: Execution#producedPartitions is 
possibly not assigned when used)

> Execution#producedPartitions is possibly not assigned when deploying tasks 
> with DefaultScheduler
> 
>
> Key: FLINK-14163
> URL: https://issues.apache.org/jira/browse/FLINK-14163
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed their registration to the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> But the task deployment process (in {{Execution#deploy()}}) will create 
> {{ResultPartitionDeploymentDescriptor}}s directly from 
> {{Execution#producedPartitions}} without checking whether it has been assigned.
> This may lead to a task being deployed without its result partitions, 
> eventually causing the job to hang.
> It is not problematic at the moment when using Flink's default shuffle master 
> {{NettyShuffleMaster}}, since it returns a completed future on registration. 
> However, if that behavior is changed or users are using a customized 
> {{ShuffleMaster}}, it may cause problems.
> Besides that, {{Execution#producedPartitions}} is also used for 
>  * generating the downstream task input descriptors
>  * retrieving the {{ResultPartitionID}} for partition releasing
> To avoid such issues, we may need to change all the usages of 
> {{Execution#producedPartitions}} to a callback style, e.g. change 
> {{Execution#producedPartitions}} from 
> {{Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>}} to 
> {{CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>>}} 
> and adjust all its usages.
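> A rough, non-authoritative sketch of the callback-style change (types are 
> simplified stand-ins, not the actual Flink code):
> {code:java}
> import java.util.Map;
> import java.util.concurrent.CompletableFuture;
> 
> class ExecutionSketch {
>   // in real code the key/value types would be IntermediateResultPartitionID
>   // and ResultPartitionDeploymentDescriptor; String keeps the sketch self-contained
>   final CompletableFuture<Map<String, String>> producedPartitions = new CompletableFuture<>();
> 
>   // deployment composes on the future instead of reading a possibly-unset field
>   CompletableFuture<Void> deploy() {
>     return producedPartitions.thenAccept(partitions -> {
>       // create the deployment descriptors from 'partitions' and submit the task
>     });
>   }
> }
> {code}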



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14163) Execution#producedPartitions is possibly not assigned when used

2019-12-26 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14163:

Summary: Execution#producedPartitions is possibly not assigned when used  
(was: Execution#producedPartitions is possibly not assigned when deploying 
tasks with DefaultScheduler)

> Execution#producedPartitions is possibly not assigned when used
> ---
>
> Key: FLINK-14163
> URL: https://issues.apache.org/jira/browse/FLINK-14163
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed their registration to the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> But the task deployment process (in {{Execution#deploy()}}) will create 
> {{ResultPartitionDeploymentDescriptor}}s directly from 
> {{Execution#producedPartitions}} without checking whether it has been assigned.
> This may lead to a task being deployed without its result partitions, 
> eventually causing the job to hang.
> It is not problematic at the moment when using Flink's default shuffle master 
> {{NettyShuffleMaster}}, since it returns a completed future on registration. 
> However, if that behavior is changed or users are using a customized 
> {{ShuffleMaster}}, it may cause problems.
> Besides that, {{Execution#producedPartitions}} is also used for 
>  * generating the downstream task input descriptors
>  * retrieving the {{ResultPartitionID}} for partition releasing
> To avoid such issues, we may need to change all the usages of 
> {{Execution#producedPartitions}} to a callback style, e.g. change 
> {{Execution#producedPartitions}} from 
> {{Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>}} to 
> {{CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>>}} 
> and adjust all its usages.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15427) State TTL RocksDb backend end-to-end test stalls on travis

2019-12-27 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-15427:
---

Assignee: Congxian Qiu(klion26)

> State TTL RocksDb backend end-to-end test stalls on travis
> --
>
> Key: FLINK-15427
> URL: https://issues.apache.org/jira/browse/FLINK-15427
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Assignee: Congxian Qiu(klion26)
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The 'State TTL RocksDb backend end-to-end test' case stalls and finally 
> timed out with the error message:
> {noformat}
> The job exceeded the maximum log length, and has been terminated.
> {noformat}
> https://api.travis-ci.org/v3/job/629699416/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15456) Job keeps failing on slot allocation timeout due to RM not allocating new TMs for slot requests

2020-01-02 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-15456:
---

 Summary: Job keeps failing on slot allocation timeout due to RM 
not allocating new TMs for slot requests
 Key: FLINK-15456
 URL: https://issues.apache.org/jira/browse/FLINK-15456
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Zhu Zhu
 Fix For: 1.10.0
 Attachments: jm_part.log

As shown in the attached JM log, the job tried to start 30 TMs but only 29 
registered, so the job fails because it cannot acquire all 30 needed slots in 
time.
And when a failover happens and tasks are re-scheduled, the RM will not ask 
for new TMs even if it cannot fulfill the slot requests, so the job keeps 
failing with slot allocation timeouts.
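If I remember correctly, the timeout that fails the job here is governed by the 
"slot.request.timeout" config (in milliseconds), e.g. in flink-conf.yaml:
{code}
slot.request.timeout: 300000
{code}
Raising it would only delay the failure though; the underlying problem is that 
the RM never requests a replacement TM.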



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15456) Job keeps failing on slot allocation timeout due to RM not allocating new TMs for slot requests

2020-01-02 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006696#comment-17006696
 ] 

Zhu Zhu commented on FLINK-15456:
-

This issue looks like the case described in FLINK-13554. 
[~xintongsong], do you have an idea how to solve it without much risk?

cc: [~trohrmann]

> Job keeps failing on slot allocation timeout due to RM not allocating new TMs 
> for slot requests
> ---
>
> Key: FLINK-15456
> URL: https://issues.apache.org/jira/browse/FLINK-15456
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Blocker
> Fix For: 1.10.0
>
> Attachments: jm_part.log
>
>
> As shown in the attached JM log, the job tried to start 30 TMs but only 29 
> registered, so the job fails because it cannot acquire all 30 needed slots in 
> time.
> And when a failover happens and tasks are re-scheduled, the RM will not ask 
> for new TMs even if it cannot fulfill the slot requests, so the job keeps 
> failing with slot allocation timeouts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15456) Job keeps failing on slot allocation timeout due to RM not allocating new TMs for slot requests

2020-01-02 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006696#comment-17006696
 ] 

Zhu Zhu edited comment on FLINK-15456 at 1/2/20 9:41 AM:
-

[~xintongsong] Yes, it looks like the case described in FLINK-13554. 
Do you have an idea how to solve it without much risk?
I will also try to reproduce the issue with DEBUG logs.

cc: [~trohrmann]


was (Author: zhuzh):
This issue looks like the case described in FLINK-13554. 
[~xintongsong], do you have an idea how to solve it without much risk?

cc: [~trohrmann]

> Job keeps failing on slot allocation timeout due to RM not allocating new TMs 
> for slot requests
> ---
>
> Key: FLINK-15456
> URL: https://issues.apache.org/jira/browse/FLINK-15456
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Blocker
> Fix For: 1.10.0
>
> Attachments: jm_part.log
>
>
> As shown in the attached JM log, the job tried to start 30 TMs but only 29 
> registered, so the job fails because it cannot acquire all 30 needed slots in 
> time.
> And when a failover happens and tasks are re-scheduled, the RM will not ask 
> for new TMs even if it cannot fulfill the slot requests, so the job keeps 
> failing with slot allocation timeouts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15456) Job keeps failing on slot allocation timeout due to RM not allocating new TMs for slot requests

2020-01-02 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-15456:

Attachment: jm_part2.log

> Job keeps failing on slot allocation timeout due to RM not allocating new TMs 
> for slot requests
> ---
>
> Key: FLINK-15456
> URL: https://issues.apache.org/jira/browse/FLINK-15456
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Blocker
> Fix For: 1.10.0
>
> Attachments: jm_part.log, jm_part2.log
>
>
> As shown in the attached JM log, the job tried to start 30 TMs but only 29 
> registered, so the job fails because it cannot acquire all 30 needed slots in 
> time.
> And when a failover happens and tasks are re-scheduled, the RM will not ask 
> for new TMs even if it cannot fulfill the slot requests, so the job keeps 
> failing with slot allocation timeouts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15456) Job keeps failing on slot allocation timeout due to RM not allocating new TMs for slot requests

2020-01-02 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006719#comment-17006719
 ] 

Zhu Zhu commented on FLINK-15456:
-

Thanks for the explanation, [~xintongsong]. 
I still have one question about this issue. In {{jm_part2.log}}, I found that 
the job recovered after a failover triggered by an RM leadership loss (around 
03:08:44). After that, the RM did ask for a new TM for the slot requests, so 
the job recovered. Does that mean the pending TM was abandoned in this case?

> Job keeps failing on slot allocation timeout due to RM not allocating new TMs 
> for slot requests
> ---
>
> Key: FLINK-15456
> URL: https://issues.apache.org/jira/browse/FLINK-15456
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Blocker
> Fix For: 1.10.0
>
> Attachments: jm_part.log, jm_part2.log
>
>
> As shown in the attached JM log, the job tried to start 30 TMs but only 29 
> registered, so the job fails because it cannot acquire all 30 needed slots in 
> time.
> And when a failover happens and tasks are re-scheduled, the RM will not ask 
> for new TMs even if it cannot fulfill the slot requests, so the job keeps 
> failing with slot allocation timeouts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15448) Log host informations for TaskManager failures.

2020-01-02 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006721#comment-17006721
 ] 

Zhu Zhu commented on FLINK-15448:
-

TBH, I think it's not really necessary to add the host to each log line that 
contains a ResourceID, as long as we can easily find the host of a container in 
the same log file. It would also make the log file larger due to the redundancy.

> Log host informations for TaskManager failures.
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Minor
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> We'd like to find out the host of the lost TaskManager to log into it for 
> more details, but we have to check the previous logs for the host information, 
> which is a little time-consuming.
> Maybe we can add more descriptive information to ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15456) Job keeps failing on slot allocation timeout due to RM not allocating new TMs for slot requests

2020-01-02 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006779#comment-17006779
 ] 

Zhu Zhu commented on FLINK-15456:
-

[~xintongsong], in this case the RM recovered without a restart; you can check 
the log around 03:08:44.

> Job keeps failing on slot allocation timeout due to RM not allocating new TMs 
> for slot requests
> ---
>
> Key: FLINK-15456
> URL: https://issues.apache.org/jira/browse/FLINK-15456
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Blocker
> Fix For: 1.10.0
>
> Attachments: jm_part.log, jm_part2.log
>
>
> As shown in the attached JM log, the job tried to start 30 TMs but only 29 
> registered, so the job fails because it cannot acquire all 30 needed slots in 
> time.
> And when a failover happens and tasks are re-scheduled, the RM will not ask 
> for new TMs even if it cannot fulfill the slot requests, so the job keeps 
> failing with slot allocation timeouts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15448) Log host informations for TaskManager failures.

2020-01-02 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006798#comment-17006798
 ] 

Zhu Zhu commented on FLINK-15448:
-

Hi Victor, have you investigated which logs should be improved?

> Log host informations for TaskManager failures.
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Minor
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> We'd like to find out the host of the lost TaskManager to log into it for 
> more details, but we have to check the previous logs for the host information, 
> which is a little time-consuming.
> Maybe we can add more descriptive information to ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15448) Log host informations for TaskManager failures.

2020-01-02 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006798#comment-17006798
 ] 

Zhu Zhu edited comment on FLINK-15448 at 1/2/20 1:01 PM:
-

[~victor-wong], have you investigated which logs should be improved?


was (Author: zhuzh):
Hi Victor, have you investigated which logs should be improved?

> Log host informations for TaskManager failures.
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Minor
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> We'd like to find out the host of the lost TaskManager to log into it for 
> more details, but we have to check the previous logs for the host information, 
> which is a little time-consuming.
> Maybe we can add more descriptive information to ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14163) Execution#producedPartitions is possibly not assigned when used

2020-01-02 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14163:

Description: 
Currently {{Execution#producedPartitions}} is assigned after the partitions 
have completed their registration to the shuffle master in 
{{Execution#registerProducedPartitions(...)}}.
The partition registration is an async interface 
({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
{{Execution#producedPartitions}} is possibly[1] not set when used. 

Usages include:
1. deploying this task, so that the task may be deployed without its result 
partitions assigned, and the job would hang (a DefaultScheduler-only issue, 
since the legacy scheduler handled this case)
2. generating input descriptors for downstream tasks
3. retrieving the {{ResultPartitionID}} for partition releasing

[1] If a user uses Flink's default shuffle master {{NettyShuffleMaster}}, it is 
not problematic at the moment, since it returns a completed future on 
registration, so the process is effectively synchronous. However, if users 
implement their own shuffle service in which 
{{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
can be a problem. This is possible since customizable shuffle services have 
been open to users since 1.9 (via the config "shuffle-service-factory.class").

To avoid such issues, we may either 
1. fix all the usages of {{Execution#producedPartitions}} regarding the async 
set, or 
2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
interface

  was:
Currently {{Execution#producedPartitions}} is assigned after the partitions 
have completed their registration to the shuffle master in 
{{Execution#registerProducedPartitions(...)}}.
But the task deployment process (in {{Execution#deploy()}}) will create 
{{ResultPartitionDeploymentDescriptor}}s directly from 
{{Execution#producedPartitions}} without checking whether it has been assigned.
This may lead to a task being deployed without its result partitions, 
eventually causing the job to hang.

It is not problematic at the moment when using Flink's default shuffle master 
{{NettyShuffleMaster}}, since it returns a completed future on registration. 
However, if that behavior is changed or users are using a customized 
{{ShuffleMaster}}, it may cause problems.

Besides that, {{Execution#producedPartitions}} is also used for 
 * generating the downstream task input descriptors
 * retrieving the {{ResultPartitionID}} for partition releasing

To avoid such issues, we may need to change all the usages of 
{{Execution#producedPartitions}} to a callback style, e.g. change 
{{Execution#producedPartitions}} from 
{{Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>}} to 
{{CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>>}} 
and adjust all its usages.


> Execution#producedPartitions is possibly not assigned when used
> ---
>
> Key: FLINK-14163
> URL: https://issues.apache.org/jira/browse/FLINK-14163
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed their registration to the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is an async interface 
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
> {{Execution#producedPartitions}} is possibly[1] not set when used. 
> Usages include:
> 1. deploying this task, so that the task may be deployed without its result 
> partitions assigned, and the job would hang (a DefaultScheduler-only issue, 
> since the legacy scheduler handled this case)
> 2. generating input descriptors for downstream tasks
> 3. retrieving the {{ResultPartitionID}} for partition releasing
> [1] If a user uses Flink's default shuffle master {{NettyShuffleMaster}}, it is 
> not problematic at the moment, since it returns a completed future on 
> registration, so the process is effectively synchronous. However, if users 
> implement their own shuffle service in which 
> {{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
> can be a problem. This is possible since customizable shuffle services have 
> been open to users since 1.9 (via the config "shuffle-service-factory.class").
> To avoid such issues, we may either 
> 1. fix all the usages of {{Execution#producedPartitions}} regarding the async 
> set, or 
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
> interface



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14163) Execution#producedPartitions is possibly not assigned when used

2020-01-02 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14163:

Description: 
Currently {{Execution#producedPartitions}} is assigned after the partitions 
have completed their registration to the shuffle master in 
{{Execution#registerProducedPartitions(...)}}.
The partition registration is an async interface 
({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
{{Execution#producedPartitions}} is possibly[1] not set when used. 

Usages include:
1. deploying this task, so that the task may be deployed without its result 
partitions assigned, and the job would hang (a DefaultScheduler-only issue, 
since the legacy scheduler handled this case)
2. generating input descriptors for downstream tasks
3. retrieving the {{ResultPartitionID}} for partition releasing

[1] If a user uses Flink's default shuffle master {{NettyShuffleMaster}}, it is 
not problematic at the moment, since it returns a completed future on 
registration, so the process is effectively synchronous. However, if users 
implement their own shuffle service in which 
{{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
can be a problem. This is possible since customizable shuffle services have 
been open to users since 1.9 (via the config "shuffle-service-factory.class").

To avoid such issues, we may either 
1. fix all the usages of {{Execution#producedPartitions}} regarding the async 
assignment, or 
2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
interface

  was:
Currently {{Execution#producedPartitions}} is assigned after the partitions 
have completed their registration to the shuffle master in 
{{Execution#registerProducedPartitions(...)}}.
The partition registration is an async interface 
({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
{{Execution#producedPartitions}} is possibly[1] not set when used. 

Usages include:
1. deploying this task, so that the task may be deployed without its result 
partitions assigned, and the job would hang (a DefaultScheduler-only issue, 
since the legacy scheduler handled this case)
2. generating input descriptors for downstream tasks
3. retrieving the {{ResultPartitionID}} for partition releasing

[1] If a user uses Flink's default shuffle master {{NettyShuffleMaster}}, it is 
not problematic at the moment, since it returns a completed future on 
registration, so the process is effectively synchronous. However, if users 
implement their own shuffle service in which 
{{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
can be a problem. This is possible since customizable shuffle services have 
been open to users since 1.9 (via the config "shuffle-service-factory.class").

To avoid such issues, we may either 
1. fix all the usages of {{Execution#producedPartitions}} regarding the async 
set, or 
2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
interface


> Execution#producedPartitions is possibly not assigned when used
> ---
>
> Key: FLINK-14163
> URL: https://issues.apache.org/jira/browse/FLINK-14163
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed their registration to the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is an async interface 
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
> {{Execution#producedPartitions}} is possibly[1] not set when used. 
> Usages include:
> 1. deploying this task, so that the task may be deployed without its result 
> partitions assigned, and the job would hang (a DefaultScheduler-only issue, 
> since the legacy scheduler handled this case)
> 2. generating input descriptors for downstream tasks
> 3. retrieving the {{ResultPartitionID}} for partition releasing
> [1] If a user uses Flink's default shuffle master {{NettyShuffleMaster}}, it is 
> not problematic at the moment, since it returns a completed future on 
> registration, so the process is effectively synchronous. However, if users 
> implement their own shuffle service in which 
> {{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
> can be a problem. This is possible since customizable shuffle services have 
> been open to users since 1.9 (via the config "shuffle-service-factory.class").
> To avoid such issues, we may either 
> 1. fix all the usages of {{Execution#producedPartitions}} regarding the async 
> assignment, or 
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
> interface



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

[jira] [Commented] (FLINK-15456) Job keeps failing on slot allocation timeout due to RM not allocating new TMs for slot requests

2020-01-02 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007187#comment-17007187
 ] 

Zhu Zhu commented on FLINK-15456:
-

[~xintongsong], the RM's leadership was not really revoked, nor was the RM 
suspended, in this case. The TMs merely thought it was and thus disconnected 
themselves.

> Job keeps failing on slot allocation timeout due to RM not allocating new TMs 
> for slot requests
> ---
>
> Key: FLINK-15456
> URL: https://issues.apache.org/jira/browse/FLINK-15456
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Blocker
> Fix For: 1.10.0
>
> Attachments: jm_part.log, jm_part2.log
>
>
> As shown in the attached JM log, the job tried to start 30 TMs but only 29 
> registered, so the job fails because it cannot acquire all 30 needed slots in 
> time.
> And when a failover happens and tasks are re-scheduled, the RM will not ask 
> for new TMs even if it cannot fulfill the slot requests, so the job keeps 
> failing with slot allocation timeouts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15456) Job keeps failing on slot allocation timeout due to RM not allocating new TMs for slot requests

2020-01-02 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007193#comment-17007193
 ] 

Zhu Zhu commented on FLINK-15456:
-

Synced with [~xintongsong] offline: the RM recovered because a disconnection 
and reconnection of an existing TM fulfilled the request that had been pending 
for a long time, so the RM would request new TMs for new slot requests.

Now we can focus on the original issue; I will try to reproduce it with debug 
logs.

> Job keeps failing on slot allocation timeout due to RM not allocating new TMs 
> for slot requests
> ---
>
> Key: FLINK-15456
> URL: https://issues.apache.org/jira/browse/FLINK-15456
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Blocker
> Fix For: 1.10.0
>
> Attachments: jm_part.log, jm_part2.log
>
>
> As shown in the attached JM log, the job tried to start 30 TMs but only 29 
> registered, so the job fails because it cannot acquire all 30 needed slots in 
> time.
> And when a failover happens and tasks are re-scheduled, the RM will not ask 
> for new TMs even if it cannot fulfill the slot requests, so the job keeps 
> failing with slot allocation timeouts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15448) Log host informations for TaskManager failures.

2020-01-02 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007198#comment-17007198
 ] 

Zhu Zhu commented on FLINK-15448:
-

[~victor-wong], have you investigated which logs should be improved?
It would be great if you could share your plan for which logs you'd like to 
adjust. And, as you mentioned, would the changes spread the host info across 
many places?

> Log host informations for TaskManager failures.
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Minor
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> We'd like to find out the host of the lost TaskManager to log into it for 
> more details, but we have to check the previous logs for the host information, 
> which is a little time-consuming.
> Maybe we can add more descriptive information to ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15448) Log host informations for TaskManager failures.

2020-01-03 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007302#comment-17007302
 ] 

Zhu Zhu commented on FLINK-15448:
-

How could we get the host of a ResourceID in 
{{org.apache.flink.runtime.resourcemanager.ResourceManager.TaskManagerHeartbeatListener#notifyHeartbeatTimeout}}?

> Log host informations for TaskManager failures.
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Minor
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> We'd like to find out the host of the lost TaskManager to log into it for 
> more details, but we have to check the previous logs for the host information, 
> which is a little time-consuming.
> Maybe we can add more descriptive information to ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14163) Execution#producedPartitions is possibly not assigned when used

2020-01-06 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17008606#comment-17008606
 ] 

Zhu Zhu commented on FLINK-14163:
-

Thanks [~zjwang] for the feedback.
I think partition releasing could also be a problem for both the legacy 
scheduler and the DefaultScheduler. If a task is released before its partitions 
are successfully registered to the shuffle master, the JM will see no 
partitions to release when cancelling the task, so the partitions could be 
leaked in the shuffle master.
And agreed that making {{ShuffleMaster#registerPartitionWithProducer}} a sync 
interface would be the easier way to avoid such possible issues at the moment.
[~gjy], do you think we need to fix this in 1.10? The major problem (streaming 
job tasks getting deployed without knowing their inputs) can only happen with 
the DefaultScheduler (with a custom shuffle service). I'd prefer to fix it in 
1.10 if we can have an easy and low-risk fix.

> Execution#producedPartitions is possibly not assigned when used
> ---
>
> Key: FLINK-14163
> URL: https://issues.apache.org/jira/browse/FLINK-14163
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed their registration to the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is an async interface 
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
> {{Execution#producedPartitions}} is possibly[1] not set when used. 
> Usages include:
> 1. deploying this task, so that the task may be deployed without its result 
> partitions assigned, and the job would hang (a DefaultScheduler-only issue, 
> since the legacy scheduler handled this case)
> 2. generating input descriptors for downstream tasks
> 3. retrieving the {{ResultPartitionID}} for partition releasing
> [1] If a user uses Flink's default shuffle master {{NettyShuffleMaster}}, it is 
> not problematic at the moment, since it returns a completed future on 
> registration, so the process is effectively synchronous. However, if users 
> implement their own shuffle service in which 
> {{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
> can be a problem. This is possible since customizable shuffle services have 
> been open to users since 1.9 (via the config "shuffle-service-factory.class").
> To avoid such issues, we may either 
> 1. fix all the usages of {{Execution#producedPartitions}} regarding the async 
> assignment, or 
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
> interface



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-14572) BlobsCleanupITCase failed on Travis

2020-01-06 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-14572.
---
Resolution: Cannot Reproduce

[~gaoyunhaii] added some detailed logging for when the case fails. Closing this 
ticket for now; let's re-open it if it happens again.

master:
0fd8f3970a417758ca32efce718399c0f8c222e0

release-1.10:
e73494b751be3a1c4c3e53901e42cca8f0741200

> BlobsCleanupITCase failed on Travis
> ---
>
> Key: FLINK-14572
> URL: https://issues.apache.org/jira/browse/FLINK-14572
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.10.0
>Reporter: Gary Yao
>Assignee: Yun Gao
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> {noformat}
> java.lang.AssertionError: 
> Expected: is 
>  but: was 
>   at 
> org.apache.flink.runtime.jobmanager.BlobsCleanupITCase.testBlobServerCleanup(BlobsCleanupITCase.java:220)
>   at 
> org.apache.flink.runtime.jobmanager.BlobsCleanupITCase.testBlobServerCleanupFinishedJob(BlobsCleanupITCase.java:133)
> {noformat}
> https://api.travis-ci.com/v3/job/250445874/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14163) Execution#producedPartitions is possibly not assigned when used

2020-01-07 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17009471#comment-17009471
 ] 

Zhu Zhu commented on FLINK-14163:
-

+1 to the solution proposed by [~azagrebin]. The fix should be simple enough, 
and it's good that we do not need to change the shuffle interface back and 
forth. We can also easily improve how the returned completable future is used 
when needed in the future.

[~ym], yes, keeping the async interface is generally good in the long term. So 
using the future in a sync way for now, as proposed by Andrey, is better than 
changing the shuffle interface to a sync pattern.
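To illustrate, a minimal sketch of "using the future in a sync way" with an 
explicit error (the names here are hypothetical, not the final API):
{code:java}
import java.util.concurrent.CompletableFuture;

class SyncRegistrationSketch {
  // fail fast with an explicit error instead of silently blocking the main thread
  static <T> T expectCompleted(CompletableFuture<T> registrationFuture) {
    if (!registrationFuture.isDone()) {
      throw new IllegalStateException(
          "Expected the partition registration to have completed synchronously");
    }
    return registrationFuture.join();
  }
}
{code}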

> Execution#producedPartitions is possibly not assigned when used
> ---
>
> Key: FLINK-14163
> URL: https://issues.apache.org/jira/browse/FLINK-14163
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed their registration to the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is an async interface 
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
> {{Execution#producedPartitions}} is possibly[1] not set when used. 
> Usages include:
> 1. deploying this task, so that the task may be deployed without its result 
> partitions assigned, and the job would hang (a DefaultScheduler-only issue, 
> since the legacy scheduler handled this case)
> 2. generating input descriptors for downstream tasks
> 3. retrieving the {{ResultPartitionID}} for partition releasing
> [1] If a user uses Flink's default shuffle master {{NettyShuffleMaster}}, it is 
> not problematic at the moment, since it returns a completed future on 
> registration, so the process is effectively synchronous. However, if users 
> implement their own shuffle service in which 
> {{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
> can be a problem. This is possible since customizable shuffle services have 
> been open to users since 1.9 (via the config "shuffle-service-factory.class").
> To avoid such issues, we may either 
> 1. fix all the usages of {{Execution#producedPartitions}} regarding the async 
> assignment, or 
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
> interface



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15499) No debug log describes the host of a TM before any task is deployed to it in YARN mode

2020-01-07 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-15499:
---

 Summary: No debug log describes the host of a TM before any task 
is deployed to it  in YARN mode 
 Key: FLINK-15499
 URL: https://issues.apache.org/jira/browse/FLINK-15499
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Affects Versions: 1.10.0
Reporter: Zhu Zhu


When troubleshooting FLINK-15456, I noticed a TM hung during startup and was 
not able to register to the RM. However, there is no info on which host the TM 
is located, and thus we can hardly find the logs of the problematic TM.
I think we should print the host name when starting a TM, i.e. in the log line
"TaskExecutor container_ will be started ...".
This would make it possible for us to troubleshoot similar problems (not only 
cases where the TM hangs during startup, but also cases where it exits during 
startup).
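As a rough sketch (the surrounding YARN code and names are assumptions, not the 
actual implementation):
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TaskExecutorStartLoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorStartLoggingSketch.class);

  // 'containerId' and 'host' would come from the YARN Container, e.g.
  // container.getId().toString() and container.getNodeId().getHost()
  static void logStart(String containerId, String host) {
    LOG.info("TaskExecutor {} will be started on {}.", containerId, host);
  }
}
{code}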



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15499) No debug log describes the host of a TM before any task is deployed to it in YARN mode

2020-01-07 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17009575#comment-17009575
 ] 

Zhu Zhu commented on FLINK-15499:
-

[~xintongsong], would you help confirm whether this is really a problem?

> No debug log describes the host of a TM before any task is deployed to it  in 
> YARN mode 
> 
>
> Key: FLINK-15499
> URL: https://issues.apache.org/jira/browse/FLINK-15499
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Major
>
> When troubleshooting FLINK-15456, I noticed a TM hung during startup and was 
> not able to register to the RM. However, there is no info on which host the TM 
> is located, and thus we can hardly find the logs of the problematic TM.
> I think we should print the host name when starting a TM, i.e. in the log line
> "TaskExecutor container_ will be started ...".
> This would make it possible for us to troubleshoot similar problems (not only 
> cases where the TM hangs during startup, but also cases where it exits during 
> startup).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15499) No debug log describes the host of a TM before any task is deployed to it in YARN mode

2020-01-07 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-15499:

Description: 
When troubleshooting FLINK-15456, I noticed a TM hung during startup and was 
not able to register to the RM. However, there is no debug log on which host 
the TM is located, and thus I can hardly find the logs of the problematic TM.
I think we should print the host name when starting a TM, i.e. in the log line
"TaskExecutor container_ will be started ...".
This would make it possible for us to troubleshoot similar problems (not only 
cases where the TM hangs during startup, but also cases where it exits during 
startup).

  was:
When troubleshooting FLINK-15456, I noticed a TM hung during startup and was 
not able to register to the RM. However, there is no info on which host the TM 
is located, and thus we can hardly find the logs of the problematic TM.
I think we should print the host name when starting a TM, i.e. in the log line
"TaskExecutor container_ will be started ...".
This would make it possible for us to troubleshoot similar problems (not only 
cases where the TM hangs during startup, but also cases where it exits during 
startup).


> No debug log describes the host of a TM before any task is deployed to it  in 
> YARN mode 
> 
>
> Key: FLINK-15499
> URL: https://issues.apache.org/jira/browse/FLINK-15499
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Major
>
> When troubleshooting FLINK-15456, I noticed a TM hung during startup and was 
> not able to register to the RM. However, there is no debug log on which host 
> the TM is located, and thus I can hardly find the logs of the problematic TM.
> I think we should print the host name when starting a TM, i.e. in the log line
> "TaskExecutor container_ will be started ...".
> This would make it possible for us to troubleshoot similar problems (not only 
> cases where the TM hangs during startup, but also cases where it exits during 
> startup).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15456) Job keeps failing on slot allocation timeout due to RM not allocating new TMs for slot requests

2020-01-07 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-15456:

Attachment: jm.log
tm_container_07.log

> Job keeps failing on slot allocation timeout due to RM not allocating new TMs 
> for slot requests
> ---
>
> Key: FLINK-15456
> URL: https://issues.apache.org/jira/browse/FLINK-15456
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Blocker
> Fix For: 1.10.0
>
> Attachments: jm.log, jm_part.log, jm_part2.log, tm_container_07.log
>
>
> As shown in the attached JM log, the job tried to start 30 TMs but only 29 
> registered, so the job fails because it cannot acquire all 30 needed slots in 
> time.
> And when a failover happens and tasks are re-scheduled, the RM will not ask 
> for new TMs even if it cannot fulfill the slot requests, so the job keeps 
> failing with slot allocation timeouts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15456) Job keeps failing on slot allocation timeout due to RM not allocating new TMs for slot requests

2020-01-07 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17009644#comment-17009644
 ] 

Zhu Zhu commented on FLINK-15456:
-

I just reproduced the issue with debug logs enabled. See the attached files 
jm.log and tm_container_07.log.
From jm.log, the RM started 28 containers but only 27 successfully registered 
back. The pending one, container_e14_1578278362819_0013_29_07, failed to find 
the RM leader on zk and thus did not register to the RM (this is intentionally 
triggered in this stability test).
So I think it is the issue described in FLINK-13554.
[~xintongsong] would you help to confirm it? If so, we can downgrade it to 
critical so that it does not block the 1.10 release, since it has existed in 
previous Flink versions, but I'd still prefer to fix it in 1.10.

> Job keeps failing on slot allocation timeout due to RM not allocating new TMs 
> for slot requests
> ---
>
> Key: FLINK-15456
> URL: https://issues.apache.org/jira/browse/FLINK-15456
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Blocker
> Fix For: 1.10.0
>
> Attachments: jm.log, jm_part.log, jm_part2.log, tm_container_07.log
>
>
> As in the attached JM log, the job tried to start 30 TMs but only 29 
> registered. So the job fails because it is not able to acquire all 30 
> needed slots in time.
> And when failover happens and tasks are re-scheduled, the RM will not ask 
> for new TMs even if it cannot fulfill the slot requests. So the job keeps 
> failing on slot allocation timeout.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14163) Execution#producedPartitions is possibly not assigned when used

2020-01-07 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17009799#comment-17009799
 ] 

Zhu Zhu commented on FLINK-14163:
-

Makes sense. An unexpectedly blocked main thread can be hard for users to 
troubleshoot, and throwing explicit errors may be better.



> Execution#producedPartitions is possibly not assigned when used
> ---
>
> Key: FLINK-14163
> URL: https://issues.apache.org/jira/browse/FLINK-14163
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed their registration to the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is an async interface 
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
> {{Execution#producedPartitions}} is possibly[1] not yet set when used. 
> Usages include:
> 1. deploying this task, so that the task may be deployed without its result 
> partitions assigned, and the job would hang (a DefaultScheduler issue only, 
> since the legacy scheduler handled this case);
> 2. generating input descriptors for downstream tasks;
> 3. retrieving {{ResultPartitionID}} for partition releasing.
> [1] If a user uses the Flink default shuffle master {{NettyShuffleMaster}}, 
> it is not problematic at the moment since it returns a completed future on 
> registration, so that it is effectively a synchronous process. However, if 
> users implement their own shuffle service in which 
> {{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
> can be a problem. This is possible since customizable shuffle services have 
> been open to users since 1.9 (via the config "shuffle-service-factory.class").
> To avoid such issues, we may either 
> 1. fix all the usages of {{Execution#producedPartitions}} with regard to the 
> async assignment, or 
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
> interface.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15499) No debug log describes the host of a TM before any task is deployed to it in YARN mode

2020-01-07 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-15499:
---

Assignee: Xintong Song

> No debug log describes the host of a TM before any task is deployed to it  in 
> YARN mode 
> 
>
> Key: FLINK-15499
> URL: https://issues.apache.org/jira/browse/FLINK-15499
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Xintong Song
>Priority: Major
>
> When troubleshooting FLINK-15456, I noticed a TM hanging in startup and not 
> able to register to the RM. However, there is no debug log telling which 
> host the TM is located on, and thus I could hardly find the logs of the 
> problematic TM.
> I think we should print the host name when starting a TM, i.e. in the log
> "TaskExecutor container_ will be started ...".
> This would make it possible for us to troubleshoot similar problems (not 
> only cases where a TM hangs in startup, but also cases where a TM exits 
> during startup).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15499) No debug log describes the host of a TM before any task is deployed to it in YARN mode

2020-01-07 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010286#comment-17010286
 ] 

Zhu Zhu commented on FLINK-15499:
-

I agree that adding this log info would help, especially later when we may 
have TMs with different resources.
I have assigned the ticket to you. Just go ahead.

> No debug log describes the host of a TM before any task is deployed to it  in 
> YARN mode 
> 
>
> Key: FLINK-15499
> URL: https://issues.apache.org/jira/browse/FLINK-15499
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Xintong Song
>Priority: Major
>
> When troubleshooting FLINK-15456, I noticed a TM hanging in startup and not 
> able to register to the RM. However, there is no debug log telling which 
> host the TM is located on, and thus I could hardly find the logs of the 
> problematic TM.
> I think we should print the host name when starting a TM, i.e. in the log
> "TaskExecutor container_ will be started ...".
> This would make it possible for us to troubleshoot similar problems (not 
> only cases where a TM hangs in startup, but also cases where a TM exits 
> during startup).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15448) Log host information for TaskManager failures.

2020-01-07 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010312#comment-17010312
 ] 

Zhu Zhu commented on FLINK-15448:
-

I just met another case where one is not even able to find the host of a 
pending/failed TM in the logs (FLINK-15499).
So I think it would be helpful to print the host of a TM not only in the task 
deploying stages. 
Composing the host info into the ResourceID looks to me like a better design 
than spreading the host around alongside the ResourceID. 
There can be 2 defects though:
1. redundant logs
2. the ResourceID size would double, and the size of certain RPCs (like 
heartbeats) may increase. This is a common issue for the work to associate 
other IDs with meaningful information, like ExecutionAttemptID and 
IntermediateResultPartitionID.
Regarding ResourceID, these 2 defects should not be critical.

[~trohrmann] shall we replace ResourceID with an extended class like 
TaskManagerID (see the sketch below)? I think using a general ResourceID for 
TM/RM/JM alike makes it unintuitive in development. And with it we can also 
limit the change to the extended class at the moment.
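
A self-contained sketch of that idea (hypothetical simplified classes, not the 
actual Flink types): equality stays based on the plain id, while the 
TM-specific subclass carries the host purely for readability of logs.
{code:java}
public class ResourceIdDemo {

    static class ResourceID {
        final String resourceId;

        ResourceID(String resourceId) {
            this.resourceId = resourceId;
        }

        @Override
        public boolean equals(Object o) {
            // equality and hashing use only the id, so maps keyed by
            // ResourceID are unaffected by any descriptive extras
            return o instanceof ResourceID && ((ResourceID) o).resourceId.equals(resourceId);
        }

        @Override
        public int hashCode() {
            return resourceId.hashCode();
        }

        @Override
        public String toString() {
            return resourceId;
        }
    }

    static class TaskManagerID extends ResourceID {
        final String host;

        TaskManagerID(String resourceId, String host) {
            super(resourceId);
            this.host = host;
        }

        @Override
        public String toString() {
            // the host is purely descriptive and only shows up in logs
            return resourceId + "@" + host;
        }
    }

    public static void main(String[] args) {
        // prints "container_xxx@worker-1:8041"
        System.out.println(new TaskManagerID("container_xxx", "worker-1:8041"));
    }
}
{code}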

> Log host information for TaskManager failures.
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> When we'd like to find out the host of the lost TaskManager to log into it 
> for more details, we have to check the previous logs for the host 
> information, which is a little time-consuming.
> Maybe we can add more descriptive information to the ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15448) Log host information for TaskManager failures.

2020-01-07 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010320#comment-17010320
 ] 

Zhu Zhu commented on FLINK-15448:
-

[~xintongsong] agreed that we should take a lot of care when making changes to 
the IDs. 
However, since it is not user visible, maybe a FLIP is not really needed. And 
in my mind these changes can be separate pieces. For example, 
ExecutionAttemptID and IntermediateResultPartitionID are not much related to 
the ResourceID changes.
Maybe we can have an umbrella ticket to track all these tasks. Each task would 
then be responsible for carefully designing the changes to the ID it is to 
improve. And each task can have a separate ML discussion when necessary.

> Log host information for TaskManager failures.
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> When we'd like to find out the host of the lost TaskManager to log into it 
> for more details, we have to check the previous logs for the host 
> information, which is a little time-consuming.
> Maybe we can add more descriptive information to the ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15499) No debug log describes the host of a TM before any task is deployed to it in YARN mode

2020-01-07 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-15499:

Fix Version/s: 1.10.0

> No debug log describes the host of a TM before any task is deployed to it  in 
> YARN mode 
> 
>
> Key: FLINK-15499
> URL: https://issues.apache.org/jira/browse/FLINK-15499
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Xintong Song
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> When troubleshooting FLINK-15456, I noticed a TM hanging in startup and not 
> able to register to the RM. However, there is no debug log telling which 
> host the TM is located on, and thus I could hardly find the logs of the 
> problematic TM.
> I think we should print the host name when starting a TM, i.e. in the log
> "TaskExecutor container_ will be started ...".
> This would make it possible for us to troubleshoot similar problems (not 
> only cases where a TM hangs in startup, but also cases where a TM exits 
> during startup).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15499) No debug log describes the host of a TM before any task is deployed to it in YARN mode

2020-01-07 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010444#comment-17010444
 ] 

Zhu Zhu commented on FLINK-15499:
-

Fixed via:

master:
9df5c80e7e729f49595ef6814462165831fd1307

release-1.10:
1a75bc1048ab7d2b48426a43326f59061ef7a79d

> No debug log describes the host of a TM before any task is deployed to it  in 
> YARN mode 
> 
>
> Key: FLINK-15499
> URL: https://issues.apache.org/jira/browse/FLINK-15499
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Xintong Song
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> When troubleshooting FLINK-15456, I noticed a TM hanging in startup and not 
> able to register to the RM. However, there is no debug log telling which 
> host the TM is located on, and thus I could hardly find the logs of the 
> problematic TM.
> I think we should print the host name when starting a TM, i.e. in the log
> "TaskExecutor container_ will be started ...".
> This would make it possible for us to troubleshoot similar problems (not 
> only cases where a TM hangs in startup, but also cases where a TM exits 
> during startup).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-15499) No debug log describes the host of a TM before any task is deployed to it in YARN mode

2020-01-07 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu resolved FLINK-15499.
-
Resolution: Fixed

> No debug log describes the host of a TM before any task is deployed to it  in 
> YARN mode 
> 
>
> Key: FLINK-15499
> URL: https://issues.apache.org/jira/browse/FLINK-15499
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Xintong Song
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> When troubleshooting FLINK-15456, I noticed a TM hanging in startup and not 
> able to register to the RM. However, there is no debug log telling which 
> host the TM is located on, and thus I could hardly find the logs of the 
> problematic TM.
> I think we should print the host name when starting a TM, i.e. in the log
> "TaskExecutor container_ will be started ...".
> This would make it possible for us to troubleshoot similar problems (not 
> only cases where a TM hangs in startup, but also cases where a TM exits 
> during startup).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15456) Job keeps failing on slot allocation timeout due to RM not allocating new TMs for slot requests

2020-01-07 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-15456:

Priority: Critical  (was: Blocker)

> Job keeps failing on slot allocation timeout due to RM not allocating new TMs 
> for slot requests
> ---
>
> Key: FLINK-15456
> URL: https://issues.apache.org/jira/browse/FLINK-15456
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Critical
> Fix For: 1.10.0
>
> Attachments: jm.log, jm_part.log, jm_part2.log, tm_container_07.log
>
>
> As in the attached JM log, the job tried to start 30 TMs but only 29 
> registered. So the job fails because it is not able to acquire all 30 
> needed slots in time.
> And when failover happens and tasks are re-scheduled, the RM will not ask 
> for new TMs even if it cannot fulfill the slot requests. So the job keeps 
> failing on slot allocation timeout.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-14058) FLIP-53 Fine Grained Operator Resource Management

2020-01-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-14058.
---
Release Note: 
The memory configs for table operators, including 
 * table.exec.resource.external-buffer-memory, 
 * table.exec.resource.hash-agg.memory, 
 * table.exec.resource.hash-join.memory, and
 * table.exec.resource.sort.memory, 
 are just weight hints now rather than absolute memory requirements. In this 
way, operators would be able to make use of all the managed memory of a slot. 
This helps the task to run more stably if the slot managed memory is limited, 
or more efficiently if there is adequate slot managed memory.
  Resolution: Fixed

> FLIP-53 Fine Grained Operator Resource Management
> -
>
> Key: FLINK-14058
> URL: https://issues.apache.org/jira/browse/FLINK-14058
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Xintong Song
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: Umbrella
> Fix For: 1.10.0
>
>
> This is the umbrella issue of 'FLIP-53: Fine Grained Operator Resource 
> Management'.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14058) FLIP-53 Fine Grained Operator Resource Management

2020-01-08 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010516#comment-17010516
 ] 

Zhu Zhu commented on FLINK-14058:
-

Hi [~lzljs3620320], would you help to verify the release note of FLIP-53?
It looks to me that the only user-visible part is the table operator config 
changes.

> FLIP-53 Fine Grained Operator Resource Management
> -
>
> Key: FLINK-14058
> URL: https://issues.apache.org/jira/browse/FLINK-14058
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Xintong Song
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: Umbrella
> Fix For: 1.10.0
>
>
> This is the umbrella issue of 'FLIP-53: Fine Grained Operator Resource 
> Management'.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14058) FLIP-53 Fine Grained Operator Resource Management

2020-01-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14058:

Release Note: 
The memory configs for table operators are weight hints now rather than 
absolute memory requirements. The configs includes
 * table.exec.resource.external-buffer-memory, 
 * table.exec.resource.hash-agg.memory, 
 * table.exec.resource.hash-join.memory, and
 * table.exec.resource.sort.memory.
In this way, operators would be able to make use of all the managed memory of a 
slot. It helps the task to run more stably if the slot managed memory is 
limited, or more efficiently if there is adequate slot managed memory.

  was:
The memory configs for table operators, including 
 * table.exec.resource.external-buffer-memory, 
 * table.exec.resource.hash-agg.memory, 
 * table.exec.resource.hash-join.memory, and
 * table.exec.resource.sort.memory, 
 are just weight hints now rather than absolute memory requirements. In this 
way, operators would be able to make use of all the managed memory of a slot. 
This helps the task to run more stably if the slot managed memory is limited, 
or more efficiently if there is adequate slot managed memory.


> FLIP-53 Fine Grained Operator Resource Management
> -
>
> Key: FLINK-14058
> URL: https://issues.apache.org/jira/browse/FLINK-14058
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Xintong Song
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: Umbrella
> Fix For: 1.10.0
>
>
> This is the umbrella issue of 'FLIP-53: Fine Grained Operator Resource 
> Management'.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13554) ResourceManager should have a timeout on starting new TaskExecutors.

2020-01-08 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010522#comment-17010522
 ] 

Zhu Zhu commented on FLINK-13554:
-

This issue is triggered only when a TM is stuck in launching before registering 
to the RM. Currently we only see this case in our stability tests, which 
intentionally break zookeeper and network connections.
So I agree that we can postpone it as long as we do not encounter this issue in 
production.

> ResourceManager should have a timeout on starting new TaskExecutors.
> 
>
> Key: FLINK-13554
> URL: https://issues.apache.org/jira/browse/FLINK-13554
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Xintong Song
>Priority: Critical
> Fix For: 1.10.0
>
>
> Recently, we encountered a case where one TaskExecutor got stuck during 
> launching on Yarn (without failing), causing the job to be unable to recover 
> from continuous failovers.
> The reason the TaskExecutor gets stuck is an environment problem on our 
> side. The TaskExecutor gets stuck somewhere after the ResourceManager has 
> started the TaskExecutor and is waiting for the TaskExecutor to be brought 
> up and register. Later, when the slot request times out, the job fails over 
> and requests slots from the ResourceManager again; the ResourceManager still 
> sees a TaskExecutor (the stuck one) being started and will not request a new 
> container from Yarn. Therefore, the job cannot recover from the failure.
> I think to avoid such an unrecoverable status, the ResourceManager needs to 
> have a timeout on starting new TaskExecutors. If the starting of a 
> TaskExecutor takes too long, it should just fail the TaskExecutor and start 
> a new one.
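
For illustration, a self-contained sketch of such a startup timeout 
(hypothetical structure and a 5-minute deadline as an assumption; not the 
actual ResourceManager code):
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class StartupTimeoutDemo {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> pendingStarts = new ConcurrentHashMap<>();

    // called when a container for a new TM has been started
    void onTaskExecutorStarted(String containerId) {
        pendingStarts.put(containerId, timer.schedule(() -> {
            pendingStarts.remove(containerId);
            // the TM did not register in time: release the stuck container
            // and ask the cluster manager for a replacement
            System.out.println("TM " + containerId + " did not register in time; requesting a new container");
        }, 5, TimeUnit.MINUTES));
    }

    // called when the TM registers to the RM in time
    void onTaskExecutorRegistered(String containerId) {
        ScheduledFuture<?> timeout = pendingStarts.remove(containerId);
        if (timeout != null) {
            timeout.cancel(false);
        }
    }
}
{code}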



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14058) FLIP-53 Fine Grained Operator Resource Management

2020-01-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14058:

Release Note: 
The memory configs for table operators are weight hints now rather than 
absolute memory requirements. This means that the actual managed memory 
allocated may be smaller or larger than the config values, which depends on the 
actual slot managed memory capacity.
The configs includes
 * table.exec.resource.external-buffer-memory, 
 * table.exec.resource.hash-agg.memory, 
 * table.exec.resource.hash-join.memory, and
 * table.exec.resource.sort.memory.
This ensures that operators do not over allocate memory or leave available 
memory unallocated. It helps the task to run more stably if the slot managed 
memory is limited, or more efficiently if there is adequate slot managed memory.


  was:
The memory configs for table operators are weight hints now rather than 
absolute memory requirements. The configs includes
 * table.exec.resource.external-buffer-memory, 
 * table.exec.resource.hash-agg.memory, 
 * table.exec.resource.hash-join.memory, and
 * table.exec.resource.sort.memory.
In this way, operators would be able to make use of all the managed memory of a 
slot. It helps the task to run more stably if the slot managed memory is 
limited, or more efficiently if there is adequate slot managed memory.


> FLIP-53 Fine Grained Operator Resource Management
> -
>
> Key: FLINK-14058
> URL: https://issues.apache.org/jira/browse/FLINK-14058
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Xintong Song
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: Umbrella
> Fix For: 1.10.0
>
>
> This is the umbrella issue of 'FLIP-53: Fine Grained Operator Resource 
> Management'.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14058) FLIP-53 Fine Grained Operator Resource Management

2020-01-08 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010529#comment-17010529
 ] 

Zhu Zhu commented on FLINK-14058:
-

[~lzljs3620320], Thanks a lot!
Updated the release note. I adjusted your suggestion a bit, since the configs 
have default values which take effect even if they are not configured.

> FLIP-53 Fine Grained Operator Resource Management
> -
>
> Key: FLINK-14058
> URL: https://issues.apache.org/jira/browse/FLINK-14058
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Xintong Song
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: Umbrella
> Fix For: 1.10.0
>
>
> This is the umbrella issue of 'FLIP-53: Fine Grained Operator Resource 
> Management'.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14058) FLIP-53 Fine Grained Operator Resource Management

2020-01-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14058:

Release Note: 
The memory configs for table operators are weight hints now rather than 
absolute memory requirements. This means that the actual managed memory 
allocated may be smaller or larger than the config values, which depends on the 
actual slot managed memory capacity.
The configs include
 * table.exec.resource.external-buffer-memory, 
 * table.exec.resource.hash-agg.memory, 
 * table.exec.resource.hash-join.memory, and
 * table.exec.resource.sort.memory.
This ensures that operators do not over allocate memory or leave available 
memory unallocated. It helps the task to run more stably if the slot managed 
memory is limited, or more efficiently if there is adequate slot managed memory.


  was:
The memory configs for table operators are weight hints now rather than 
absolute memory requirements. This means that the actual managed memory 
allocated may be smaller or larger than the config values, which depends on the 
actual slot managed memory capacity.
The configs includes
 * table.exec.resource.external-buffer-memory, 
 * table.exec.resource.hash-agg.memory, 
 * table.exec.resource.hash-join.memory, and
 * table.exec.resource.sort.memory.
This ensures that operators do not over allocate memory or leave available 
memory unallocated. It helps the task to run more stably if the slot managed 
memory is limited, or more efficiently if there is adequate slot managed memory.



> FLIP-53 Fine Grained Operator Resource Management
> -
>
> Key: FLINK-14058
> URL: https://issues.apache.org/jira/browse/FLINK-14058
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Xintong Song
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: Umbrella
> Fix For: 1.10.0
>
>
> This is the umbrella issue of 'FLIP-53: Fine Grained Operator Resource 
> Management'.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14058) FLIP-53 Fine Grained Operator Resource Management

2020-01-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14058:

Release Note: 
The memory configs for table operators are weight hints now rather than 
absolute memory requirements. This means that the actual managed memory 
allocated may be smaller or larger than the config values, which depends on the 
actual slot managed memory capacity.
The affected configs are {{table.exec.resource.external-buffer-memory}}, 
{{table.exec.resource.hash-agg.memory}}, 
{{table.exec.resource.hash-join.memory}}, and 
{{table.exec.resource.sort.memory}}.
This ensures that operators do not over allocate memory or leave available 
memory unallocated. It helps the task to run more stably if the slot managed 
memory is limited, or more efficiently if there is adequate slot managed memory.


  was:
The memory configs for table operators are weight hints now rather than 
absolute memory requirements. This means that the actual managed memory 
allocated may be smaller or larger than the config values, which depends on the 
actual slot managed memory capacity.
The configs include
 * table.exec.resource.external-buffer-memory, 
 * table.exec.resource.hash-agg.memory, 
 * table.exec.resource.hash-join.memory, and
 * table.exec.resource.sort.memory.
This ensures that operators do not over allocate memory or leave available 
memory unallocated. It helps the task to run more stably if the slot managed 
memory is limited, or more efficiently if there is adequate slot managed memory.



> FLIP-53 Fine Grained Operator Resource Management
> -
>
> Key: FLINK-14058
> URL: https://issues.apache.org/jira/browse/FLINK-14058
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Xintong Song
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: Umbrella
> Fix For: 1.10.0
>
>
> This is the umbrella issue of 'FLIP-53: Fine Grained Operator Resource 
> Management'.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14058) FLIP-53 Fine Grained Operator Resource Management

2020-01-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14058:

Release Note: 
The memory configs for table operators are weight hints now rather than 
absolute memory requirements. This means that the actual managed memory 
allocated may be smaller or larger than the config values, which depends on the 
actual slot managed memory capacity.
The affected configs are table.exec.resource.external-buffer-memory, 
table.exec.resource.hash-agg.memory, table.exec.resource.hash-join.memory, and 
table.exec.resource.sort.memory.
This ensures that operators do not over allocate memory or leave available 
memory unallocated. It helps the task to run more stably if the slot managed 
memory is limited, or more efficiently if there is adequate slot managed memory.


  was:
The memory configs for table operators are weight hints now rather than 
absolute memory requirements. This means that the actual managed memory 
allocated may be smaller or larger than the config values, which depends on the 
actual slot managed memory capacity.
The affected configs are {{table.exec.resource.external-buffer-memory}}, 
{{table.exec.resource.hash-agg.memory}}, 
{{table.exec.resource.hash-join.memory}}, and 
{{table.exec.resource.sort.memory}}.
This ensures that operators do not over allocate memory or leave available 
memory unallocated. It helps the task to run more stably if the slot managed 
memory is limited, or more efficiently if there is adequate slot managed memory.



> FLIP-53 Fine Grained Operator Resource Management
> -
>
> Key: FLINK-14058
> URL: https://issues.apache.org/jira/browse/FLINK-14058
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Xintong Song
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: Umbrella
> Fix For: 1.10.0
>
>
> This is the umbrella issue of 'FLIP-53: Fine Grained Operator Resource 
> Management'.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15448) Log host information for TaskManager failures.

2020-01-08 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010537#comment-17010537
 ] 

Zhu Zhu commented on FLINK-15448:
-

[~victor-wong] I'm not quite sure what the benefit would be of having different 
ResourceID subclasses for different deployments?

> Log host information for TaskManager failures.
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> When we'd like to find out the host of the lost TaskManager to log into it 
> for more details, we have to check the previous logs for the host 
> information, which is a little time-consuming.
> Maybe we can add more descriptive information to the ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14058) FLIP-53 Fine Grained Operator Resource Management

2020-01-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14058:

Release Note: 
The memory configs for table operators are no longer absolute memory 
requirements but weight hints now. This means that the actual managed memory 
allocated can be smaller or larger than the config values, which depends on the 
actual slot managed memory capacity.
The affected configs are table.exec.resource.external-buffer-memory, 
table.exec.resource.hash-agg.memory, table.exec.resource.hash-join.memory, and 
table.exec.resource.sort.memory.
This change ensures that operators do not over allocate memory or leave 
available memory unallocated. It helps the task to be more stable if the slot 
managed memory is limited, or more efficient if the slot has adequate managed 
memory.
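
As a worked example of the weight semantics (illustrative arithmetic only; the 
weights and slot size below are assumptions, not defaults):
{code:java}
public class ManagedMemoryWeightDemo {
    public static void main(String[] args) {
        long slotManagedMemory = 256L << 20; // 256 MB of slot managed memory
        long[] weights = {128, 384};         // two operators' configured "memory" values, read as weights
        long totalWeight = weights[0] + weights[1];
        for (long w : weights) {
            // each operator gets a share proportional to its weight
            long share = slotManagedMemory * w / totalWeight;
            System.out.printf("weight %d -> %d MB%n", w, share >> 20);
        }
        // prints 64 MB and 192 MB: since the weights oversubscribe the slot,
        // both shares end up smaller than the configured values; with a larger
        // slot the shares would grow beyond them accordingly.
    }
}
{code}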


  was:
The memory configs for table operators are weight hints now rather than 
absolute memory requirements. This means that the actual managed memory 
allocated may be smaller or larger than the config values, which depends on the 
actual slot managed memory capacity.
The affected configs are table.exec.resource.external-buffer-memory, 
table.exec.resource.hash-agg.memory, table.exec.resource.hash-join.memory, and 
table.exec.resource.sort.memory.
This ensures that operators do not over allocate memory or leave available 
memory unallocated. It helps the task to run more stably if the slot managed 
memory is limited, or more efficiently if there is adequate slot managed memory.



> FLIP-53 Fine Grained Operator Resource Management
> -
>
> Key: FLINK-14058
> URL: https://issues.apache.org/jira/browse/FLINK-14058
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Xintong Song
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: Umbrella
> Fix For: 1.10.0
>
>
> This is the umbrella issue of 'FLIP-53: Fine Grained Operator Resource 
> Management'.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15522) Misleading root cause exception when cancelling the job

2020-01-08 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011522#comment-17011522
 ] 

Zhu Zhu commented on FLINK-15522:
-

I think the reason why this happens is that {{JobResult#createFrom}} would set 
the latest global failure cause of the ExecutionGraph as the cause of the 
{{JobResult}} if the job is terminated but not FINISHED. 
So if the job is CANCELED and any global failure has ever happened, the latest 
global failure cause would be shown as the root cause of the cancellation.


> Misleading root cause exception when cancelling the job
> ---
>
> Key: FLINK-15522
> URL: https://issues.apache.org/jira/browse/FLINK-15522
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission, Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Critical
>
> When cancelling a Flink job, the following stack trace gets displayed
> {code}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: d0e8c2026709385166bcc0253c30742e)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at 
> org.apache.flink.streaming.examples.statemachine.StateMachineExample.main(StateMachineExample.java:142)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was 
> cancelled.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:148)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
> ... 18 more
> Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> No pooled slot available and request to ResourceManager for new slot failed
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

[jira] [Comment Edited] (FLINK-15522) Misleading root cause exception when cancelling the job

2020-01-08 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011522#comment-17011522
 ] 

Zhu Zhu edited comment on FLINK-15522 at 1/9/20 7:52 AM:
-

I think the reason why this happens is that {{JobResult#createFrom}} would set 
the latest global failure cause of the ExecutionGraph as the cause of the 
{{JobResult}} if the job is terminated but not FINISHED. 
So if the job is CANCELED and any global failure has ever happened, the latest 
global failure cause would be shown as the root cause of the cancellation.
I think in the canceling case, we can just keep JobResult#serializedThrowable 
null so that the root cause would simply be {{JobCancellationException: 
Job was cancelled.}}.
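
A self-contained sketch of that proposal (hypothetical simplified types, not 
the actual {{JobResult#createFrom}} code):
{code:java}
public class JobResultDemo {
    enum JobStatus { FINISHED, CANCELED, FAILED }

    // only FAILED jobs surface the last global failure cause; for CANCELED
    // (and FINISHED) jobs the throwable stays null, so the reported root
    // cause is just "Job was cancelled."
    static Throwable rootCauseFor(JobStatus status, Throwable lastGlobalFailure) {
        return status == JobStatus.FAILED ? lastGlobalFailure : null;
    }

    public static void main(String[] args) {
        Throwable stale = new RuntimeException("NoResourceAvailableException from an earlier failover");
        System.out.println(rootCauseFor(JobStatus.CANCELED, stale)); // null
        System.out.println(rootCauseFor(JobStatus.FAILED, stale));   // the real cause
    }
}
{code}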



was (Author: zhuzh):
I think the reason why this happens is that {{JobResult#createFrom}} would set 
the latest global failure cause of the ExecutionGraph as the cause of the 
{{JobResult}} if the job is terminated but not FINISHED. 
So if the job is CANCELED and any global failure has ever happened, the latest 
global failure cause would be shown as the root cause of the cancellation.


> Misleading root cause exception when cancelling the job
> ---
>
> Key: FLINK-15522
> URL: https://issues.apache.org/jira/browse/FLINK-15522
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission, Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Critical
>
> When cancelling a Flink job, the following stack trace gets displayed
> {code}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: d0e8c2026709385166bcc0253c30742e)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at 
> org.apache.flink.streaming.examples.statemachine.StateMachineExample.main(StateMachineExample.java:142)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was 
> cancelled.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:148)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
> ... 18 more
> Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> No pooled slot available and request to ResourceManager for new slot failed
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>

[jira] [Comment Edited] (FLINK-15522) Misleading root cause exception when cancelling the job

2020-01-08 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011522#comment-17011522
 ] 

Zhu Zhu edited comment on FLINK-15522 at 1/9/20 7:53 AM:
-

I think the reason why this happens is that {{JobResult#createFrom}} would set 
the latest global failure cause of the ExecutionGraph as the cause of the 
{{JobResult}} if the job is terminated but not FINISHED. 
So if the job is CANCELED and any global failure has ever happened, the latest 
global failure cause would be shown as the root cause of the cancellation.
I think in the case that a job is CANCELED, we can just keep 
{{JobResult#serializedThrowable}} null so that the root cause would simply be 
{{JobCancellationException: Job was cancelled.}}.



was (Author: zhuzh):
I think the reason why this happens is that {{JobResult#createFrom}} would set 
the latest global failure cause of the ExecutionGraph as the cause of the 
{{JobResult}} if the job is terminated but not FINISHED. 
So if the job is CANCELED and any global failure has ever happened, the latest 
global failure cause would be shown as the root cause of the cancellation.
I think in the canceling case, we can just keep JobResult#serializedThrowable 
null so that the root cause would simply be {{JobCancellationException: 
Job was cancelled.}}.


> Misleading root cause exception when cancelling the job
> ---
>
> Key: FLINK-15522
> URL: https://issues.apache.org/jira/browse/FLINK-15522
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission, Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Critical
>
> When cancelling a Flink job, the following stack trace gets displayed
> {code}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: d0e8c2026709385166bcc0253c30742e)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at 
> org.apache.flink.streaming.examples.statemachine.StateMachineExample.main(StateMachineExample.java:142)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was 
> cancelled.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:148)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
> ... 18 more
> Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> No pooled slot available and request to ResourceManager for new slot failed
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  

[jira] [Commented] (FLINK-14163) Execution#producedPartitions is possibly not assigned when used

2020-01-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011647#comment-17011647
 ] 

Zhu Zhu commented on FLINK-14163:
-

Thanks [~ym] for trying out the solutions.
I'd prefer option 3, which simply adds a check to ensure the registration 
future is done on return. 
And I think we also need to update the java docs for 
{{ShuffleMaster#registerPartitionWithProducer}} to ask users to only return a 
completed future at the moment.
Fell free to open the PR.
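
A minimal sketch of what such a check could look like (hypothetical helper 
method, not the actual {{Execution#registerProducedPartitions}} code):
{code:java}
import java.util.concurrent.CompletableFuture;

public class SyncRegistrationCheckDemo {

    // fail fast if a custom ShuffleMaster returns a future that is not
    // already complete, instead of silently deploying without partitions
    static <T> T checkCompletedAndGet(CompletableFuture<T> registrationFuture) {
        if (!registrationFuture.isDone()) {
            throw new IllegalStateException(
                    "registerPartitionWithProducer is expected to return a completed future at the moment.");
        }
        return registrationFuture.join();
    }

    public static void main(String[] args) {
        // a completed future passes the check
        System.out.println(checkCompletedAndGet(CompletableFuture.completedFuture("descriptor")));
        // a pending future fails fast with an explicit error
        try {
            checkCompletedAndGet(new CompletableFuture<String>());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}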

> Execution#producedPartitions is possibly not assigned when used
> ---
>
> Key: FLINK-14163
> URL: https://issues.apache.org/jira/browse/FLINK-14163
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhu Zhu
>Assignee: Yuan Mei
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed their registration to the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is an async interface 
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
> {{Execution#producedPartitions}} is possibly[1] not yet set when used. 
> Usages include:
> 1. deploying this task, so that the task may be deployed without its result 
> partitions assigned, and the job would hang (a DefaultScheduler issue only, 
> since the legacy scheduler handled this case);
> 2. generating input descriptors for downstream tasks;
> 3. retrieving {{ResultPartitionID}} for partition releasing.
> [1] If a user uses the Flink default shuffle master {{NettyShuffleMaster}}, 
> it is not problematic at the moment since it returns a completed future on 
> registration, so that it is effectively a synchronous process. However, if 
> users implement their own shuffle service in which 
> {{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
> can be a problem. This is possible since customizable shuffle services have 
> been open to users since 1.9 (via the config "shuffle-service-factory.class").
> To avoid such issues, we may either 
> 1. fix all the usages of {{Execution#producedPartitions}} with regard to the 
> async assignment, or 
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
> interface.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14163) Execution#producedPartitions is possibly not assigned when used

2020-01-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011647#comment-17011647
 ] 

Zhu Zhu edited comment on FLINK-14163 at 1/9/20 10:09 AM:
--

Thanks [~ym] for trying out the solutions.
I'd prefer option 3, which simply adds a check to ensure the registration 
future is done on return. 
And I think we also need to update the java docs for 
{{ShuffleMaster#registerPartitionWithProducer}} to ask users to only return a 
completed future at the moment.
Feel free to open the PR.


was (Author: zhuzh):
Thanks [~ym] for trying out the solutions.
I'd prefer option 3, which simply adds a check to ensure the registration 
future is done on return. 
And I think we also need to update the java docs for 
{{ShuffleMaster#registerPartitionWithProducer}} to ask users to only return a 
completed future at the moment.
Fell free to open the PR.

> Execution#producedPartitions is possibly not assigned when used
> ---
>
> Key: FLINK-14163
> URL: https://issues.apache.org/jira/browse/FLINK-14163
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhu Zhu
>Assignee: Yuan Mei
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed their registration to the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is an async interface 
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
> {{Execution#producedPartitions}} is possibly[1] not yet set when used. 
> Usages include:
> 1. deploying this task, so that the task may be deployed without its result 
> partitions assigned, and the job would hang (a DefaultScheduler issue only, 
> since the legacy scheduler handled this case);
> 2. generating input descriptors for downstream tasks;
> 3. retrieving {{ResultPartitionID}} for partition releasing.
> [1] If a user uses the Flink default shuffle master {{NettyShuffleMaster}}, 
> it is not problematic at the moment since it returns a completed future on 
> registration, so that it is effectively a synchronous process. However, if 
> users implement their own shuffle service in which 
> {{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
> can be a problem. This is possible since customizable shuffle services have 
> been open to users since 1.9 (via the config "shuffle-service-factory.class").
> To avoid such issues, we may either 
> 1. fix all the usages of {{Execution#producedPartitions}} with regard to the 
> async assignment, or 
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
> interface.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14163) Execution#producedPartitions is possibly not assigned when used

2020-01-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011692#comment-17011692
 ] 

Zhu Zhu commented on FLINK-14163:
-

[~chesnay] The deployment issue can happen with the NG scheduler, since it 
does not wait for the future to complete in 
{{DefaultScheduler#assignResourceOrHandleError}}.
The deployment calling chain is {{DefaultScheduler#deployTaskSafe(...)}} -> 
{{DefaultExecutionVertexOperations#deploy(...)}} -> 
{{ExecutionVertex#deploy()}} -> {{Execution#deploy()}}.
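To make the race concrete, here is a minimal sketch of how deployment could be gated on the registration future instead (all names are illustrative; this is not the actual DefaultScheduler code):

{code}
import java.util.concurrent.CompletableFuture;

final class GatedDeploySketch {

    // The race: Execution#deploy() may run while the shuffle master is still
    // registering partitions. Gating deploy on the registration future ensures
    // producedPartitions is assigned before the task is shipped to a TM.
    static CompletableFuture<Void> deployAfterRegistration(
            CompletableFuture<Void> partitionRegistrationFuture,
            Runnable deployAction) {
        return partitionRegistrationFuture.thenRun(deployAction);
    }
}
{code}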

> Execution#producedPartitions is possibly not assigned when used
> ---
>
> Key: FLINK-14163
> URL: https://issues.apache.org/jira/browse/FLINK-14163
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhu Zhu
>Assignee: Yuan Mei
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed their registration with the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is done via an async interface 
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
> {{Execution#producedPartitions}} is possibly[1] not yet set when used. 
> Usages include:
> 1. deploying this task: the task may be deployed without its result 
> partitions assigned, and the job would hang. (DefaultScheduler issue only, 
> since the legacy scheduler handled this case)
> 2. generating input descriptors for downstream tasks: 
> 3. retrieving {{ResultPartitionID}} for partition releasing: 
> [1] If a user uses Flink's default shuffle master {{NettyShuffleMaster}}, it 
> is not problematic at the moment, since it returns a completed future on 
> registration, so registration is effectively a synchronous process. However, 
> if users implement their own shuffle service in which 
> {{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
> can be a problem. This is possible because customizable shuffle services have 
> been open to users since 1.9 (via the config "shuffle-service-factory.class").
> To avoid such issues, we may either 
> 1. fix all the usages of {{Execution#producedPartitions}} regarding the async 
> assignment, or 
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
> interface



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14163) Execution#producedPartitions is possibly not assigned when used

2020-01-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011878#comment-17011878
 ] 

Zhu Zhu commented on FLINK-14163:
-

Thanks [~chesnay] for the update! So we are now taking the option of fixing 
all the problematic usages of {{Execution#producedPartitions}}.
I think the partition leak could also happen with the legacy scheduler if a 
task is cancelled/failed before its partition registration is completed. Shall 
we fix it there as well, since the legacy scheduler is the only scheduler in 
1.9 and is still available to users in 1.10?
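A minimal sketch of the leak scenario and a possible guard (illustrative names, not the actual patch): if cancellation wins the race, the descriptor that arrives later is released instead of being dropped.

{code}
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class PartitionLeakGuardSketch {

    // If the execution is cancelled/failed while registration is still in
    // flight, release the partition as soon as its descriptor arrives so the
    // shuffle service does not keep an orphaned partition around.
    static <T> void releaseIfCancelled(
            CompletableFuture<T> registrationFuture,
            Supplier<Boolean> isCancelledOrFailed,
            Consumer<T> releasePartition) {
        registrationFuture.thenAccept(descriptor -> {
            if (isCancelledOrFailed.get()) {
                releasePartition.accept(descriptor);
            }
        });
    }
}
{code}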

> Execution#producedPartitions is possibly not assigned when used
> ---
>
> Key: FLINK-14163
> URL: https://issues.apache.org/jira/browse/FLINK-14163
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhu Zhu
>Assignee: Yuan Mei
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed their registration with the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is done via an async interface 
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
> {{Execution#producedPartitions}} is possibly[1] not yet set when used. 
> Usages include:
> 1. deploying this task: the task may be deployed without its result 
> partitions assigned, and the job would hang. (DefaultScheduler issue only, 
> since the legacy scheduler handled this case)
> 2. generating input descriptors for downstream tasks: 
> 3. retrieving {{ResultPartitionID}} for partition releasing: 
> [1] If a user uses Flink's default shuffle master {{NettyShuffleMaster}}, it 
> is not problematic at the moment, since it returns a completed future on 
> registration, so registration is effectively a synchronous process. However, 
> if users implement their own shuffle service in which 
> {{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
> can be a problem. This is possible because customizable shuffle services have 
> been open to users since 1.9 (via the config "shuffle-service-factory.class").
> To avoid such issues, we may either 
> 1. fix all the usages of {{Execution#producedPartitions}} regarding the async 
> assignment, or 
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
> interface



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14163) Execution#producedPartitions is possibly not assigned when used

2020-01-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011944#comment-17011944
 ] 

Zhu Zhu commented on FLINK-14163:
-

Oh, I understand what you mean.
Previously [~azagrebin] and Till had raised the concern that calling {{get()}} 
on the partition registration future would block the main thread in a 
non-obvious way (see this 
[comment|https://issues.apache.org/jira/browse/FLINK-14163?focusedCommentId=17009710&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17009710]).
 That's why we decided to require all 
{{ShuffleMaster#registerPartitionWithProducer}} implementations to return only 
completed futures for the moment.
If we allowed calling {{get()}} on the future, it looks to me that it would be 
even simpler to call it in {{Execution#registerProducedPartitions(...)}}, so 
that neither the deployment race nor the partition leak could happen.
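For illustration, a minimal sketch of that simpler variant (the helper name and the added timeout are assumptions, not the actual Execution code):

{code}
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class SyncRegistrationSketch {

    // Block inside registerProducedPartitions(...) until the shuffle master
    // has answered, so producedPartitions is always assigned before any
    // deployment or partition-release path can observe it. The timeout bounds
    // how long the main thread could block on a misbehaving ShuffleMaster.
    static <T> T registerAndWait(
            CompletableFuture<T> registrationFuture, Duration timeout) throws Exception {
        return registrationFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
{code}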


> Execution#producedPartitions is possibly not assigned when used
> ---
>
> Key: FLINK-14163
> URL: https://issues.apache.org/jira/browse/FLINK-14163
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhu Zhu
>Assignee: Yuan Mei
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed their registration with the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is done via an async interface 
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
> {{Execution#producedPartitions}} is possibly[1] not yet set when used. 
> Usages include:
> 1. deploying this task: the task may be deployed without its result 
> partitions assigned, and the job would hang. (DefaultScheduler issue only, 
> since the legacy scheduler handled this case)
> 2. generating input descriptors for downstream tasks: 
> 3. retrieving {{ResultPartitionID}} for partition releasing: 
> [1] If a user uses Flink's default shuffle master {{NettyShuffleMaster}}, it 
> is not problematic at the moment, since it returns a completed future on 
> registration, so registration is effectively a synchronous process. However, 
> if users implement their own shuffle service in which 
> {{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
> can be a problem. This is possible because customizable shuffle services have 
> been open to users since 1.9 (via the config "shuffle-service-factory.class").
> To avoid such issues, we may either 
> 1. fix all the usages of {{Execution#producedPartitions}} regarding the async 
> assignment, or 
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
> interface



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14163) Execution#producedPartitions is possibly not assigned when used

2020-01-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011944#comment-17011944
 ] 

Zhu Zhu edited comment on FLINK-14163 at 1/9/20 3:23 PM:
-

Oh, I understand what you mean.
Previously [~azagrebin] and Till had raised the concern that calling {{get()}} 
on the partition registration future would block the main thread in a 
non-obvious way (see this 
[comment|https://issues.apache.org/jira/browse/FLINK-14163?focusedCommentId=17009710&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17009710]).
 That's why we decided to require all 
{{ShuffleMaster#registerPartitionWithProducer}} implementations to return only 
completed futures for the moment.
If we allowed calling {{get()}} on the future, it looks to me that it would be 
even simpler to call it in {{Execution#registerProducedPartitions(...)}}, so 
that neither the deployment race nor the partition leak could happen.



was (Author: zhuzh):
Oh, I understand what you mean.
Previously [~azagrebin] and Till had raised the concern that calling {{get()}} 
on the partition registration future would block the main thread in a 
non-obvious way (see this 
[comment|https://issues.apache.org/jira/browse/FLINK-14163?focusedCommentId=17009710&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17009710]).
 That's why we decided to require all 
{{ShuffleMaster#registerPartitionWithProducer}} implementations to return only 
completed futures for the moment.
If we allowed calling {{get()}} on the future, it looks to me that it would be 
even simpler to call it in {{Execution#registerProducedPartitions(...)}}, so 
that neither the deployment race nor the partition leak could happen.


> Execution#producedPartitions is possibly not assigned when used
> ---
>
> Key: FLINK-14163
> URL: https://issues.apache.org/jira/browse/FLINK-14163
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Zhu Zhu
>Assignee: Yuan Mei
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed their registration with the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is done via an async interface 
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
> {{Execution#producedPartitions}} is possibly[1] not yet set when used. 
> Usages include:
> 1. deploying this task: the task may be deployed without its result 
> partitions assigned, and the job would hang. (DefaultScheduler issue only, 
> since the legacy scheduler handled this case)
> 2. generating input descriptors for downstream tasks: 
> 3. retrieving {{ResultPartitionID}} for partition releasing: 
> [1] If a user uses Flink's default shuffle master {{NettyShuffleMaster}}, it 
> is not problematic at the moment, since it returns a completed future on 
> registration, so registration is effectively a synchronous process. However, 
> if users implement their own shuffle service in which 
> {{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
> can be a problem. This is possible because customizable shuffle services have 
> been open to users since 1.9 (via the config "shuffle-service-factory.class").
> To avoid such issues, we may either 
> 1. fix all the usages of {{Execution#producedPartitions}} regarding the async 
> assignment, or 
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
> interface



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15522) Misleading root cause exception when cancelling the job

2020-01-09 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17012396#comment-17012396
 ] 

Zhu Zhu commented on FLINK-15522:
-

I will take this issue then and open a PR soon.

> Misleading root cause exception when cancelling the job
> ---
>
> Key: FLINK-15522
> URL: https://issues.apache.org/jira/browse/FLINK-15522
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission, Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Critical
>
> When cancelling a Flink job, the following stack trace gets displayed
> {code}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: d0e8c2026709385166bcc0253c30742e)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at 
> org.apache.flink.streaming.examples.statemachine.StateMachineExample.main(StateMachineExample.java:142)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was 
> cancelled.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:148)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
> ... 18 more
> Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> No pooled slot available and request to ResourceManager for new slot failed
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at 

[jira] [Assigned] (FLINK-15522) Misleading root cause exception when cancelling the job

2020-01-09 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-15522:
---

Assignee: Zhu Zhu

> Misleading root cause exception when cancelling the job
> ---
>
> Key: FLINK-15522
> URL: https://issues.apache.org/jira/browse/FLINK-15522
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission, Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Zhu Zhu
>Priority: Critical
>
> When cancelling a Flink job, the following stack trace gets displayed
> {code}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: d0e8c2026709385166bcc0253c30742e)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at 
> org.apache.flink.streaming.examples.statemachine.StateMachineExample.main(StateMachineExample.java:142)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was 
> cancelled.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:148)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
> ... 18 more
> Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> No pooled slot available and request to ResourceManager for new slot failed
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(Abstrac

[jira] [Updated] (FLINK-15522) Misleading root cause exception when cancelling the job

2020-01-13 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-15522:

Affects Version/s: 1.9.1

> Misleading root cause exception when cancelling the job
> ---
>
> Key: FLINK-15522
> URL: https://issues.apache.org/jira/browse/FLINK-15522
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission, Runtime / Coordination
>Affects Versions: 1.9.1, 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Zhu Zhu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.9.2, 1.10.0, 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> When cancelling a Flink job, the following stack trace gets displayed
> {code}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: d0e8c2026709385166bcc0253c30742e)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at 
> org.apache.flink.streaming.examples.statemachine.StateMachineExample.main(StateMachineExample.java:142)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was 
> cancelled.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:148)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
> ... 18 more
> Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> No pooled slot available and request to ResourceManager for new slot failed
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(

[jira] [Closed] (FLINK-15522) Misleading root cause exception when cancelling the job

2020-01-13 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-15522.
---
Fix Version/s: 1.11.0
   1.10.0
   1.9.2
   Resolution: Fixed

Fixed via:

master:
a1a27d01daa859e31d7b8bed1a1f176aa1e5c5ed

release-1.10:
ed57867e342c574907329de45b7227cb60a23007

release-1.9:
c0974b8c7346ef907fd6debafb05ec1ce3326757
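
The gist of the change, as a hedged sketch (purely illustrative; see the commits above for the real patch): when the job terminates by cancellation, report the cancellation itself rather than whatever internal exception, such as the failed slot request above, was recorded last.

{code}
final class ReportedCauseSketch {

    // Pick the exception surfaced to the client depending on how the job
    // terminated; a cancelled job should not surface unrelated internal errors.
    static RuntimeException toReportedException(
            boolean jobWasCancelled, Throwable recordedFailureCause) {
        return jobWasCancelled
                ? new RuntimeException("Job was cancelled.")
                : new RuntimeException("Job failed.", recordedFailureCause);
    }
}
{code}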

> Misleading root cause exception when cancelling the job
> ---
>
> Key: FLINK-15522
> URL: https://issues.apache.org/jira/browse/FLINK-15522
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission, Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Zhu Zhu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.9.2, 1.10.0, 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> When cancelling a Flink job, the following stack trace gets displayed
> {code}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: d0e8c2026709385166bcc0253c30742e)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at 
> org.apache.flink.streaming.examples.statemachine.StateMachineExample.main(StateMachineExample.java:142)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was 
> cancelled.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:148)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
> ... 18 more
> Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> No pooled slot available and request to ResourceManager for new slot failed
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(Case

[jira] [Commented] (FLINK-15568) RestartPipelinedRegionStrategy: not ensure the EXACTLY_ONCE semantics, in 1.8 versions

2020-01-13 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014212#comment-17014212
 ] 

Zhu Zhu commented on FLINK-15568:
-

Hi [~andrew_lin], RestartPipelinedRegionStrategy is experimental in 1.8 and has 
some known issues. It has been production-ready since 1.9 (FLINK-4256).
There is no plan to backport the changes to 1.8, since they are large and are 
not bug fixes.

> RestartPipelinedRegionStrategy: not ensure the EXACTLY_ONCE semantics, in 1.8 
> versions
> --
>
> Key: FLINK-15568
> URL: https://issues.apache.org/jira/browse/FLINK-15568
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.8.0, 1.8.1, 1.8.3
>Reporter: Andrew.D.lin
>Priority: Minor
> Attachments: image-2020-01-13-16-40-47-888.png
>
>
> In 1.8.x versions, the FailoverRegion.java restart method does not restore 
> from the latest checkpoint and is marked TODO.
> Should we support this feature (region restart) in Flink 1.8?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15568) RestartPipelinedRegionStrategy: not ensure the EXACTLY_ONCE semantics, in 1.8 versions

2020-01-13 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014212#comment-17014212
 ] 

Zhu Zhu edited comment on FLINK-15568 at 1/13/20 10:51 AM:
---

Hi [~andrew_lin], RestartPipelinedRegionStrategy is experimental in 1.8 and has 
some known issues. It has been production-ready since 1.9 (FLINK-4256). State 
restore has also been supported since then.
There is no plan to backport the changes to 1.8, since they are large and are 
not bug fixes.


was (Author: zhuzh):
Hi [~andrew_lin], RestartPipelinedRegionStrategy is experimental in 1.8 and has 
some known issues. It has been production-ready since 1.9 (FLINK-4256).
There is no plan to backport the changes to 1.8, since they are large and are 
not bug fixes.

> RestartPipelinedRegionStrategy: not ensure the EXACTLY_ONCE semantics, in 1.8 
> versions
> --
>
> Key: FLINK-15568
> URL: https://issues.apache.org/jira/browse/FLINK-15568
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.8.0, 1.8.1, 1.8.3
>Reporter: Andrew.D.lin
>Priority: Minor
> Attachments: image-2020-01-13-16-40-47-888.png
>
>
> In 1.8.x versions, the FailoverRegion.java restart method does not restore 
> from the latest checkpoint and is marked TODO.
> Should we support this feature (region restart) in Flink 1.8?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15568) RestartPipelinedRegionStrategy: not ensure the EXACTLY_ONCE semantics, in 1.8 versions

2020-01-13 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-15568.
---
Resolution: Won't Fix

> RestartPipelinedRegionStrategy: not ensure the EXACTLY_ONCE semantics, in 1.8 
> versions
> --
>
> Key: FLINK-15568
> URL: https://issues.apache.org/jira/browse/FLINK-15568
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.8.0, 1.8.1, 1.8.3
>Reporter: Andrew.D.lin
>Priority: Minor
> Attachments: image-2020-01-13-16-40-47-888.png
>
>
> In 1.8.x versions, the FailoverRegion.java restart method does not restore 
> from the latest checkpoint and is marked TODO.
> Should we support this feature (region restart) in Flink 1.8?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15568) RestartPipelinedRegionStrategy: not ensure the EXACTLY_ONCE semantics, in 1.8 versions

2020-01-13 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014795#comment-17014795
 ] 

Zhu Zhu commented on FLINK-15568:
-

Sure. Issue is closed.

> RestartPipelinedRegionStrategy: not ensure the EXACTLY_ONCE semantics, in 1.8 
> versions
> --
>
> Key: FLINK-15568
> URL: https://issues.apache.org/jira/browse/FLINK-15568
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.8.0, 1.8.1, 1.8.3
>Reporter: Andrew.D.lin
>Priority: Minor
> Attachments: image-2020-01-13-16-40-47-888.png
>
>
> In 1.8.x versions, the FailoverRegion.java restart method does not restore 
> from the latest checkpoint and is marked TODO.
> Should we support this feature (region restart) in Flink 1.8?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15307) Subclasses of FailoverStrategy are easily confused with implementation classes of RestartStrategy

2020-01-13 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014799#comment-17014799
 ] 

Zhu Zhu commented on FLINK-15307:
-

The legacy failover strategies will be removed in 1.11, which means most of the 
classes you changed will be removed.
So I think it's better to rename only the non-legacy (FLIP-1) failover 
strategies, which will be kept.
BTW, I had a glance at the PR and it looks to me that it does not follow the 
naming patterns discussed above.

cc: [~gjy]

> Subclasses of FailoverStrategy are easily confused with implementation 
> classes of RestartStrategy
> -
>
> Key: FLINK-15307
> URL: https://issues.apache.org/jira/browse/FLINK-15307
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Andrew.D.lin
>Assignee: Andrew.D.lin
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.11.0
>
> Attachments: image-2019-12-18-14-59-03-181.png
>
>  Time Spent: 10m
>  Remaining Estimate: 24h
>
> Subclasses of RestartStrategy
>  * FailingRestartStrategy
>  * FailureRateRestartStrategy
>  * FixedDelayRestartStrategy
>  * InfiniteDelayRestartStrategy
> Implementation classes of FailoverStrategy
>  * AdaptedRestartPipelinedRegionStrategyNG
>  * RestartAllStrategy
>  * RestartIndividualStrategy
>  * RestartPipelinedRegionStrategy
>  
> FailoverStrategy describes how the job computation recovers from task 
> failures.
> I think the following names may be easier to understand and easier to 
> distinguish:
> Implementation classes of FailoverStrategy
>  * AdaptedPipelinedRegionFailoverStrategyNG
>  * FailoverAllStrategy
>  * FailoverIndividualStrategy
>  * FailoverPipelinedRegionStrategy
> FailoverStrategy is currently selected via configuration. If we change the 
> names of the implementation classes, it will not affect compatibility.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

