Re: Unaligned Checkpoint

2022-06-12 Thread Zhanghao Chen
你好,

Unaligned checkpoint 是个底层特性,要使用的话只要设置 Flink 参数 
execution.checkpointing.unaligned = true 就行,在 SQL client 中,可以使用 SET "key" = 
"value" 的语法设置 Flink 参数的值。

Unaligned checkpoint 较之 aligned checkpoint 主要的改变在于

  *   unaligned cp 在输入缓冲区收到第一个 cp barrier 
的时候立即触发快照并直接输出至下游;代价是快照需要记录缓冲区中的数据来保证一致性,产生更多 io 并增大 cp 大小。
  *   aligned cp 在算子收到最后一个 cp barrier 完成 barrier 对齐后才触发快照,barrier 对齐期间较早收到 
barrier 的 input channel 会被阻塞,在反压时阻塞时间会显著增加,导致 cp 速度变慢;好处是 barrier 
对齐的过程使得快照不需要记录缓冲等待队列中的数据就可以保证一致性。

Best,
Zhanghao Chen

From: 小昌同学 
Sent: Saturday, June 11, 2022 17:18
To: user-zh@flink.apache.org 
Subject: Unaligned Checkpoint

大佬们可以说说Unaligned Checkpoint的实现吗  看了不少文档 没有太看懂  我如果想在sql里面实现  这个该怎么设置啊  请大佬们指教


| |
小昌同学
|
|
ccc0606fight...@163.com
|


Re: unsubscribe; 退订

2022-06-12 Thread Zhanghao Chen
你好, 退订请发送任意消息至 user-zh-unsubscr...@flink.apache.org。

Best,

Zhanghao Chen

From: chenshu...@foxmail.com 
Sent: Sunday, June 12, 2022 11:44
To: user-zh 
Subject: unsubscribe; 退订

unsubscribe
退订



chenshu...@foxmail.com


退订

2022-06-12 Thread 方锡锐
退订

退订

2022-06-12 Thread Kaiqiang


发自我的iPhone

Flink k8s HA 手动删除作业deployment导致的异常

2022-06-12 Thread m18814122325
Flink version: 1.15.0

deploy mode: Native k8s application




问题现象:

我以Native 
k8s模式部署了一个基于K8S做HA的Flink任务,当我手动删除了作业的deployment后,发现作业做HA的ConfigMap还存在。并且接下来不加参数-s
 再次启动作业,从启动日志发现其会从上述ConfigMap记录信息中恢复。




kubectl delete deployment flink-bdra-sql-application-job -n  
bdra-dev-flink-standalone




kubectl get configMap -n bdra-dev-flink-standalone




NAME
 DATA   AGE

flink-bdra-sql-application-job--config-map  
2  13m

flink-bdra-sql-application-job-cluster-config-map   
 1  13m







我有以下疑问:

1.基于K8S做HA的Flink任务要想正常,不能手动删除作业deployment,必须通过cancel,stop命令进行停止。基于上面我猜测Flink 
k8s HA是基于ConfigMap之上开发的,其声明周期从K8S角度不能像作业的svc一样带ownerreference。

2.基于k8s做HA的Flink job id皆为。

3.Flink k8s HA 是如何工作的,其中存储了什么信息?我想学习其中相关实现,如果大家有其设计文档或相关资料,希望可以回此邮件告诉我,谢谢。




重启命令(不带-s参数,意味着命令本身不带任何从ck或者savepoint恢复)

flink run-application --target kubernetes-application -c CalculateUv 
-Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p 
-Dkubernetes.container.image=acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20 
-Dkubernetes.namespace=bdra-dev-flink-standalone 
-Dkubernetes.service-account=bdra-dev-flink-standalone-sa 
-Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2 
-Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8 
-Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m 
-Dstate.backend=filesystem 
-Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3
 
-Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3
 
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 -Dhigh-availability.storageDir=file:///opt/flink/log/recovery 
-Ds3.access-key=* -Ds3.secret-key=* 
-Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
 -Dmetrics.reporter.influxdb.scheme=http 
-Dmetrics.reporter.influxdb.host=influxdb -Dmetrics.reporter.influxdb.port=8086 
-Dmetrics.reporter.influxdb.db=flink_metrics 
-Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80 
-Dkubernetes.rest-service.exposed.type=ClusterIP 
-Dkubernetes.config.file=kube_config 
-Dkubernetes.pod-template-file=pod-template.yaml 
local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar




重启后自动从ConfigMap中恢复。

2022-06-10 20:20:52,592 INFO  
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - 
Successfully recovered 1 persisted job graphs.

2022-06-10 20:20:52,654 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/rpc/dispatcher_1 .

2022-06-10 20:20:53,552 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 0 
pods from previous attempts, current attempt id is 1.

2022-06-10 20:20:53,552 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Recovered 0 workers from previous attempt.

2022-06-10 20:20:55,352 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Starting DefaultLeaderElectionService with 
org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@2a1814a5.

2022-06-10 20:20:55,370 INFO  org.apache.flink.client.ClientUtils   
   [] - Starting program (detached: false)

2022-06-10 20:20:55,394 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_2 .

2022-06-10 20:20:55,438 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Initializing job 
'insert-into_default_catalog.default_database.buy_cnt_per_hour' 
().

2022-06-10 20:20:55,477 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Using restart back off time strategy 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, 
backoffTimeMS=1000) for 
insert-into_default_catalog.default_database.buy_cnt_per_hour 
().

2022-06-10 20:20:55,558 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
Recovering checkpoints from 
KubernetesStateHandleStore{configMapName='flink-bdra-sql-application-job--config-map'}.

2022-06-10 20:20:55,572 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
Found 1 checkpoints in 
KubernetesStateHandleStore{configMapName='flink-bdra-sql-application-job--config-map'}.

2022-06-10 20:20:55,572 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [

Re: Flink k8s HA 手动删除作业deployment导致的异常

2022-06-12 Thread Zhanghao Chen
1.基于K8S做HA的Flink任务要想正常,不能手动删除作业deployment,必须通过cancel,stop命令进行停止。基于上面我猜测Flink 
k8s HA是基于ConfigMap之上开发的,其声明周期从K8S角度不能像作业的svc一样带ownerreference。

是的,Flink K8s HA 是基于 ConfigMap 开发的,并且 HA configmap 没有设置 ownerreference,因此如果想在保留 
HA 数据的情况下重启集群直接 delete deployment 就行,重启后会从最新 cp 恢复。

2.基于k8s做HA的Flink job id皆为。

开启 HA 的 Application mode 的 Flink job id 
皆为,与是否使用 K8s HA 无关。job id 是作业的唯一标识符,HA 
服务使用它来命名和寻址和单个作业有关的 HA 资源(如保存的 jobgraph 和 cp)。Application mode 下 jobgraph 在 JM 
生成,不开启 HA 时每次生成 jobgraph 会随机生成一个 job id 作为 job 的 唯一标识符,开启 HA 时需要使用一个固定的 job id 
(一串 0 的 jobid 就是这么来的),否则 JM failover 后重新生成了一个新的不同的 job id,无法和之前的 cp 
相关联,导致作业从全新状态恢复。

3.Flink k8s HA 是如何工作的,其中存储了什么信息?我想学习其中相关实现,如果大家有其设计文档或相关资料,希望可以回此邮件告诉我,谢谢。

可以看下官方的博客文章: 
https://flink.apache.org/2021/02/10/native-k8s-with-ha.html,更多细节可以参阅 JIRA 
设计文档:https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink


Best,
Zhanghao Chen

From: m18814122325 
Sent: Sunday, June 12, 2022 22:45
To: user-zh@flink.apache.org 
Subject: Flink k8s HA 手动删除作业deployment导致的异常

Flink version: 1.15.0

deploy mode: Native k8s application




问题现象:

我以Native 
k8s模式部署了一个基于K8S做HA的Flink任务,当我手动删除了作业的deployment后,发现作业做HA的ConfigMap还存在。并且接下来不加参数-s
 再次启动作业,从启动日志发现其会从上述ConfigMap记录信息中恢复。




kubectl delete deployment flink-bdra-sql-application-job -n  
bdra-dev-flink-standalone




kubectl get configMap -n bdra-dev-flink-standalone




NAME
 DATA   AGE

flink-bdra-sql-application-job--config-map  
2  13m

flink-bdra-sql-application-job-cluster-config-map   
 1  13m







我有以下疑问:

1.基于K8S做HA的Flink任务要想正常,不能手动删除作业deployment,必须通过cancel,stop命令进行停止。基于上面我猜测Flink 
k8s HA是基于ConfigMap之上开发的,其声明周期从K8S角度不能像作业的svc一样带ownerreference。

2.基于k8s做HA的Flink job id皆为。

3.Flink k8s HA 是如何工作的,其中存储了什么信息?我想学习其中相关实现,如果大家有其设计文档或相关资料,希望可以回此邮件告诉我,谢谢。




重启命令(不带-s参数,意味着命令本身不带任何从ck或者savepoint恢复)

flink run-application --target kubernetes-application -c CalculateUv 
-Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p 
-Dkubernetes.container.image=acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20 
-Dkubernetes.namespace=bdra-dev-flink-standalone 
-Dkubernetes.service-account=bdra-dev-flink-standalone-sa 
-Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2 
-Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8 
-Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m 
-Dstate.backend=filesystem 
-Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3
 
-Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3
 
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 -Dhigh-availability.storageDir=file:///opt/flink/log/recovery 
-Ds3.access-key=* -Ds3.secret-key=* 
-Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
 -Dmetrics.reporter.influxdb.scheme=http 
-Dmetrics.reporter.influxdb.host=influxdb -Dmetrics.reporter.influxdb.port=8086 
-Dmetrics.reporter.influxdb.db=flink_metrics 
-Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80 
-Dkubernetes.rest-service.exposed.type=ClusterIP 
-Dkubernetes.config.file=kube_config 
-Dkubernetes.pod-template-file=pod-template.yaml 
local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar




重启后自动从ConfigMap中恢复。

2022-06-10 20:20:52,592 INFO  
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - 
Successfully recovered 1 persisted job graphs.

2022-06-10 20:20:52,654 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/rpc/dispatcher_1 .

2022-06-10 20:20:53,552 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 0 
pods from previous attempts, current attempt id is 1.

2022-06-10 20:20:53,552 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Recovered 0 workers from previous attempt.

2022-06-10 20:20:55,352 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Starting DefaultLeaderElectionService with 
org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@2a1814a5.

2022-06-10 20:20:55,370 INFO  org.apache.flink.client.ClientUtils   
   [] - Starting program (detached: false)

2022-06-10 20:20:55,394 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_2 .

2022-06-10 20:20:55,438 INFO  

Re: Flink k8s HA 手动删除作业deployment导致的异常

2022-06-12 Thread Yang Wang
Zhanghao的回答已经非常全面了,我再补充小点,删除Deployment保留HA ConfigMap是预期内的行为,文档里面有说明[1]
之所以这样设计有两点原因:
(1.) 任务可能会被重启,但使用相同的cluster-id,并且希望从之前的checkpoint恢复
(2.) 单纯的删除ConfigMap会导致存储在DFS(e.g. HDFS、S3、OSS)上面的HA数据泄露

[1].
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up


Best,
Yang

Zhanghao Chen  于2022年6月13日周一 07:53写道:

> 1.基于K8S做HA的Flink任务要想正常,不能手动删除作业deployment,必须通过cancel,stop命令进行停止。基于上面我猜测Flink
> k8s HA是基于ConfigMap之上开发的,其声明周期从K8S角度不能像作业的svc一样带ownerreference。
>
> 是的,Flink K8s HA 是基于 ConfigMap 开发的,并且 HA configmap 没有设置
> ownerreference,因此如果想在保留 HA 数据的情况下重启集群直接 delete deployment 就行,重启后会从最新 cp 恢复。
>
> 2.基于k8s做HA的Flink job id皆为。
>
> 开启 HA 的 Application mode 的 Flink job id
> 皆为,与是否使用 K8s HA 无关。job id 是作业的唯一标识符,HA
> 服务使用它来命名和寻址和单个作业有关的 HA 资源(如保存的 jobgraph 和 cp)。Application mode 下 jobgraph 在
> JM 生成,不开启 HA 时每次生成 jobgraph 会随机生成一个 job id 作为 job 的 唯一标识符,开启 HA 时需要使用一个固定的
> job id (一串 0 的 jobid 就是这么来的),否则 JM failover 后重新生成了一个新的不同的 job id,无法和之前的 cp
> 相关联,导致作业从全新状态恢复。
>
> 3.Flink k8s HA 是如何工作的,其中存储了什么信息?我想学习其中相关实现,如果大家有其设计文档或相关资料,希望可以回此邮件告诉我,谢谢。
>
> 可以看下官方的博客文章: 
> https://flink.apache.org/2021/02/10/native-k8s-with-ha.html,更多细节可以参阅
> JIRA 设计文档:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>
>
> Best,
> Zhanghao Chen
> 
> From: m18814122325 
> Sent: Sunday, June 12, 2022 22:45
> To: user-zh@flink.apache.org 
> Subject: Flink k8s HA 手动删除作业deployment导致的异常
>
> Flink version: 1.15.0
>
> deploy mode: Native k8s application
>
>
>
>
> 问题现象:
>
> 我以Native
> k8s模式部署了一个基于K8S做HA的Flink任务,当我手动删除了作业的deployment后,发现作业做HA的ConfigMap还存在。并且接下来不加参数-s
> 再次启动作业,从启动日志发现其会从上述ConfigMap记录信息中恢复。
>
>
>
>
> kubectl delete deployment flink-bdra-sql-application-job -n
> bdra-dev-flink-standalone
>
>
>
>
> kubectl get configMap -n bdra-dev-flink-standalone
>
>
>
>
> NAME
>DATA   AGE
>
> flink-bdra-sql-application-job--config-map
> 2  13m
>
> flink-bdra-sql-application-job-cluster-config-map
>   1  13m
>
>
>
>
>
>
>
> 我有以下疑问:
>
> 1.基于K8S做HA的Flink任务要想正常,不能手动删除作业deployment,必须通过cancel,stop命令进行停止。基于上面我猜测Flink
> k8s HA是基于ConfigMap之上开发的,其声明周期从K8S角度不能像作业的svc一样带ownerreference。
>
> 2.基于k8s做HA的Flink job id皆为。
>
> 3.Flink k8s HA 是如何工作的,其中存储了什么信息?我想学习其中相关实现,如果大家有其设计文档或相关资料,希望可以回此邮件告诉我,谢谢。
>
>
>
>
> 重启命令(不带-s参数,意味着命令本身不带任何从ck或者savepoint恢复)
>
> flink run-application --target kubernetes-application -c CalculateUv
> -Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p
> -Dkubernetes.container.image=
> acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20
> -Dkubernetes.namespace=bdra-dev-flink-standalone
> -Dkubernetes.service-account=bdra-dev-flink-standalone-sa
> -Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2
> -Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8
> -Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m
> -Dstate.backend=filesystem
> -Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3
> -Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3
> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> -Dhigh-availability.storageDir=file:///opt/flink/log/recovery
> -Ds3.access-key=* -Ds3.secret-key=*
> -Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
> -Dmetrics.reporter.influxdb.scheme=http
> -Dmetrics.reporter.influxdb.host=influxdb
> -Dmetrics.reporter.influxdb.port=8086
> -Dmetrics.reporter.influxdb.db=flink_metrics
> -Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80
> -Dkubernetes.rest-service.exposed.type=ClusterIP
> -Dkubernetes.config.file=kube_config
> -Dkubernetes.pod-template-file=pod-template.yaml
> local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar
>
>
>
>
> 重启后自动从ConfigMap中恢复。
>
> 2022-06-10 20:20:52,592 INFO
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
> [] - Successfully recovered 1 persisted job graphs.
>
> 2022-06-10 20:20:52,654 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting
> RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> at akka://flink/user/rpc/dispatcher_1 .
>
> 2022-06-10 20:20:53,552 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered
> 0 pods from previous attempts, current attempt id is 1.
>
> 2022-06-10 20:20:53,552 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Recovered 0 workers from previous attempt.
>
> 2022-06-10 20:20:55,352 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Starting DefaultLeaderElectionService with
> org.apache.flink.

(无主题)

2022-06-12 Thread 李国辉

退订