[jira] [Commented] (FLINK-26548) the source parallelism is not set correctly with AdaptiveBatchScheduler

2022-03-09 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503510#comment-17503510
 ] 

zl commented on FLINK-26548:


Hi [~wanglijie95], can you confirm this problem?

> the source parallelism is not set correctly with AdaptiveBatchScheduler
> ---
>
> Key: FLINK-26548
> URL: https://issues.apache.org/jira/browse/FLINK-26548
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.15.0
> Reporter: zl
> Priority: Major
> Attachments: image-2022-03-09-19-00-18-396.png
>
>
> When running *_org.apache.flink.table.tpcds.TpcdsTestProgram_* with 
> *_AdaptiveBatchScheduler_*, I ran into a problem: the number of records sent 
> by the source operator is always 1, and the parallelism of the source operator 
> is also 1, even though I set 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* to 8.
> !image-2022-03-09-19-00-18-396.png!
> After some research, I found that operator A is not the actual file 
> reader. It only splits files and assigns the splits to downstream tasks for 
> further processing; operator B is the actual file reader task. Here, 
> the parallelism of operator B is 64 and the number of records sent by 
> operator A is 1, which means that operator A assigned all splits to a single 
> task of operator B. *_The other 63 tasks of operator B are idle_*, which is 
> unreasonable.
> In this case, the parallelism of operator B should be 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_*, and the 
> number of records sent by operator A should also be 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_*.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26548) the source parallelism is not set correctly with AdaptiveBatchScheduler

2022-03-10 Thread Lijie Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504120#comment-17504120
 ] 

Lijie Wang commented on FLINK-26548:


Hi [~Leo Zhou] , thanks for trying it.

-> and the parallelism of source operator is also 1 even I set 
*_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* to 8

The "Custom File source" is a non-parallel source, so its parallelism must be 1.

 -> the num of records sent by the source operator is always 1

I think this is caused by FLINK-26576: {{env.getParallelism()}} is *-1* 
when the adaptive batch scheduler is used.

 

Currently, the adaptive batch scheduler cannot know which operator is the actual 
source. It can only judge by the number of inputs: an operator with no inputs 
is considered a source. 
I think this problem only occurs with the legacy source, because the new 
source (FLIP-27) does not have two operators.
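
The input-count rule described above can be illustrated with a minimal sketch. The classes and names below ({{SourceDetectionSketch}}, {{Vertex}}, {{findSources}}) are hypothetical and only model the idea; they are not the scheduler's actual code. With a legacy file source, only the split-assigning operator A has no inputs, so it alone is treated as the source, while the real reader (operator B) is not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of a job graph; not Flink's real classes.
class SourceDetectionSketch {

    /** A job vertex with a name and the names of its upstream vertices. */
    static final class Vertex {
        final String name;
        final List<String> inputs;

        Vertex(String name, List<String> inputs) {
            this.name = name;
            this.inputs = inputs;
        }
    }

    /** Treats every vertex without inputs as a source. */
    static List<String> findSources(List<Vertex> vertices) {
        List<String> sources = new ArrayList<>();
        for (Vertex v : vertices) {
            if (v.inputs.isEmpty()) {
                sources.add(v.name);
            }
        }
        return sources;
    }

    public static void main(String[] args) {
        // Legacy file source: operator A (split assigner) feeds operator B (file reader).
        List<Vertex> legacy = Arrays.asList(
                new Vertex("A: split assigner", Collections.<String>emptyList()),
                new Vertex("B: file reader", Arrays.asList("A: split assigner")));

        // Only A has no inputs, so only A is classified as a source.
        System.out.println(findSources(legacy)); // prints [A: split assigner]
    }
}
```

Under this rule, a source-specific parallelism setting would reach operator A only, which matches the behavior reported in this ticket.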



[jira] [Commented] (FLINK-26548) the source parallelism is not set correctly with AdaptiveBatchScheduler

2022-03-10 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504182#comment-17504182
 ] 

zl commented on FLINK-26548:


Hi [~wanglijie95], thanks for replying.

-> this problem only occurs when using legacy file sources, other sources and 
the new source(FLIP-27) will not have two operators.

Yes, this problem only occurs when using legacy file sources.

 

We could solve this problem as follows: before creating 
{{ContinuousFileMonitoringFunction}}, we read the value of 
*_parallelism.default_*. If the value is -1 and AdaptiveBatchScheduler is 
enabled, we pass the value of 
*_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* to 
{{ContinuousFileMonitoringFunction}}. Then we also need to set the 
parallelism of the real source reader to 
*_jobmanager.adaptive-batch-scheduler.default-source-parallelism_*.
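
A minimal sketch of the fallback proposed above, assuming the three inputs are already known as plain values. The class and method names ({{ReaderParallelismSketch}}, {{resolveReaderParallelism}}) are hypothetical and do not exist in Flink; how the values are read from the configuration is left out.

```java
// Hypothetical helper illustrating the proposed fallback; not part of Flink.
class ReaderParallelismSketch {

    /** Default parallelism as reported when it is unset (-1, per this thread). */
    static final int UNSET = -1;

    /**
     * Picks the parallelism to use for the file reader.
     * Falls back to the configured default source parallelism only when the
     * default parallelism is unset and the adaptive batch scheduler is enabled.
     */
    static int resolveReaderParallelism(
            int defaultParallelism,
            boolean adaptiveBatchSchedulerEnabled,
            int defaultSourceParallelism) {
        if (defaultParallelism == UNSET && adaptiveBatchSchedulerEnabled) {
            return defaultSourceParallelism;
        }
        return defaultParallelism;
    }

    public static void main(String[] args) {
        // Adaptive batch scheduler with unset default parallelism: use the source setting.
        System.out.println(resolveReaderParallelism(-1, true, 8)); // prints 8
        // An explicitly configured default parallelism is left untouched.
        System.out.println(resolveReaderParallelism(4, true, 8)); // prints 4
    }
}
```

In this sketch the resolved value would be used both for the split computation in the monitoring function and for the reader operator, so that the number of emitted splits and the number of reader tasks agree.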

> the source parallelism is not set correctly with AdaptiveBatchScheduler
> ---
>
> Key: FLINK-26548
> URL: https://issues.apache.org/jira/browse/FLINK-26548
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.15.0
> Reporter: zl
> Priority: Blocker
> Fix For: 1.15.0
>
> Attachments: image-2022-03-09-19-00-18-396.png


[jira] [Commented] (FLINK-26548) the source parallelism is not set correctly with AdaptiveBatchScheduler

2022-03-10 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504192#comment-17504192
 ] 

Zhu Zhu commented on FLINK-26548:

Thanks for reporting this issue and offering to fix it! [~Leo Zhou]
I have assigned the ticket to you.

> the source parallelism is not set correctly with AdaptiveBatchScheduler
> ---
>
> Key: FLINK-26548
> URL: https://issues.apache.org/jira/browse/FLINK-26548
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.15.0
> Reporter: zl
> Assignee: zl
> Priority: Blocker
> Fix For: 1.15.0
>
> Attachments: image-2022-03-09-19-00-18-396.png


[jira] [Commented] (FLINK-26548) the source parallelism is not set correctly with AdaptiveBatchScheduler

2022-03-10 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504241#comment-17504241
 ] 

zl commented on FLINK-26548:


Hi [~zhuzh], thanks for assigning this ticket to me. I have fixed this issue 
via [PR-19040|https://github.com/apache/flink/pull/19040].

> the source parallelism is not set correctly with AdaptiveBatchScheduler
> ---
>
> Key: FLINK-26548
> URL: https://issues.apache.org/jira/browse/FLINK-26548
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.15.0
> Reporter: zl
> Assignee: zl
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.15.0
>
> Attachments: image-2022-03-09-19-00-18-396.png


[jira] [Commented] (FLINK-26548) the source parallelism is not set correctly with AdaptiveBatchScheduler

2022-03-15 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507075#comment-17507075
 ] 

Zhu Zhu commented on FLINK-26548:

As discussed in #19040, a proper fix for this problem would be to rework 
`StreamExecutionEnvironment#createFileInput(...)` to use the new sources (FLIP-27). 
We will mark this problem as a known issue of AdaptiveBatchScheduler and 
postpone the fix to 1.16 (possibly backporting it to 1.15.1).

> the source parallelism is not set correctly with AdaptiveBatchScheduler
> ---
>
> Key: FLINK-26548
> URL: https://issues.apache.org/jira/browse/FLINK-26548
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.15.0
> Reporter: zl
> Assignee: zl
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: image-2022-03-09-19-00-18-396.png