[ 
https://issues.apache.org/jira/browse/FLINK-31168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748808#comment-17748808
 ] 

Matthias Pohl edited comment on FLINK-31168 at 7/29/23 10:10 AM:
-----------------------------------------------------------------

The suspicion is now that JDK 17 prevents the test from destroying the process:
{code}
01:09:49,250 [                main] ERROR org.apache.flink.runtime.testutils.TestJvmProcess            [] - Failed to forcibly destroy process
java.lang.reflect.InaccessibleObjectException: Unable to make public java.lang.Process java.lang.ProcessImpl.destroyForcibly() accessible: module java.base does not "opens java.lang" to unnamed module @19dc67c2
        at java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354) ~[?:?]
        at java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297) ~[?:?]
        at java.lang.reflect.Method.checkCanSetAccessible(Method.java:199) ~[?:?]
        at java.lang.reflect.Method.setAccessible(Method.java:193) ~[?:?]
        at org.apache.flink.runtime.testutils.TestJvmProcess.destroy(TestJvmProcess.java:214) ~[flink-runtime-1.18-SNAPSHOT-tests.jar:1.18-SNAPSHOT]
        at org.apache.flink.test.recovery.JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure(JobManagerHAProcessFailureRecoveryITCase.java:367) ~[test-classes/:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
        [...]
{code}
The {{TestJvmProcess}} class first tries to destroy the process forcibly 
through reflection (see 
[apache/flink:org.apache.flink.runtime.testutils.TestJvmProcess:214ff|https://github.com/apache/flink/blob/2940c02c986e3d70708187091bf006806bb90dff/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java#L214]).
If that attempt fails, it falls back to calling {{destroy}} but does not wait 
for the process to finish. That could mean that the process is destroyed 
eventually but still has enough time to finish the job in the first run 
(i.e. to clean up the job graph at the end) before the second JM process is 
started.
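
The reflective call fails because it targets {{java.lang.ProcessImpl}}, which sits in a package that {{java.base}} does not open to the unnamed module. A minimal sketch of one possible alternative, assuming the test only needs the child process to be gone before it continues: call the public {{Process.destroyForcibly()}} (part of the JDK API since Java 8) directly and then wait for termination. {{ProcessKiller}} and {{destroyAndWait}} are hypothetical names used for illustration; this is not Flink code and not necessarily how the issue will be fixed.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not part of Flink. */
final class ProcessKiller {

    private ProcessKiller() {}

    /**
     * Forcibly destroys the process through the public Process API and waits
     * up to the given timeout for it to exit.
     *
     * @return true if the process terminated within the timeout
     */
    static boolean destroyAndWait(Process process, long timeout, TimeUnit unit)
            throws InterruptedException {
        // Process.destroyForcibly() is a public method on java.lang.Process,
        // so no setAccessible(true) call on ProcessImpl is involved.
        process.destroyForcibly();
        // Block until the process has exited (or the timeout elapses), so a
        // follow-up process is not started while the old one is still running.
        return process.waitFor(timeout, unit);
    }
}
```

A caller could use the boolean result to fail the test early if the process is still alive after the timeout. If the reflective access has to stay, running the test JVM with {{--add-opens java.base/java.lang=ALL-UNNAMED}} should also avoid the {{InaccessibleObjectException}}, although that has not been verified here.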

Two interesting observations, though:
* Running the test under JDK 17 locally multiple times seems to leave daemon 
child processes under the IntelliJ process (which means that some process is 
still not properly destroyed).
* The dispatcher logs of the first run don't show the job finishing. I 
have to check whether the {{Process}} class stops forwarding the logs to the 
pipe before triggering the process destruction (that would explain why the 
logs are incomplete).
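
The first observation could be checked from within the test JVM rather than by inspecting the IDE's process tree. The following is only a sketch, assuming Java 9+ (for {{ProcessHandle}}); {{LeftoverProcesses}} and {{describeLiveDescendants}} are hypothetical names and not part of Flink.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical diagnostic helper for illustration; not part of Flink. */
final class LeftoverProcesses {

    private LeftoverProcesses() {}

    /**
     * Returns one line per live descendant of the current JVM, formatted as
     * "pid commandLine". The command line may be unavailable on some
     * platforms, in which case a placeholder is printed instead.
     */
    static List<String> describeLiveDescendants() {
        return ProcessHandle.current()
                .descendants()
                .filter(ProcessHandle::isAlive)
                .map(h -> h.pid() + " " + h.info().commandLine().orElse("<unknown>"))
                .collect(Collectors.toList());
    }
}
```

Logging the result after each {{destroy}} call would show whether any child process survives the teardown.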


> JobManagerHAProcessFailureRecoveryITCase failed due to job not being found
> --------------------------------------------------------------------------
>
>                 Key: FLINK-31168
>                 URL: https://issues.apache.org/jira/browse/FLINK-31168
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.15.3, 1.16.1, 1.18.0
>            Reporter: Matthias Pohl
>            Assignee: Matthias Pohl
>            Priority: Blocker
>              Labels: test-stability
>             Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46342&view=logs&j=b0a398c0-685b-599c-eb57-c8c2a771138e&t=747432ad-a576-5911-1e2a-68c6bedc248a&l=12706
> We see this build failure because a job couldn't be found:
> {code}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error while waiting for job to be initialized
>       at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:319)
>       at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1061)
>       at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:958)
>       at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
>       at org.apache.flink.test.recovery.JobManagerHAProcessFailureRecoveryITCase.testJobManagerFailure(JobManagerHAProcessFailureRecoveryITCase.java:235)
>       at org.apache.flink.test.recovery.JobManagerHAProcessFailureRecoveryITCase$4.run(JobManagerHAProcessFailureRecoveryITCase.java:336)
> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error while waiting for job to be initialized
>       at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>       at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>       at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
>       ... 4 more
> Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
>       at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
>       at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
>       at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
>       at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
>       at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:479)
>       at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
>       at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
>       at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
>       at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
>       at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.NotFoundException: Job 865dcd87f4828dbeb3d93eb52e2636b1 not found
>       at org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:99)
>       at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
>       at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
>       at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>       at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>       at org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.lambda$getExecutionGraphInternal$0(DefaultExecutionGraphCache.java:109)
>       at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>       at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>       at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>       at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>       at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:252)
>       at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>       at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>       at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>       at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>       at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1387)
>       at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$1(ClassLoadingUtils.java:93)
>       at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>       at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
>       at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>       at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>       at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>       at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>       at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
>       at akka.dispatch.OnComplete.internal(Future.scala:299)
>       at akka.dispatch.OnComplete.internal(Future.scala:297)
>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
>       at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
>       at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
>       at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
>       at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
>       at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>       at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
>       at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:25)
>       at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
>       at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
>       at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
>       at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
>       at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
>       at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
>       at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>       at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
>       at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
>       at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
>       at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
>       at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
>       at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
>       at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (865dcd87f4828dbeb3d93eb52e2636b1)
>       at org.apache.flink.runtime.dispatcher.Dispatcher.requestExecutionGraphInfo(Dispatcher.java:840)
>       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
>       at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
>       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>       at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>       at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>       at akka.actor.Actor.aroundReceive(Actor.scala:537)
>       at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>       ... 5 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
