Re: [Discuss] FLIP-407: Improve Flink Client performance in interactive scenarios
Hi all,

Thanks for the comments. If there are no further comments, we will open the voting thread next week.

Regards,
Xiangyu

On Wed, Jan 3, 2024 at 16:46, Zhanghao Chen wrote:
> Thanks for driving this effort on improving the interactive use experience
> of Flink. The proposal overall looks good to me.
>
> Best,
> Zhanghao Chen
>
> From: xiangyu feng
> Sent: Tuesday, December 26, 2023 16:51
> To: dev@flink.apache.org
> Subject: [Discuss] FLIP-407: Improve Flink Client performance in
> interactive scenarios
>
> Hi devs,
>
> I'm opening this thread to discuss FLIP-407: Improve Flink Client
> performance in interactive scenarios. The POC test results and design doc
> can be found at: FLIP-407
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-407%3A+Improve+Flink+Client+performance+when+interacting+with+dedicated+Flink+Session+Clusters>.
>
> Currently, Flink Client is mainly designed for one-time interaction with
> the Flink Cluster. All the resources (HTTP connections, threads, HA
> services) and instances (ClusterDescriptor, ClusterClient, RestClient) are
> created and recycled for each interaction. This works well when users do
> not need to interact frequently with the Flink Cluster, and it also saves
> resources since they are recycled immediately after each use.
>
> However, in OLAP or StreamingWarehouse scenarios, users might submit
> interactive jobs to a dedicated Flink Session Cluster very often. In this
> case, we find that short queries that finish in less than 1s on the Flink
> Cluster still have an E2E latency greater than 2s. Hence, we propose this
> FLIP to improve Flink Client performance in this scenario. This could
> also improve the user experience in session debug mode.
>
> The major change in this FLIP is a newly introduced option
> *'execution.interactive-client'*.
> When this option is enabled, Flink Client will reuse all the necessary
> resources to improve interactive performance, including HA services, HTTP
> connections, threads and all kinds of instances related to a long-running
> Flink Cluster. The default value of this option will be false, in which
> case Flink Client behaves as before.
>
> This FLIP also proposes a configurable RetryStrategy for when the client
> fetches results from the Flink Cluster. In interactive scenarios, this can
> save more than 15% of TM CPU usage without performance degradation.
>
> Looking forward to your feedback, thanks.
>
> Best regards,
> Xiangyu
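The difference between creating client resources per interaction and reusing them can be illustrated with a plain-Java sketch. `ReusableResource`, `FakeClusterClient` and `ClientReuseDemo` are hypothetical names invented for this example, not Flink classes; the FLIP itself reuses Flink's own instances (ClusterDescriptor, ClusterClient, RestClient, HA services), and the thread does not show how that is wired. The holder creates the expensive client on first use and hands the same instance to every later call until it is closed.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * Hypothetical holder (not a Flink class): creates an expensive client lazily
 * and reuses it for every interaction until close() is called.
 */
final class ReusableResource<T extends AutoCloseable> implements AutoCloseable {
    private final Supplier<T> factory;
    private T instance; // null until first use, then shared

    ReusableResource(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the shared instance, creating it only on the first call. */
    synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    /** Closes the shared instance; a later get() creates a fresh one. */
    @Override
    public synchronized void close() {
        if (instance == null) {
            return;
        }
        T toClose = instance;
        instance = null;
        try {
            toClose.close();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to close shared resource", e);
        }
    }
}

/** Hypothetical stand-in for a client owning HTTP connections, threads, HA services. */
final class FakeClusterClient implements AutoCloseable {
    private boolean closed;

    String submit(String query) {
        if (closed) {
            throw new IllegalStateException("client already closed");
        }
        return "submitted: " + query;
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

class ClientReuseDemo {
    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        ReusableResource<FakeClusterClient> client = new ReusableResource<>(() -> {
            created.incrementAndGet(); // counts how often the expensive setup runs
            return new FakeClusterClient();
        });
        for (int i = 0; i < 3; i++) {
            System.out.println(client.get().submit("SELECT " + i));
        }
        client.close();
        System.out.println("clients created: " + created.get()); // 1, not 3
    }
}
```

Running `ClientReuseDemo` prints three `submitted: ...` lines followed by `clients created: 1`; with per-interaction creation the counter would instead equal the number of queries. The trade-off is that the client's resources stay allocated between interactions instead of being recycled immediately after each use.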
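The thread does not say which policies the proposed RetryStrategy will support. As an assumption for illustration only, the sketch below contrasts a fixed delay with a capped exponential backoff for polling results; `RetryStrategy`, `FixedDelayStrategy`, `ExponentialBackoffStrategy`, `ResultPoller` and `RetryDemo` are hypothetical names, not Flink APIs. The wait is injected as a `LongConsumer` so real code could pass a `Thread.sleep` wrapper while a demo or test records the delays instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/** Hypothetical policy: how long to wait after the given empty poll (0-based). */
interface RetryStrategy {
    long delayMillis(int attempt);
}

/** Waits the same amount of time after every empty poll. */
final class FixedDelayStrategy implements RetryStrategy {
    private final long delayMillis;

    FixedDelayStrategy(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    @Override
    public long delayMillis(int attempt) {
        return delayMillis;
    }
}

/** Doubles the wait after each empty poll, never exceeding maxMillis. */
final class ExponentialBackoffStrategy implements RetryStrategy {
    private final long initialMillis;
    private final long maxMillis;

    ExponentialBackoffStrategy(long initialMillis, long maxMillis) {
        if (initialMillis <= 0 || maxMillis < initialMillis) {
            throw new IllegalArgumentException("need 0 < initialMillis <= maxMillis");
        }
        this.initialMillis = initialMillis;
        this.maxMillis = maxMillis;
    }

    @Override
    public long delayMillis(int attempt) {
        long delay = initialMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            // Jump straight to the cap instead of doubling past it (avoids overflow).
            delay = delay > maxMillis / 2 ? maxMillis : delay * 2;
        }
        return Math.min(delay, maxMillis);
    }
}

/** Hypothetical polling loop driven by a pluggable RetryStrategy. */
final class ResultPoller {
    private ResultPoller() {}

    /** Calls fetch until it yields a value or maxAttempts polls have been made. */
    static <T> Optional<T> poll(Supplier<Optional<T>> fetch, RetryStrategy strategy,
                                int maxAttempts, LongConsumer sleeper) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Optional<T> result = fetch.get();
            if (result.isPresent()) {
                return result;
            }
            if (attempt + 1 < maxAttempts) { // no wait after the final poll
                sleeper.accept(strategy.delayMillis(attempt));
            }
        }
        return Optional.empty();
    }
}

class RetryDemo {
    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated cluster: results become available on the 5th poll.
        Supplier<Optional<String>> fetch =
                () -> ++calls[0] < 5 ? Optional.<String>empty() : Optional.of("rows");
        List<Long> waits = new ArrayList<>(); // records waits instead of sleeping
        Optional<String> result = ResultPoller.poll(
                fetch, new ExponentialBackoffStrategy(10, 50), 10, waits::add);
        System.out.println(result.orElse("none") + " after waits " + waits);
    }
}
```

`RetryDemo` prints `rows after waits [10, 20, 40, 50]`: four empty polls, each followed by a (simulated) longer wait until the 50 ms cap is reached. Making the policy configurable lets interactive workloads trade polling frequency against result latency.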
[jira] [Created] (FLINK-34008) Taskmanager blocked with deserialization when starting
chenyuzhi created FLINK-34008:
---------------------------------

             Summary: Taskmanager blocked with deserialization when starting
                 Key: FLINK-34008
                 URL: https://issues.apache.org/jira/browse/FLINK-34008
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Task
    Affects Versions: 1.15.2
            Reporter: chenyuzhi


When starting a TaskManager in a Kubernetes cluster, some tasks block during deserialization. However, the JobManager does not treat the TaskManager's heartbeat as timed out, so the entire job stays blocked and cannot be checkpointed.

Thread dump (for the TaskManager):
{code:java}
2024-01-06 11:37:53
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.202-b08 mixed mode):

"flink-akka.actor.default-dispatcher-19" #339 prio=5 os_prio=0 tid=0x7f9ca84d6000 nid=0x5c8 waiting on condition [0x7f9c831fd000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0xd7d7d2d0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-18" #338 prio=5 os_prio=0 tid=0x7f9c8c144000 nid=0x5c7 waiting on condition [0x7f9c8857d000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0xd7d7d2d0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

   Locked ownable synchronizers:
        - None

"Attach Listener" #334 daemon prio=9 os_prio=0 tid=0x7f9cbae06000 nid=0x59e waiting on condition [0x]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"OkHttp WebSocket https://k596.elk.x.netease.com:6443/..." #323 prio=5 os_prio=0 tid=0x7f9ca9611800 nid=0x551 waiting on condition [0x7f9c837fc000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0xe7101010> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-taskexecutor-io-thread-3" #316 daemon prio=5 os_prio=0 tid=0x7f9c88584800 nid=0x53c waiting on condition [0x7f9cb0ffc000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0xd8027900> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-taskexecutor-io-thread-2" #312 daemon prio=5 os_prio=0 tid=0x7f9c9e00a800 nid=0x52d waiting on condition [0x7f9c9dffc000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0xd8027900> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at