XComp opened a new pull request, #24252:
URL: https://github.com/apache/flink/pull/24252

   ## What is the purpose of the change
   
   There was a test failure in 
[this build](https://github.com/XComp/flink/actions/runs/7745486791/job/21121947504#step:14:7309)
 which was testing FLINK-34333 (the 1.18 backport of FLINK-34007). 
`KubernetesLeaderElectorITCase.testMultipleKubernetesLeaderElectors` failed 
with the following error:
   ```
   18:38:52,290 [KubernetesLeaderElector-ExecutorService-thread-1] ERROR io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] - Exception occurred while releasing lock 'ConfigMapLock: default - kubernetesleaderelectoritcase-configmap-2e3f2dee-c41e-4f6f-830e-426018db34a7 (406ed8e6-5cc6-4777-81b3-2ca706b83d72)' on cancel
   io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [ConfigMap]  with name: [kubernetesleaderelectoritcase-configmap-2e3f2dee-c41e-4f6f-830e-426018db34a7]  in namespace: [default]  failed.
           at 
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:159)
 ~[kubernetes-client-api-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.dsl.internal.BaseOperation.requireFromServer(BaseOperation.java:194)
 ~[kubernetes-client-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.dsl.internal.BaseOperation.get(BaseOperation.java:148)
 ~[kubernetes-client-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.dsl.internal.BaseOperation.get(BaseOperation.java:97)
 ~[kubernetes-client-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ResourceLock.get(ResourceLock.java:49)
 ~[kubernetes-client-api-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.release(LeaderElector.java:148)
 ~[kubernetes-client-api-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.stopLeading(LeaderElector.java:126)
 ~[kubernetes-client-api-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$null$1(LeaderElector.java:96)
 ~[kubernetes-client-api-6.9.2.jar:?]
           at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 ~[?:1.8.0_402]
           at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
 ~[?:1.8.0_402]
           at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
 ~[?:1.8.0_402]
           at 
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$start$2(LeaderElector.java:95)
 ~[kubernetes-client-api-6.9.2.jar:?]
           at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 ~[?:1.8.0_402]
           at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 ~[?:1.8.0_402]
           at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) 
~[?:1.8.0_402]
           at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) 
~[?:1.8.0_402]
           at 
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$acquire$4(LeaderElector.java:173)
 ~[kubernetes-client-api-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$loop$8(LeaderElector.java:282)
 ~[kubernetes-client-api-6.9.2.jar:?]
           at 
java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
 [?:1.8.0_402]
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_402]
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_402]
           at java.lang.Thread.run(Thread.java:750) [?:1.8.0_402]
   Caused by: java.io.InterruptedIOException
           at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:494)
 ~[kubernetes-client-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:524)
 ~[kubernetes-client-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleGet(OperationSupport.java:467)
 ~[kubernetes-client-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleGet(BaseOperation.java:791)
 ~[kubernetes-client-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.dsl.internal.BaseOperation.requireFromServer(BaseOperation.java:192)
 ~[kubernetes-client-6.9.2.jar:?]
           ... 20 more
   Caused by: java.lang.InterruptedException
           at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347) 
~[?:1.8.0_402]
           at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) 
~[?:1.8.0_402]
           at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:491)
 ~[kubernetes-client-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:524)
 ~[kubernetes-client-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleGet(OperationSupport.java:467)
 ~[kubernetes-client-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleGet(BaseOperation.java:791)
 ~[kubernetes-client-6.9.2.jar:?]
           at 
io.fabric8.kubernetes.client.dsl.internal.BaseOperation.requireFromServer(BaseOperation.java:192)
 ~[kubernetes-client-6.9.2.jar:?]
           ... 20 more
   18:38:52,382 [ForkJoinPool-1-worker-1] ERROR 
org.apache.flink.util.TestLoggerExtension                    [] - 
   
--------------------------------------------------------------------------------
   Test 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElectorITCase.testMultipleKubernetesLeaderElectors[testMultipleKubernetesLeaderElectors()]
 failed with:
   java.lang.IllegalStateException: All tasks that handle the leadership 
revocation should have been executed.
           at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
           at 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector.stop(KubernetesLeaderElector.java:152)
           at 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElectorITCase.testMultipleKubernetesLeaderElectors(KubernetesLeaderElectorITCase.java:124)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
           at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
           at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
           at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
           at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
           at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
           at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
           at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
           at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
           at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
           at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
           at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
           at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
           at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
           at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
           at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
           at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
           at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
           at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
           at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
           at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
           at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
           at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
           at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
           at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
           at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
           at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
           at 
org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService$ExclusiveTask.compute(ForkJoinPoolHierarchicalTestExecutorService.java:185)
           at 
org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService.executeNonConcurrentTasks(ForkJoinPoolHierarchicalTestExecutorService.java:155)
           at 
org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService.invokeAll(ForkJoinPoolHierarchicalTestExecutorService.java:135)
           at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
           at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
           at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
           at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
           at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
           at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
           at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
           at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
           at 
org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService$ExclusiveTask.compute(ForkJoinPoolHierarchicalTestExecutorService.java:185)
           at 
org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService.invokeAll(ForkJoinPoolHierarchicalTestExecutorService.java:129)
           at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
           at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
           at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
           at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
           at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
           at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
           at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
           at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
           at 
org.junit.platform.engine.support.hierarchical.ForkJoinPoolHierarchicalTestExecutorService$ExclusiveTask.compute(ForkJoinPoolHierarchicalTestExecutorService.java:185)
           at 
java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
           at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
           at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
           at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
           at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
   ```
   
   My reasoning here: the `shutdownNow` call on the `executorService` looks 
too aggressive because it tries to interrupt the worker thread. If the 
interruption happens while the `LeaderElector#stopLeading` call is still being 
processed, the leadership loss event might never be sent to the client. The 
Precondition check that follows, which expects that no revocation tasks are 
left, then seems to fail the test. That failure results in the interruption of 
the HttpResponse related to releasing the `ConfigMapLock` (that's the 
`InterruptedIOException` we see, which happens in a separate 
`CachedSingleThreadScheduler` within the internally used HttpClient).
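
To illustrate the reasoning above, here is a minimal sketch using only plain JDK executors. It is not the Flink or fabric8 code: the class `ShutdownSketch`, the method `revocationHandled`, and the 200 ms sleep (standing in for the blocking call that releases the lock) are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownSketch {

    /** Returns whether the simulated revocation task ran to completion. */
    public static boolean revocationHandled(boolean graceful) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean handled = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        executor.execute(() -> {
            started.countDown();
            try {
                // Hypothetical stand-in for the blocking call that releases the lock.
                Thread.sleep(200);
                handled.set(true);
            } catch (InterruptedException e) {
                // shutdownNow() interrupts the worker, so the task never completes.
                Thread.currentThread().interrupt();
            }
        });

        try {
            started.await(); // make sure the task is running before shutting down
            if (graceful) {
                executor.shutdown();     // lets the running task finish
            } else {
                executor.shutdownNow();  // interrupts the running task
            }
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("shutdownNow: handled=" + revocationHandled(false));
        System.out.println("graceful:    handled=" + revocationHandled(true));
    }
}
```

With `shutdownNow`, the in-flight task is interrupted and the simulated revocation is never recorded. With `shutdown` followed by `awaitTermination`, the task gets the chance to finish first.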
   
   ## Brief change log
   
   * Replace `#shutdownNow` with `ExecutorUtils#shutdownGracefully`
   * Make sure that only the current leadership event is handled by 
introducing a state in the `KubernetesLeaderElector`. All subsequent events (if 
any appear) should be ignored.
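
The second item can be pictured with the following sketch. It is a hypothetical guard, not the actual change in `KubernetesLeaderElector`: the class `SingleRevocationSketch` and its methods are invented for illustration, and it assumes that a single boolean state is enough to tell the first revocation event from later ones.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleRevocationSketch {

    // State flag: flips to true once the first revocation event has been handled.
    private final AtomicBoolean revocationHandled = new AtomicBoolean(false);
    private final Runnable onRevoked;

    public SingleRevocationSketch(Runnable onRevoked) {
        this.onRevoked = onRevoked;
    }

    /** Runs the handler for the first event only; returns whether this call ran it. */
    public boolean notifyLeadershipLost() {
        if (revocationHandled.compareAndSet(false, true)) {
            onRevoked.run();
            return true;
        }
        return false; // a later event: ignored
    }

    /** Fires the given number of events and returns how many were actually handled. */
    public static int countHandled(int events) {
        AtomicInteger handled = new AtomicInteger();
        SingleRevocationSketch elector = new SingleRevocationSketch(handled::incrementAndGet);
        for (int i = 0; i < events; i++) {
            elector.notifyLeadershipLost();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println(countHandled(3)); // prints 1
    }
}
```

Using `compareAndSet` keeps the guard safe even if the callback is invoked from more than one thread.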
   
   ## Verifying this change
   
   Unfortunately, I couldn't come up with a test scenario that controls the 
`#stopLeading` callback within the `LeaderElector`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable

