Re: [Discuss] FLIP-407: Improve Flink Client performance in interactive scenarios

2024-01-06 Thread xiangyu feng
Hi all,

Thanks for the comments.

If there are no further comments, we will open the voting thread next week.

Regards,
Xiangyu

Zhanghao Chen wrote on Wed, Jan 3, 2024 at 16:46:

> Thanks for driving this effort to improve the interactive user experience
> of Flink. The proposal overall looks good to me.
>
> Best,
> Zhanghao Chen
> 
> From: xiangyu feng 
> Sent: Tuesday, December 26, 2023 16:51
> To: dev@flink.apache.org 
> Subject: [Discuss] FLIP-407: Improve Flink Client performance in
> interactive scenarios
>
> Hi devs,
>
> I'm opening this thread to discuss FLIP-407: Improve Flink Client
> performance in interactive scenarios. The POC test results and design doc
> can be found at: FLIP-407
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-407%3A+Improve+Flink+Client+performance+when+interacting+with+dedicated+Flink+Session+Clusters>.
>
> Currently, the Flink Client is mainly designed for one-time interaction with
> the Flink Cluster. All the resources (HTTP connections, threads, HA
> services) and instances (ClusterDescriptor, ClusterClient, RestClient) are
> created and recycled for each interaction. This works well when users do
> not need to interact frequently with the Flink Cluster, and it also saves
> resources, since they are released immediately after each use.
>
> However, in OLAP or StreamingWarehouse scenarios, users might submit
> interactive jobs to a dedicated Flink Session Cluster very often. In this
> case, we find that short queries that finish in less than 1s in the Flink
> Cluster still have an E2E latency greater than 2s. Hence, we propose this
> FLIP to improve the Flink Client performance in this scenario. This could
> also improve the user experience when using session debug mode.
>
> The major change in this FLIP is a newly introduced option,
> *'execution.interactive-client'*. When this option is enabled, the Flink
> Client will reuse all the necessary resources to improve interactive
> performance, including HA services, HTTP connections, threads, and all
> kinds of instances related to a long-running Flink Cluster. The default
> value of this option will be false, in which case the Flink Client behaves
> as before.
>
> Also, this FLIP proposes a configurable RetryStrategy for fetching results
> from the Flink Cluster on the client side. In interactive scenarios, this
> can save more than 15% of TM CPU usage without performance degradation.
>
> Looking forward to your feedback, thanks.
>
> Best regards,
> Xiangyu
>

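To make the retry idea in the quoted proposal more concrete, here is a minimal sketch of what a configurable retry delay for result fetching could look like. It assumes an exponential backoff with a cap, which is only one possible strategy; the class and method names (ResultFetchBackoff, delayFor, fetchWithBackoff) are hypothetical and are not Flink's RetryStrategy API or the FLIP's actual design.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical illustration only; not part of Flink. */
public class ResultFetchBackoff {

    /** Delay before retrying after the given 0-based attempt: initial * 2^attempt, capped at max. */
    static Duration delayFor(int attempt, Duration initial, Duration max) {
        long millis = initial.toMillis();
        // Stop doubling once the cap is reached, which also avoids overflow.
        for (int i = 0; i < attempt && millis < max.toMillis(); i++) {
            millis *= 2;
        }
        return Duration.ofMillis(Math.min(millis, max.toMillis()));
    }

    /**
     * Calls fetch until it returns a value or maxAttempts is exhausted,
     * sleeping between attempts so the cluster is not polled in a tight loop.
     */
    static <T> Optional<T> fetchWithBackoff(
            Supplier<Optional<T>> fetch, int maxAttempts, Duration initial, Duration max) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Optional<T> result = fetch.get();
            if (result.isPresent()) {
                return result;
            }
            if (attempt + 1 < maxAttempts) {
                try {
                    Thread.sleep(delayFor(attempt, initial, max).toMillis());
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag and give up early.
                    Thread.currentThread().interrupt();
                    return Optional.empty();
                }
            }
        }
        return Optional.empty();
    }
}
```

With a short initial delay, fast queries are still picked up quickly, while the cap bounds how rarely a long-running query is polled. Whether this matches the trade-off measured in the FLIP is not something the thread states.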

[jira] [Created] (FLINK-34008) Taskmanager blocked with deserialization when starting

2024-01-06 Thread chenyuzhi (Jira)
chenyuzhi created FLINK-34008:
-

         Summary: Taskmanager blocked with deserialization when starting
             Key: FLINK-34008
             URL: https://issues.apache.org/jira/browse/FLINK-34008
         Project: Flink
      Issue Type: Bug
      Components: Runtime / Task
Affects Versions: 1.15.2
        Reporter: chenyuzhi


When starting a TaskManager in a Kubernetes cluster, some tasks block during
deserialization.

However, the JobManager does not consider the TaskManager heartbeat to have
timed out, so the entire job stays blocked and cannot be checkpointed.

 

Thread dump (for the TaskManager):

{code:java}
2024-01-06 11:37:53
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.202-b08 mixed mode):

"flink-akka.actor.default-dispatcher-19" #339 prio=5 os_prio=0 tid=0x7f9ca84d6000 nid=0x5c8 waiting on condition [0x7f9c831fd000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0xd7d7d2d0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-18" #338 prio=5 os_prio=0 tid=0x7f9c8c144000 nid=0x5c7 waiting on condition [0x7f9c8857d000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0xd7d7d2d0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

   Locked ownable synchronizers:
        - None

"Attach Listener" #334 daemon prio=9 os_prio=0 tid=0x7f9cbae06000 nid=0x59e waiting on condition [0x]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"OkHttp WebSocket https://k596.elk.x.netease.com:6443/..." #323 prio=5 os_prio=0 tid=0x7f9ca9611800 nid=0x551 waiting on condition [0x7f9c837fc000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0xe7101010> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-taskexecutor-io-thread-3" #316 daemon prio=5 os_prio=0 tid=0x7f9c88584800 nid=0x53c waiting on condition [0x7f9cb0ffc000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0xd8027900> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-taskexecutor-io-thread-2" #312 daemon prio=5 os_prio=0 tid=0x7f9c9e00a800 nid=0x52d waiting on condition [0x7f9c9dffc000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0xd8027900> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at