XComp opened a new pull request, #24252: URL: https://github.com/apache/flink/pull/24252
## What is the purpose of the change

There was a test failure in [this build](https://github.com/XComp/flink/actions/runs/7745486791/job/21121947504#step:14:7309) which was testing FLINK-34333 (the 1.18 backport of FLINK-34007). `KubernetesLeaderElectorITCase.testMultipleKubernetesLeaderElectors` failed with the following error:

```
18:38:52,290 [KubernetesLeaderElector-ExecutorService-thread-1] ERROR io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] - Exception occurred while releasing lock 'ConfigMapLock: default - kubernetesleaderelectoritcase-configmap-2e3f2dee-c41e-4f6f-830e-426018db34a7 (406ed8e6-5cc6-4777-81b3-2ca706b83d72)' on cancel
io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [ConfigMap] with name: [kubernetesleaderelectoritcase-configmap-2e3f2dee-c41e-4f6f-830e-426018db34a7] in namespace: [default] failed.
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:159) ~[kubernetes-client-api-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.requireFromServer(BaseOperation.java:194) ~[kubernetes-client-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.get(BaseOperation.java:148) ~[kubernetes-client-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.get(BaseOperation.java:97) ~[kubernetes-client-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ResourceLock.get(ResourceLock.java:49) ~[kubernetes-client-api-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.release(LeaderElector.java:148) ~[kubernetes-client-api-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.stopLeading(LeaderElector.java:126) ~[kubernetes-client-api-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$null$1(LeaderElector.java:96) ~[kubernetes-client-api-6.9.2.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_402]
    at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792) ~[?:1.8.0_402]
    at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153) ~[?:1.8.0_402]
    at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$start$2(LeaderElector.java:95) ~[kubernetes-client-api-6.9.2.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_402]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_402]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_402]
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) ~[?:1.8.0_402]
    at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$acquire$4(LeaderElector.java:173) ~[kubernetes-client-api-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$loop$8(LeaderElector.java:282) ~[kubernetes-client-api-6.9.2.jar:?]
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) [?:1.8.0_402]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_402]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_402]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_402]
Caused by: java.io.InterruptedIOException
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:494) ~[kubernetes-client-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:524) ~[kubernetes-client-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleGet(OperationSupport.java:467) ~[kubernetes-client-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleGet(BaseOperation.java:791) ~[kubernetes-client-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.requireFromServer(BaseOperation.java:192) ~[kubernetes-client-6.9.2.jar:?]
    ... 20 more
Caused by: java.lang.InterruptedException
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347) ~[?:1.8.0_402]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_402]
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:491) ~[kubernetes-client-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:524) ~[kubernetes-client-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleGet(OperationSupport.java:467) ~[kubernetes-client-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleGet(BaseOperation.java:791) ~[kubernetes-client-6.9.2.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.requireFromServer(BaseOperation.java:192) ~[kubernetes-client-6.9.2.jar:?]
    ... 20 more
18:38:52,382 [ForkJoinPool-1-worker-1] ERROR org.apache.flink.util.TestLoggerExtension [] -
--------------------------------------------------------------------------------
Test org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElectorITCase.testMultipleKubernetesLeaderElectors[testMultipleKubernetesLeaderElectors()] failed with:
java.lang.IllegalStateException: All tasks that handle the leadership revocation should have been executed.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    at org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector.stop(KubernetesLeaderElector.java:152)
    at org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElectorITCase.testMultipleKubernetesLeaderElectors(KubernetesLeaderElectorITCase.java:124)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService$ExclusiveTask.compute(ForkJoinPoolHierarchicalTestExecutorService.java:185)
    at org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService.executeNonConcurrentTasks(ForkJoinPoolHierarchicalTestExecutorService.java:155)
    at org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService.invokeAll(ForkJoinPoolHierarchicalTestExecutorService.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService$ExclusiveTask.compute(ForkJoinPoolHierarchicalTestExecutorService.java:185)
    at org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService.invokeAll(ForkJoinPoolHierarchicalTestExecutorService.java:129)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService$ExclusiveTask.compute(ForkJoinPoolHierarchicalTestExecutorService.java:185)
    at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
```

My reasoning here: It looks like the `shutdownNow` call on the executorService is too aggressive because it tries to interrupt the thread. If the interruption happens while the `LeaderElector#stopLeading` call is still being processed, the leadership loss event might not be sent to the client. The Precondition check that follows (which expects the list of remaining tasks to always be empty) seems to cause the test to fail, which results in the interruption of the HttpResponse related to releasing the `ConfigMapLock` (that's the `InterruptedIOException` we see, which happens in a separate `CachedSingleThreadScheduler` within the internally used HttpClient).

## Brief change log

* Replace `#shutdownNow` with `ExecutorUtils#shutdownGracefully`
* Make sure that only the current leadership event is properly handled by introducing a state in the `KubernetesLeaderElector`. All subsequent events (if any appear) should be ignored.

## Verifying this change

Unfortunately, I couldn't come up with a test scenario that controls the `#stopLeading` callback within the `LeaderElector`.
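
To illustrate the reasoning about `shutdownNow` being too aggressive, here is a minimal sketch that uses only JDK classes. It is not the code changed in this PR: `ShutdownSketch`, `revocationCompleted`, and the 200 ms sleep (standing in for the blocking `ConfigMapLock` release inside `stopLeading`) are hypothetical names and values chosen for illustration, and the graceful branch only approximates what a graceful-shutdown utility would do.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownSketch {

    /**
     * Runs one slow task on a single-threaded executor, stops the executor
     * while the task is still running, and reports whether the task finished.
     */
    public static boolean revocationCompleted(boolean graceful) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean completed = new AtomicBoolean(false);

        executor.execute(() -> {
            started.countDown();
            try {
                // Hypothetical stand-in for the blocking work done on revocation.
                Thread.sleep(200);
                completed.set(true);
            } catch (InterruptedException e) {
                // The work was cut short; restore the interrupt flag and bail out.
                Thread.currentThread().interrupt();
            }
        });

        // Make sure the task is already running before shutting down.
        started.await();

        if (graceful) {
            // Stop accepting new tasks, but let the running one finish.
            executor.shutdown();
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                // Fall back to interruption only if the timeout is exceeded.
                executor.shutdownNow();
            }
        } else {
            // Interrupts the worker thread immediately.
            executor.shutdownNow();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("shutdownNow completed: " + revocationCompleted(false));
        System.out.println("graceful completed:    " + revocationCompleted(true));
    }
}
```

In this sketch the task interrupted by `shutdownNow` never reaches `completed.set(true)`, while the graceful variant lets it finish before the executor terminates. That mirrors the suspected failure mode, where the interruption arrives while the revocation is still being processed.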
## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable