About Flink state
Hi all, I have two questions: 1) Can Flink state be shared across different operators? For example, if my first map has a state, can the second map read that same state? 2) Is there any Flink state that does not require keyBy? I want to count a global total and there is no suitable key to choose.
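For the second question, a common workaround (one option among several, not the only one) is to route every record to a single constant key, so that one keyed-state entry holds the global total. Keyed state can be pictured as a map from key to value; the sketch below simulates that behavior in plain Java, without a Flink dependency, to illustrate the idea:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of how keyed state behaves: one state entry per key.
// Routing all records to the same constant key ("ALL") yields a global total,
// which is the usual workaround when no natural key exists.
public class GlobalCountSketch {
    // Stands in for Flink's ValueState<Long>, which is scoped per key.
    private final Map<String, Long> state = new HashMap<>();

    // Simulates keyBy(record -> "ALL") followed by a stateful map().
    public long add(String record) {
        String key = "ALL"; // constant key: every record shares one state entry
        long total = state.getOrDefault(key, 0L) + 1L;
        state.put(key, total);
        return total;
    }

    public static void main(String[] args) {
        GlobalCountSketch sketch = new GlobalCountSketch();
        sketch.add("a");
        sketch.add("b");
        System.out.println(sketch.add("c")); // prints 3
    }
}
```

Note that with a single constant key all records flow through one parallel subtask, so this trades parallelism for a global view.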
[flink native k8s] With HA configured, the taskmanager pods keep restarting
[Problem description] After enabling HA, the taskmanager pods cycle endlessly through create-stop-create and no job can start.

1. Job configuration and launch steps

a) Edit conf/flink-conf.yaml and add the HA settings:

kubernetes.cluster-id: realtime-monitor
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///opt/flink/checkpoint/recovery/monitor   # an NFS path, mounted into the pod via a PVC

b) Create a stateless deployment to start a session cluster:

./bin/kubernetes-session.sh \
  -Dkubernetes.secrets=cdn-res-bd-keystore:/opt/flink/kafka/res/keystore/bd,cdn-res-bd-truststore:/opt/flink/kafka/res/truststore/bd,cdn-res-bj-keystore://opt/flink/kafka/res/keystore/bj,cdn-res-bj-truststore:/opt/flink/kafka/res/truststore/bj \
  -Dkubernetes.pod-template-file=./conf/pod-template.yaml \
  -Dkubernetes.cluster-id=realtime-monitor \
  -Dkubernetes.jobmanager.service-account=wuzhiheng \
  -Dkubernetes.namespace=monitor \
  -Dtaskmanager.numberOfTaskSlots=6 \
  -Dtaskmanager.memory.process.size=8192m \
  -Djobmanager.memory.process.size=2048m

c) Finally, submit a jar job through the web UI; the jobmanager then logs the following:

2022-08-29 23:49:04,150 INFO org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Pod realtime-monitor-taskmanager-1-13 is created.
2022-08-29 23:49:04,152 INFO org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Pod realtime-monitor-taskmanager-1-12 is created.
2022-08-29 23:49:04,161 INFO org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Received new TaskManager pod: realtime-monitor-taskmanager-1-12
2022-08-29 23:49:04,162 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requested worker realtime-monitor-taskmanager-1-12 with resource spec WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, numSlots=6}.
2022-08-29 23:49:04,162 INFO org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Received new TaskManager pod: realtime-monitor-taskmanager-1-13
2022-08-29 23:49:04,162 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requested worker realtime-monitor-taskmanager-1-13 with resource spec WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, numSlots=6}.
2022-08-29 23:49:07,176 WARN org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Reaching max start worker failure rate: 12 events detected in the recent interval, reaching the threshold 10.00.
2022-08-29 23:49:07,176 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Will not retry creating worker in 3000 ms.
2022-08-29 23:49:07,176 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker realtime-monitor-taskmanager-1-12 with resource spec WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, numSlots=6} was requested in current attempt and has not registered. Current pending count after removing: 1.
2022-08-29 23:49:07,176 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker realtime-monitor-taskmanager-1-12 is terminated. Diagnostics: Pod terminated, container termination statuses: [flink-main-container(exitCode=1, reason=Error, message=null)], pod status: Failed(reason=null, message=null)
2022-08-29 23:49:07,176 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, numSlots=6}, current pending count: 2.
2022-08-29 23:49:07,514 WARN org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Reaching max start worker failure rate: 13 events detected in the recent interval, reaching the threshold 10.00.
2022-08-29 23:49:07,514 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker realtime-monitor-taskmanager-1-13 with resource spec WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, numSlots=6} was requested in current attempt and has not registered. Current pending count after removing: 1.
2022-08-29 23:49:07,514 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker realtime-monitor-taskmanager-1-13 is terminated. Diagnostics: Pod terminated, container termination statuses: [flink-main-container(exitCode=1, reason=Error, message=null)], pod status: Failed(reason=null, message=null)
2022-08-29 23:49:07,515 INFO
Re: Re: About initializing Flink state
You have probably misunderstood how state is used. In a program we normally use KeyedState, i.e. state that accompanies a key. Because of that, open() can only initialize the state handle; it cannot set the state's value, since no key is in scope at that point. Conversely, you never create new state inside map() either (think of state as one big Map: in map() you only operate on a single key of it). Back to your requirement: save the information into an instance field in open(), then update the state inside map().

Zhiwen Sun

On Fri, Aug 26, 2022 at 1:55 PM 曲洋 wrote:
> Right, it is the latter: statAccumulator.value() is null, so reading it in map() gives null even though it was initialized in open(). Is that because map() runs too fast and reads the state before it finishes initializing? OK, I have now changed the code to check for null first.
>
> On 2022-08-26 11:22:43, "Hangxiang Yu" wrote:
> > open() is indeed called at initialization time.
> > When you say the first call gives null, is statAccumulator itself null, or statAccumulator.value()? The latter is normal and can be expected.
> > The code looks slightly off: after reading with value(), check for null first, and write the value state back with update().
> >
> > On Fri, Aug 26, 2022 at 10:25 AM 曲洋 wrote:
> > > Hi all,
> > > My Flink job keeps some metrics in state, e.g. a yearly total. When the job goes down for whatever reason, I would like to seed that state directly from a command-line argument, but I ran into a problem. I override RichMapFunction; yearTotal is passed in on the command line and I want to initialize the state with it, but the state is null the first time open() is called, so the parameter never makes it in. How should I handle this scenario? Also, what is the lifecycle of open()? I assumed it is called the first time map() runs, but apparently not.
> > >
> > > public static class AccumulateAmounts extends RichMapFunction<v2bean, BlueAccumulaterInitState> {
> > >     private transient ValueState<BlueAccumulaterInitState> statAccumulator;
> > >
> > >     @Override
> > >     public BlueAccumulaterInitState map(v2bean currentAccumulator) throws Exception {
> > >         BlueAccumulaterInitState stat = (statAccumulator.value() != null)
> > >                 ? statAccumulator.value() : new BlueAccumulaterInitState();
> > >         Long yearIncrement = year.equals(stat.getYear()) ? stat.getYearMetric() + 1L : 1L;
> > >         stat.setYearMetric(yearIncrement);
> > >         statAccumulator.update(stat);
> > >         return stat;
> > >     }
> > >
> > >     @Override
> > >     public void open(Configuration config) {
> > >         ValueStateDescriptor<BlueAccumulaterInitState> descriptor =
> > >                 new ValueStateDescriptor<>(
> > >                         "total",
> > >                         TypeInformation.of(new TypeHint<BlueAccumulaterInitState>() {}));
> > >         statAccumulator = getRuntimeContext().getState(descriptor);
> > >         ExecutionConfig.GlobalJobParameters globalParams =
> > >                 getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
> > >         Configuration globConf = (Configuration) globalParams;
> > >         long yearTotal = globConf.getLong(ConfigOptions.key("year").longType().noDefaultValue());
> > >         statAccumulator.value().setYearMetric(yearTotal);
> > >     }
> > > }
> >
> > --
> > Best,
> > Hangxiang.
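The fix Zhiwen Sun describes, caching the command-line value in a field during open() and applying it on first access inside map(), can be sketched as follows. To keep the snippet self-contained (no Flink runtime on the classpath), the ValueState and the job's BlueAccumulaterInitState type are replaced here by plain-Java stand-ins; only the pattern is the point:

```java
import java.util.HashMap;
import java.util.Map;

// Self-contained sketch of the suggested pattern: open() only records the
// configured starting value in a field; map() lazily seeds the per-key
// state on first use and updates it afterwards.
public class AccumulateSketch {
    private long initialYearTotal;                            // set in open(), from job parameters
    private final Map<String, Long> state = new HashMap<>();  // stands in for ValueState

    // Simulates RichMapFunction.open(): no key is in scope yet, so we must
    // NOT touch state here -- we only remember the configured starting value.
    public void open(long yearTotalFromArgs) {
        this.initialYearTotal = yearTotalFromArgs;
    }

    // Simulates map(): the first call for a key seeds the state from the
    // field; later calls go through the usual value()/update() cycle.
    public long map(String key) {
        Long current = state.get(key);                        // like statAccumulator.value()
        long next = (current == null ? initialYearTotal : current) + 1L;
        state.put(key, next);                                 // like statAccumulator.update(next)
        return next;
    }

    public static void main(String[] args) {
        AccumulateSketch f = new AccumulateSketch();
        f.open(100L);
        System.out.println(f.map("2022")); // 101: seeded from the argument, then incremented
        System.out.println(f.map("2022")); // 102
    }
}
```

In the real RichMapFunction the same shape applies: read yearTotal from GlobalJobParameters in open() into a field, and in map() use it only when statAccumulator.value() returns null.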
Re: Unsubscribe
Hi, to unsubscribe, just send a message with any content to user-zh-unsubscr...@flink.apache.org. Best, Yuxin. hihl wrote on Mon, Aug 29, 2022 at 14:05:
Re: Unsubscribe
Hi, to unsubscribe, just send a message with any content to user-zh-unsubscr...@flink.apache.org. Best, Yuxin. 廖启发 wrote on Mon, Aug 29, 2022 at 15:52:
> Unsubscribe
Flink job fails to create a savepoint
A production Flink job failed when we manually triggered a savepoint. The job has two operators, reading from Kafka and writing to MongoDB, both at parallelism 48. After the failure, the MongoDB sink showed 45 of its 48 tasks completed, while 3 tasks timed out (the timeout is set to 3 minutes); normally a checkpoint completes in about 4 seconds and the state size is only 23.7 KB. After the savepoint failed, the job's periodic checkpoints also all started failing (3 tasks of each operator timing out). We use FileStateBackend, with Alibaba Cloud OSS as the DFS. What could be causing this error? The job log after the failure:

2022-08-29 15:38:32,617 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler [] - Failed to transfer file from TaskExecutor sqrc-session-prod-taskmanager-1-30.
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$24(TaskExecutor.java:2064) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_312]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_312]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_312]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
Caused by: org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor.
    ... 5 more
2022-08-29 15:38:32,617 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler [] - Unhandled exception.
org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$24(TaskExecutor.java:2064) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_312]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_312]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_312]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
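If the three straggling sink tasks are merely slow to acknowledge (for instance, because of back-pressure while writing to MongoDB), one knob worth checking is the checkpoint timeout, which also applies to manually triggered savepoints. A minimal flink-conf.yaml sketch; the 10-minute value is purely illustrative, not a recommendation:

```yaml
# Give slow tasks more time to acknowledge a checkpoint/savepoint.
# (The job above uses 3 minutes; 10 minutes here is only an example.)
execution.checkpointing.timeout: 10min
```

This does not explain why normally 4-second checkpoints suddenly stall, so it is worth inspecting the three timed-out subtasks' taskmanager logs for OSS upload errors or back-pressure as well.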
Unsubscribe
Unsubscribe