[ 
https://issues.apache.org/jira/browse/FLINK-38970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18054194#comment-18054194
 ] 

Roman Khachatryan commented on FLINK-38970:
-------------------------------------------

The problem seems to be caused by the mainThreadExecutor implementation used in the test: it is a direct executor, meaning that callbacks in `Execution#deploy` that are meant to run on the "main" thread are executed on the IO/future executor thread instead.

For example:
{code:java}
final CompletableFuture<TaskDeploymentDescriptor> taskDeploymentDescriptorFuture =
        CompletableFuture.supplyAsync(
                        initOffloadedTaskRestoreRef(
                                // Passing a copy of the task restore from the
                                // main thread to the I/O executor in order to
                                // avoid synchronization issues
                                taskRestore, maybeOffloadedTaskRestoreCleanupRef),
                        executor)
                // back to main thread because this accesses execution graph
                // internals
                .thenComposeAsync(
                        tryGetTaskDeploymentDescriptorForSlot(slot),
                        jobMasterMainThreadExecutor);
{code}
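The effect can be reproduced outside Flink with a small plain-JDK sketch. It is not Flink code: `Runnable::run` is an assumed stand-in for the test's direct main-thread executor, and the thread names `io-thread`/`main-thread` are made up. A latch holds the first stage back until the compose stage is registered, so the I/O thread is the one that completes the first future. With a real single-threaded executor the compose callback runs on `main-thread`; with the direct executor it runs on `io-thread`, which matches the `UniCompletion.claim` / `postComplete` frames in the log quoted below.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DirectMainThreadExecutorDemo {

    /**
     * Mimics supplyAsync(..., executor).thenComposeAsync(..., mainThreadExecutor)
     * and returns the name of the thread that ran the compose callback.
     */
    static String composeThreadName(boolean directMainThreadExecutor) {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService mainThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        // A direct executor runs the task inline on whichever thread calls execute().
        Executor direct = Runnable::run;
        Executor mainThreadExecutor = directMainThreadExecutor ? direct : mainThread;
        // Keeps the I/O stage from completing until the compose stage is registered.
        CountDownLatch release = new CountDownLatch(1);
        try {
            CompletableFuture<String> threadName =
                    CompletableFuture.supplyAsync(() -> {
                                awaitRelease(release);
                                return "descriptor";
                            }, io)
                            .thenComposeAsync(
                                    descriptor -> CompletableFuture.completedFuture(
                                            Thread.currentThread().getName()),
                                    mainThreadExecutor);
            release.countDown();
            return threadName.join();
        } finally {
            io.shutdownNow();
            mainThread.shutdownNow();
        }
    }

    private static void awaitRelease(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("real main-thread executor:   " + composeThreadName(false));
        System.out.println("direct main-thread executor: " + composeThreadName(true));
    }
}
```

Without the latch the outcome becomes timing-dependent: if the first future is already complete when `thenComposeAsync` is called, the direct executor runs the callback on the registering thread instead. That may explain why the test fails only in some runs, although this is an inference rather than something verified in the issue.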
When I replace futureExecutor with `new DirectScheduledExecutorService()`, this problem goes away.

Alternatively, waiting for the first future on the main thread also solves the problem.
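Both workarounds can be illustrated with the same plain-JDK setup. Again this is only a sketch: `Runnable::run` is an assumed stand-in for Flink's `DirectScheduledExecutorService`, and the class and method names are hypothetical. In both cases the compose callback ends up on the calling thread, which plays the role of the main thread here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DeployWorkaroundsDemo {

    // Assumed stand-in for a direct executor such as DirectScheduledExecutorService.
    private static final Executor DIRECT = Runnable::run;

    /** Workaround 1: the "future" executor is direct too, so every stage runs on the caller. */
    static String withDirectFutureExecutor() {
        return CompletableFuture.supplyAsync(() -> "descriptor", DIRECT)
                .thenComposeAsync(
                        descriptor -> CompletableFuture.completedFuture(Thread.currentThread().getName()),
                        DIRECT)
                .join();
    }

    /** Workaround 2: wait for the first future on the caller before composing. */
    static String withJoinBeforeCompose() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        try {
            CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "descriptor", io);
            // The first future is complete before the dependent stage is registered,
            // so the direct executor runs the callback inline on this (calling) thread.
            first.join();
            return first.thenComposeAsync(
                            descriptor -> CompletableFuture.completedFuture(Thread.currentThread().getName()),
                            DIRECT)
                    .join();
        } finally {
            io.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller:                 " + Thread.currentThread().getName());
        System.out.println("direct future executor: " + withDirectFutureExecutor());
        System.out.println("join before compose:    " + withJoinBeforeCompose());
    }
}
```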

> AdaptiveBatchSchedulerTest is unstable
> --------------------------------------
>
>                 Key: FLINK-38970
>                 URL: https://issues.apache.org/jira/browse/FLINK-38970
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 2.3.0
>            Reporter: Roman Khachatryan
>            Priority: Major
>
> When running locally, testAdaptiveBatchScheduler fails in 1270 out of 3000 runs with:
> {code:java}
> org.opentest4j.AssertionFailedError: 
> expected: 10
>  but was: -1
> Expected :10
> Actual   :-1
>       at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerTest.testAdaptiveBatchScheduler(AdaptiveBatchSchedulerTest.java:144)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:580)
>       at java.base/java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:387)
>       at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
>       at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
>       at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
>       at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
>       at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)
>  {code}
> With logging enabled, I see that some operations are executed on the wrong thread:
> {code:java}
> 7083 [pool-5116-thread-1] WARN  org.apache.flink.runtime.rpc.MainThreadValidatorUtil [] - Violation of main thread constraint detected: expected <Thread[#5153,ForkJoinPool-2558-worker-1,5,main]> but running in <Thread[#5154,pool-5116-thread-1,5,main]>.
> java.lang.Exception: Violation of main thread constraint detected: expected <Thread[#5153,ForkJoinPool-2558-worker-1,5,main]> but running in <Thread[#5154,pool-5116-thread-1,5,main]>.
>       at org.apache.flink.runtime.rpc.MainThreadValidatorUtil.isRunningInExpectedThread(MainThreadValidatorUtil.java:73) ~[classes/:?]
>       at org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter$1.execute(ComponentMainThreadExecutorServiceAdapter.java:66) ~[test-classes/:?]
>       at org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:64) ~[classes/:?]
>       at org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.execute(ComponentMainThreadExecutorServiceAdapter.java:128) ~[test-classes/:?]
>       at java.base/java.util.concurrent.CompletableFuture$UniCompletion.claim(CompletableFuture.java:572) ~[?:?]
>       at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1147) ~[?:?]
>       at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
>       at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1773) ~[?:?]
>       at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java) ~[?:?]
>       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
>       at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317) ~[?:?]
>       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:?]
>       at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
>       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
>       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
>       at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
> {code}
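The log shows the thread check happening inside the test's main-thread executor (`ComponentMainThreadExecutorServiceAdapter$1.execute` calling `MainThreadValidatorUtil.isRunningInExpectedThread`), with the violation reported as a WARN entry rather than failing the test. A hypothetical JDK-only executor with the same idea could record violations so a test can assert on them directly. `MainThreadCheckingExecutor` and its methods are illustrative names, not Flink API, and this is a sketch rather than how the Flink adapter is implemented.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical direct executor that records main-thread violations instead of only logging them. */
class MainThreadCheckingExecutor implements Executor {

    private final Thread expectedThread;
    private final List<String> violations = new CopyOnWriteArrayList<>();

    MainThreadCheckingExecutor(Thread expectedThread) {
        this.expectedThread = expectedThread;
    }

    @Override
    public void execute(Runnable command) {
        Thread current = Thread.currentThread();
        if (current != expectedThread) {
            // Same shape as the warning in the log: expected X but running in Y.
            violations.add("expected " + expectedThread.getName() + " but running in " + current.getName());
        }
        command.run(); // direct execution on whatever thread called execute()
    }

    List<String> violations() {
        return violations;
    }

    /** Completes the first stage on an I/O thread and returns the recorded violations. */
    static List<String> reproduce() {
        MainThreadCheckingExecutor mainExecutor = new MainThreadCheckingExecutor(Thread.currentThread());
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        CountDownLatch release = new CountDownLatch(1);
        try {
            CompletableFuture<String> done =
                    CompletableFuture.supplyAsync(() -> {
                                awaitRelease(release);
                                return "descriptor";
                            }, io)
                            .thenComposeAsync(
                                    descriptor -> CompletableFuture.completedFuture(descriptor),
                                    mainExecutor);
            release.countDown(); // the I/O thread now completes the first future
            done.join();
            return mainExecutor.violations();
        } finally {
            io.shutdownNow();
        }
    }

    private static void awaitRelease(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        reproduce().forEach(System.out::println);
    }
}
```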



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
