[jira] [Created] (FLINK-35123) Flink Kubernetes Operator should not do deleteHAData

2024-04-16 Thread Fei Feng (Jira)
Fei Feng created FLINK-35123:


 Summary: Flink Kubernetes Operator should not do deleteHAData 
 Key: FLINK-35123
 URL: https://issues.apache.org/jira/browse/FLINK-35123
 Project: Flink
  Issue Type: Technical Debt
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.8.0, kubernetes-operator-1.7.0
Reporter: Fei Feng
 Attachments: image-2024-04-16-15-56-33-426.png

We use Flink HA based on ZooKeeper. When a lot of FlinkDeployments were being 
deleted, the operator spent too much time in cleanHaData: the jstack output 
shows that the reconcile threads were hanging on disconnecting from ZooKeeper, 
which made deleting FlinkDeployments slow.

!image-2024-04-16-15-56-33-426.png|width=502,height=263!

 

I don't understand why the Flink Kubernetes Operator needs cleanHaData at all. 
As [~aitozi] commented in PR [FLINK-26336 Call cancel on deletion & clean up 
configmaps as well 
#28|https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815968841]:
{quote}it's a bit of out of scope of the operator responsibility or ability
{quote}
and I totally agree with his point.

I also want to know why we don't call the RestClusterClient#shutDownCluster 
interface instead, which is:

1. more graceful and reasonable (the operator need not care whether the Flink 
app enables HA or not);
2. compatible across Flink versions.
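
For illustration, a minimal sketch (not the operator's actual code; the REST 
address, port, and cluster id below are made up) of shutting the cluster down 
through RestClusterClient, letting the cluster clean up its own HA data on 
termination:

{code:java}
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;

public class ShutdownClusterSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("rest.address", "my-flink-jm"); // hypothetical JM service
        conf.setInteger("rest.port", 8081);
        try (RestClusterClient<String> client =
                new RestClusterClient<>(conf, "my-cluster")) {
            // Asks the JobManager to shut itself down; a cluster that owns its
            // HA data can then clean it up on termination, regardless of
            // whether HA is backed by ZooKeeper or Kubernetes.
            client.shutDownCluster();
        }
    }
}
{code}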


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34728) operator does not need to upload and download the jar when deploying session job

2024-03-19 Thread Fei Feng (Jira)
Fei Feng created FLINK-34728:


 Summary: operator does not need to upload and download the jar 
when deploying session job
 Key: FLINK-34728
 URL: https://issues.apache.org/jira/browse/FLINK-34728
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes, Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0, kubernetes-operator-1.6.0, 
kubernetes-operator-1.5.0
Reporter: Fei Feng


Reading the source code of a session job's first reconciliation in the session 
mode of the Flink Kubernetes Operator, a clear single point of bottleneck can 
be identified. When submitting a session job, the operator first has to 
[download the job jar from the 
jarURL|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L827]
 to the local storage of the operator pod, then [upload the jar to the job 
manager through the `/jars/upload` rest api 
|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L842],
 and finally call the `/jars/:jarid/run` rest api to launch the job.
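
For reference, a minimal sketch of the final step of this existing flow (host, 
port, jar id, and entry class below are illustrative), calling the 
`/jars/:jarid/run` endpoint once the jar has been uploaded:

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JarRunSketch {
    public static void main(String[] args) throws Exception {
        // Jar id as returned by a previous POST to /jars/upload (illustrative value).
        String jarId = "d9a9e1c7-1234_job.jar";
        HttpRequest run = HttpRequest.newBuilder()
                .uri(URI.create("http://my-session-jm:8081/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"entryClass\": \"com.example.MyJob\", \"parallelism\": 2}"))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(run, HttpResponse.BodyHandlers.ofString());
        // On success the response body contains the new job id.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
{code}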

In this process the operator must first download the jar and then upload it 
again. When many jobs are submitted to session clusters simultaneously, the 
operator can become a single point of bottleneck, limited by the network 
traffic or other resource constraints of the operator pod.

We can modify the job submission process in session mode: the JobManager could 
provide a `/jars/run` rest api that downloads the job jar itself, so the 
operator only needs to send a single rest request to submit the job, without 
downloading and uploading the job jar. In this way the submission pressure on 
the operator is distributed across the JobManagers.
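
A hypothetical sketch of the proposed single-request submission. Note that a 
`/jars/run` endpoint taking a jar URL does not exist in Flink's REST API 
today; the endpoint and the `jarUrl` field below are made up to illustrate 
exactly what this ticket proposes:

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SelfDownloadSubmitSketch {
    public static void main(String[] args) throws Exception {
        // Proposed payload: the JobManager downloads the jar itself instead of
        // receiving it from the operator ("jarUrl" is a made-up field name).
        String body = "{\"jarUrl\": \"https://artifacts.example.com/job.jar\","
                + " \"entryClass\": \"com.example.MyJob\"}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://my-session-jm:8081/jars/run")) // proposed endpoint
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
{code}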



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34726) Flink Kubernetes Operator has some room for optimizing performance.

2024-03-18 Thread Fei Feng (Jira)
Fei Feng created FLINK-34726:


 Summary: Flink Kubernetes Operator has some room for optimizing 
performance.
 Key: FLINK-34726
 URL: https://issues.apache.org/jira/browse/FLINK-34726
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0, kubernetes-operator-1.6.0, 
kubernetes-operator-1.5.0
Reporter: Fei Feng
 Attachments: operator_no_submit_no_kill.flamegraph.html

When there is a huge number of FlinkDeployments and FlinkSessionJobs in a 
Kubernetes cluster, there is a significant delay between an event being 
submitted to the reconcile thread pool and that event being processed.

This is our test: we gave the operator ample resources (CPU: 10 cores, memory: 
20 GB, reconcile thread pool size 200) and first deployed a large batch of 
jobs (one FlinkDeployment and one FlinkSessionJob per job), then ran 
submit/delete tests. We found that:
1. it took about 2 minutes between creating new FlinkDeployment and 
FlinkSessionJob CRs in Kubernetes and the Flink job being submitted to the 
JobManager;
2. it took about 1 minute between deleting a FlinkDeployment and 
FlinkSessionJob CR and the Flink job and session cluster being cleaned up.

 

I used async-profiler to capture a flame graph while there was a huge number 
of FlinkDeployments and FlinkSessionJobs, and found two obvious areas for 
optimization:

1. For FlinkDeployment: in the observe step we call 
AbstractFlinkService.getClusterInfo/listJobs/getTaskManagerInfo, and every 
call creates a RestClusterClient, sends the requests, and closes it again. I 
think we should reuse the RestClusterClient as much as possible to avoid 
frequently creating objects and to reduce GC pressure (see the first sketch 
after point 2).

2. For FlinkSessionJob (this issue is more obvious): in the whole reconcile 
loop we call getSecondaryResource 5 times to get the FlinkDeployment resource 
info. Based on my current understanding of the Flink Operator, I think we do 
not need to call it 5 times in a single reconcile loop; calling it once is 
enough (see the second sketch below). If so, we could save 30% CPU usage 
(every getSecondaryResource call costs 6% CPU).
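
For the first point, a minimal sketch (illustrative names, not the operator's 
actual code) of caching one RestClusterClient per session cluster and reusing 
it across calls:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;

public class ClientCacheSketch {
    // One REST client per session cluster, created lazily and reused across
    // getClusterInfo/listJobs/getTaskManagerInfo calls.
    private final Map<String, RestClusterClient<String>> clients = new ConcurrentHashMap<>();

    RestClusterClient<String> clientFor(String clusterId, Configuration conf) {
        return clients.computeIfAbsent(clusterId, id -> {
            try {
                return new RestClusterClient<>(conf, id);
            } catch (Exception e) {
                throw new IllegalStateException("could not create REST client", e);
            }
        });
    }

    // Close once, e.g. when the FlinkDeployment is deleted, instead of after
    // every request.
    void evict(String clusterId) {
        RestClusterClient<String> client = clients.remove(clusterId);
        if (client != null) {
            try {
                client.close();
            } catch (Exception e) {
                // best-effort cleanup in this sketch
            }
        }
    }
}
{code}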
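
For the second point, a minimal sketch of a JOSDK-style reconciler (the 
FlinkDeployment/FlinkSessionJob import paths vary by operator version, and the 
private helpers are placeholders) that fetches the secondary resource once and 
threads it through the loop:

{code:java}
import java.util.Optional;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;

public class SessionJobReconcileSketch {

    public UpdateControl<FlinkSessionJob> reconcile(
            FlinkSessionJob job, Context<FlinkSessionJob> ctx) {
        // Fetch the owning FlinkDeployment from the informer cache once...
        Optional<FlinkDeployment> session = ctx.getSecondaryResource(FlinkDeployment.class);
        // ...and pass it to every step, instead of each step calling
        // getSecondaryResource again (5 calls per loop today).
        observe(job, session);
        reconcileSpec(job, session);
        return UpdateControl.noUpdate();
    }

    private void observe(FlinkSessionJob job, Optional<FlinkDeployment> session) {
        // placeholder for the observe step
    }

    private void reconcileSpec(FlinkSessionJob job, Optional<FlinkDeployment> session) {
        // placeholder for the spec reconciliation step
    }
}
{code}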

[^operator_no_submit_no_kill.flamegraph.html]

I hope we can discuss solutions to address this problem together. I'm very 
willing to optimize and resolve this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work

2024-03-03 Thread Fei Feng (Jira)
Fei Feng created FLINK-34566:


 Summary: Flink Kubernetes Operator reconciliation parallelism 
setting not work
 Key: FLINK-34566
 URL: https://issues.apache.org/jira/browse/FLINK-34566
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0
Reporter: Fei Feng
 Attachments: image-2024-03-04-10-58-37-679.png, 
image-2024-03-04-11-17-22-877.png

After upgrading JOSDK from version 4.3.0 to version 4.4.2 in FLINK-33005, we 
can no longer enlarge the reconciliation parallelism: the maximum effective 
reconciliation parallelism is 10. This results in FlinkDeployment and 
FlinkSessionJob reconciliation delays of about 10-20 seconds where we have 
large-scale Flink session clusters and Flink jobs.
 

After investigating and validating, I found the reason: the logic for 
reconciliation thread pool creation in JOSDK changed significantly between 
these two versions.

v4.3.0:
The reconciliation thread pool was created as a FixedThreadPool 
(maximumPoolSize same as corePoolSize), so we passed the reconciliation 
parallelism and got a thread pool that matched our expectations.


!image-2024-03-04-10-58-37-679.png|width=628,height=115!

[https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198]

 

but in v4.4.2:

the reconciliation thread pool is created as a custom executor for which we 
can pass a corePoolSize and a maximumPoolSize. The problem is that we only set 
the maximumPoolSize of the thread pool, while the corePoolSize defaults to 10. 
Since the executor's work queue is unbounded, a ThreadPoolExecutor never grows 
beyond its corePoolSize, so the effective pool size is only 10 and the 
majority of events sit in the workQueue for a while.

!image-2024-03-04-11-17-22-877.png|width=594,height=117!

https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37
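
The behavior is standard ThreadPoolExecutor semantics, illustrated by this 
minimal standalone sketch (plain JDK, not JOSDK code): with an unbounded work 
queue, the pool never grows past corePoolSize, so setting only maximumPoolSize 
has no effect.

{code:java}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizeSketch {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                10,                           // corePoolSize (the JOSDK default of 10)
                200,                          // maximumPoolSize (what the operator sets)
                1, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>()); // unbounded queue: never "full"
        for (int i = 0; i < 1000; i++) {
            pool.execute(() -> {
                try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
            });
        }
        Thread.sleep(500);
        // Prints 10: extra tasks wait in the queue instead of creating new
        // threads, because the pool only grows past corePoolSize when the
        // queue rejects a task, and an unbounded queue never does.
        System.out.println("active threads: " + pool.getPoolSize());
        pool.shutdownNow();
    }
}
{code}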

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31482) support count jobmanager-failed failover times

2023-03-15 Thread Fei Feng (Jira)
Fei Feng created FLINK-31482:


 Summary: support count jobmanager-failed failover times
 Key: FLINK-31482
 URL: https://issues.apache.org/jira/browse/FLINK-31482
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / Metrics
Affects Versions: 1.16.1
Reporter: Fei Feng


We have a metric `numRestarts` which indicates how many times a job has failed 
over, but we don't have a metric indicating that the job was recovered from HA 
(high availability).

There are two problems:

1. when a JobManager process crashes, we have no way of knowing from the 
metric system that the JobManager crashed and the job was recovered;

2. when a new JobManager becomes leader, `numRestarts` starts from zero again, 
which sometimes misleads our users. Most users consider a failover caused by a 
JM failure and one caused by a job failure to be the same; the effect, at 
least, is the same.
 
I suggest we can:
1. add a new metric that indicates how many times the job was recovered from 
HA (a sketch follows below);
2. make the metric `numRestarts` also count the recoveries from HA.
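
A hypothetical sketch of suggestion 1 using Flink's standard metric API; the 
metric name `numRecoveriesFromHa` and the surrounding class are made up for 
illustration, not an existing Flink metric:

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

class HaRecoveryMetrics {
    private final Counter numRecoveriesFromHa;

    HaRecoveryMetrics(MetricGroup jobManagerMetricGroup) {
        // Registered once when the (new) leader JobManager starts.
        this.numRecoveriesFromHa = jobManagerMetricGroup.counter("numRecoveriesFromHa");
    }

    void onJobRecoveredFromHa() {
        // Incremented each time a job is restored from HA state after a
        // leader change, so JM-failure failovers become visible.
        numRecoveriesFromHa.inc();
    }
}
{code}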
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-24689) Add log file's time info in loglist

2021-10-28 Thread Fei Feng (Jira)
Fei Feng created FLINK-24689:


 Summary: Add log file's time info in loglist
 Key: FLINK-24689
 URL: https://issues.apache.org/jira/browse/FLINK-24689
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.14.0, 1.12.2
Reporter: Fei Feng
 Fix For: 1.14.0


The Flink web UI can show all log files in the log dir, but it only shows the 
file name and file size now. When I am searching for the cause of a problem, I 
don't know which log file I should open (for example: taskmanager.log is 
rotated and multiple history log files are retained).

If there were time info (for example mtime) in the log list view, it would 
guide me to choose the correct log file and analyze the problem exactly and 
quickly (see the sketch below); otherwise I have to open every log file, which 
is inefficient.
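
For illustration, a minimal JDK-only sketch (the log directory path is 
illustrative) showing that the mtime is already available when listing the log 
directory and could be shown next to name and size:

{code:java}
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;

public class LogListSketch {
    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get("/opt/flink/log"); // illustrative log dir
        try (DirectoryStream<Path> files = Files.newDirectoryStream(logDir)) {
            for (Path file : files) {
                BasicFileAttributes attrs =
                        Files.readAttributes(file, BasicFileAttributes.class);
                // The mtime is free to read here and would disambiguate
                // rotated files like taskmanager.log.1, taskmanager.log.2, ...
                System.out.printf("%s  %d bytes  mtime=%s%n",
                        file.getFileName(), attrs.size(), attrs.lastModifiedTime());
            }
        }
    }
}
{code}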

If no one has made this improvement yet, I will be happy to fix it.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-10735) flink on yarn close container exception

2018-10-31 Thread Fei Feng (JIRA)
Fei Feng created FLINK-10735:


 Summary: flink on yarn close container exception
 Key: FLINK-10735
 URL: https://issues.apache.org/jira/browse/FLINK-10735
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.6.2
Reporter: Fei Feng


Flink on YARN in detached mode: when cancelling a Flink job, the YARN 
resources are released very slowly.

Log:

18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Job 
32F01A0FC50EFE8F4794AD0C45678EC4: xxx switched from state RUNNING to CANCELLING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (1/3) 
(0807b5f291f897ac4545dbfdb8ec3448) switched from RUNNING to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (2/3) 
(a56a70eb6807dacf18fbf272ee6160e2) switched from RUNNING to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (3/3) 
(a2eca3dc06087dfdaf3fd0200e545cc4) switched from RUNNING to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: window: 
(TumblingGroupWindow('w$, 'ROWTIME, 360.millis)), select: 
(SUM(SS_WHOLESALE_COST) AS EXPR$1, SUM(SS_SALES_PRICE) AS EXPR$2, 
SUM(SS_NET_PAID) AS EXPR$3, SUM(SS_NET_PROFIT) AS EXPR$4, start('w$) AS 
w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime) -> where: (>(EXPR$4, 1000)), select: (CAST(w$start) AS WSTART, 
EXPR$1, EXPR$2, EXPR$3, EXPR$4) -> to: Row (1/1) 
(9fa3592c74eda97124a033b6afea6c87) switched from RUNNING to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
RS_NET_PROFIT) (1/3) (6d4f413ace1206714566b610e5bf47b6) switched from RUNNING 
to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
RS_NET_PROFIT) (2/3) (8871efc7c19abae7f833e02aa1d2107d) switched from RUNNING 
to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
RS_NET_PROFIT) (3/3) (fcaf0b9247d60a31e3381d70871d929f) switched from RUNNING 
to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (3/3)