[ 
https://issues.apache.org/jira/browse/FLINK-25937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17489997#comment-17489997
 ] 

Gen Luo commented on FLINK-25937:
---------------------------------

The reason has been identified. 

A transformation with default parallelism (-1) gets its parallelism assigned 
during translation, from the default parallelism set in the environment. 
However, SinkExpander#expand sets the environment parallelism to -1 on entry, 
so that it can verify whether the parallelism of an expanded transformation has 
been set. The environment parallelism is restored when the method exits, but at 
present transform is called within this scope, while the environment 
parallelism is still -1. If the parallelism of a sink is not set, the 
parallelism of the sink transformation and of all transformations expanded from 
it is therefore never resolved, so the generated JobGraph has vertices with -1 
parallelism, causing the assertion failure in AdaptiveScheduler.

We can fix the bug by restoring the environment parallelism before 
transforming the sink transformations. A pull request has been created and 
verified with UpsertKafkaTableITCase.
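
To make the ordering problem concrete, here is a minimal, self-contained 
sketch. It is a toy model and not the actual Flink code: the classes Env and 
Transformation and the methods translate, expandBuggy and expandFixed are 
hypothetical names invented for illustration, and the real SinkExpander#expand 
does considerably more. It only shows why translating inside the scope where 
the environment parallelism is -1 leaves an unset sink at -1, and why 
restoring first avoids that.

```java
// Toy model of the ordering issue described above. All names are hypothetical.
final class SinkExpansionSketch {
    static final int DEFAULT_PARALLELISM = -1;

    // Stand-in for the execution environment; only holds a default parallelism.
    static final class Env {
        int parallelism;
        Env(int parallelism) { this.parallelism = parallelism; }
    }

    // Stand-in for a transformation whose parallelism may be unset (-1).
    static final class Transformation {
        int parallelism = DEFAULT_PARALLELISM;
    }

    // Translation resolves an unset parallelism from the environment default.
    static int translate(Env env, Transformation t) {
        return t.parallelism != DEFAULT_PARALLELISM ? t.parallelism : env.parallelism;
    }

    // Buggy ordering: translate while the environment is temporarily set to -1.
    static int expandBuggy(Env env, Transformation sink) {
        int saved = env.parallelism;
        env.parallelism = DEFAULT_PARALLELISM; // lets expansion detect unset values
        try {
            return translate(env, sink);       // resolves against -1
        } finally {
            env.parallelism = saved;           // restored too late
        }
    }

    // Fixed ordering: restore the environment parallelism, then translate.
    static int expandFixed(Env env, Transformation sink) {
        int saved = env.parallelism;
        env.parallelism = DEFAULT_PARALLELISM;
        try {
            // Expansion of the sink would happen here (omitted in this sketch).
        } finally {
            env.parallelism = saved;           // restored before translating
        }
        return translate(env, sink);
    }

    public static void main(String[] args) {
        Env env = new Env(4);
        Transformation sink = new Transformation(); // parallelism not set
        System.out.println("buggy: " + expandBuggy(env, sink)); // buggy: -1
        System.out.println("fixed: " + expandFixed(env, sink)); // fixed: 4
    }
}
```

In this model a sink with an explicitly set parallelism is unaffected by 
either ordering; only sinks left at the default are resolved differently.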

> SQL Client end-to-end test e2e fails on AZP
> -------------------------------------------
>
>                 Key: FLINK-25937
>                 URL: https://issues.apache.org/jira/browse/FLINK-25937
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Coordination, 
> Table SQL / API
>    Affects Versions: 1.15.0
>            Reporter: Till Rohrmann
>            Assignee: Gen Luo
>            Priority: Blocker
>              Labels: pull-request-available, test-stability
>
> The {{SQL Client end-to-end test}} e2e test fails on AZP when using the 
> {{AdaptiveScheduler}} because the scheduler expects that the parallelism is 
> set for all vertices:
> {code}
> Feb 03 03:45:13 org.apache.flink.runtime.client.JobInitializationException: 
> Could not start the JobMaster.
> Feb 03 03:45:13       at 
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
> Feb 03 03:45:13       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> Feb 03 03:45:13       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> Feb 03 03:45:13       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> Feb 03 03:45:13       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
> Feb 03 03:45:13       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> Feb 03 03:45:13       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> Feb 03 03:45:13       at java.lang.Thread.run(Thread.java:748)
> Feb 03 03:45:13 Caused by: java.util.concurrent.CompletionException: 
> java.lang.IllegalStateException: The adaptive scheduler expects the 
> parallelism being set for each JobVertex (violated JobVertex: 
> f74b775b58627a33e46b8c155b320255).
> Feb 03 03:45:13       at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> Feb 03 03:45:13       at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> Feb 03 03:45:13       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
> Feb 03 03:45:13       ... 3 more
> Feb 03 03:45:13 Caused by: java.lang.IllegalStateException: The adaptive 
> scheduler expects the parallelism being set for each JobVertex (violated 
> JobVertex: f74b775b58627a33e46b8c155b320255).
> Feb 03 03:45:13       at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
> Feb 03 03:45:13       at 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.assertPreconditions(AdaptiveScheduler.java:296)
> Feb 03 03:45:13       at 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.<init>(AdaptiveScheduler.java:230)
> Feb 03 03:45:13       at 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerFactory.createInstance(AdaptiveSchedulerFactory.java:122)
> Feb 03 03:45:13       at 
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:115)
> Feb 03 03:45:13       at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
> Feb 03 03:45:13       at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:322)
> Feb 03 03:45:13       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106)
> Feb 03 03:45:13       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94)
> Feb 03 03:45:13       at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
> Feb 03 03:45:13       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> Feb 03 03:45:13       ... 3 more
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30662&view=logs&j=fb37c667-81b7-5c22-dd91-846535e99a97&t=39a035c3-c65e-573c-fb66-104c66c28912&l=5782



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
