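About Flink state

2022-08-29 Post by 曲洋
Hi all,
   I'd like to ask two questions:
1) Can Flink state be shared between different operators? For example, if my first map has a state, can I still access that same state in the second map?
2) Is there any kind of Flink state that doesn't require keyBy? I want to compute an overall total, and there is no suitable key to choose.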



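[Flink native K8s] TaskManager pod keeps restarting after configuring HA

2022-08-29 Post by Wu,Zhiheng
[Problem description]
After enabling the HA configuration, the TaskManager pods keep cycling through create, stop, create, and the job cannot start.

1. Job configuration and startup steps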

a)  Modify the conf/flink-conf.yaml configuration file to add the HA settings:
kubernetes.cluster-id: realtime-monitor
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///opt/flink/checkpoint/recovery/monitor
   // This is an NFS path, mounted into the pod via a PVC.

b)  First, create a stateless deployment with the following command to set up a session cluster:

./bin/kubernetes-session.sh \
    -Dkubernetes.secrets=cdn-res-bd-keystore:/opt/flink/kafka/res/keystore/bd,cdn-res-bd-truststore:/opt/flink/kafka/res/truststore/bd,cdn-res-bj-keystore://opt/flink/kafka/res/keystore/bj,cdn-res-bj-truststore:/opt/flink/kafka/res/truststore/bj \
    -Dkubernetes.pod-template-file=./conf/pod-template.yaml \
    -Dkubernetes.cluster-id=realtime-monitor \
    -Dkubernetes.jobmanager.service-account=wuzhiheng \
    -Dkubernetes.namespace=monitor \
    -Dtaskmanager.numberOfTaskSlots=6 \
    -Dtaskmanager.memory.process.size=8192m \
    -Djobmanager.memory.process.size=2048m

c)  Finally, submit a jar job through the web UI. The JobManager prints the following logs:

2022-08-29 23:49:04,150 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod 
realtime-monitor-taskmanager-1-13 is created.

2022-08-29 23:49:04,152 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod 
realtime-monitor-taskmanager-1-12 is created.

2022-08-29 23:49:04,161 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received new 
TaskManager pod: realtime-monitor-taskmanager-1-12

2022-08-29 23:49:04,162 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Requested worker realtime-monitor-taskmanager-1-12 with resource spec 
WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), 
taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes), 
managedMemSize=0 bytes, numSlots=6}.

2022-08-29 23:49:04,162 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received new 
TaskManager pod: realtime-monitor-taskmanager-1-13

2022-08-29 23:49:04,162 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Requested worker realtime-monitor-taskmanager-1-13 with resource spec 
WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), 
taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes), 
managedMemSize=0 bytes, numSlots=6}.

2022-08-29 23:49:07,176 WARN  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Reaching max start worker failure rate: 12 events detected in the recent 
interval, reaching the threshold 10.00.

2022-08-29 23:49:07,176 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Will 
not retry creating worker in 3000 ms.

2022-08-29 23:49:07,176 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Worker realtime-monitor-taskmanager-1-12 with resource spec WorkerResourceSpec 
{cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 
bytes, networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, 
numSlots=6} was requested in current attempt and has not registered. Current 
pending count after removing: 1.

2022-08-29 23:49:07,176 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Worker realtime-monitor-taskmanager-1-12 is terminated. Diagnostics: Pod 
terminated, container termination statuses: [flink-main-container(exitCode=1, 
reason=Error, message=null)], pod status: Failed(reason=null, message=null)

2022-08-29 23:49:07,176 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Requesting new worker with resource spec WorkerResourceSpec {cpuCores=6.0, 
taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 bytes, 
networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, 
numSlots=6}, current pending count: 2.

2022-08-29 23:49:07,514 WARN  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Reaching max start worker failure rate: 13 events detected in the recent 
interval, reaching the threshold 10.00.

2022-08-29 23:49:07,514 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Worker realtime-monitor-taskmanager-1-13 with resource spec WorkerResourceSpec 
{cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 
bytes, networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, 
numSlots=6} was requested in current attempt and has not registered. Current 
pending count after removing: 1.

2022-08-29 23:49:07,514 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Worker realtime-monitor-taskmanager-1-13 is terminated. Diagnostics: Pod 
terminated, container termination statuses: [flink-main-container(exitCode=1, 
reason=Error, message=null)], pod status: Failed(reason=null, message=null)

2022-08-29 23:49:07,515 INFO  

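Re: Re: Question about Flink state initialization

2022-08-29 Post by Zhiwen Sun
I think you may not have understood how state is meant to be used.

In our programs we generally use KeyedState, i.e. state that is tied to a key.

Because of that, in open() you can only initialize the state (obtain the state handle); you cannot set the state's value, because there is no key at that point.
On the other hand, you also don't create a new state in map() (you can think of state as one big Map; in map() you only operate on the entry for one key).

Back to your requirement: in open(), save the relevant information into a field of the class, and then update the state in map().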
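The "one big Map" description above can be made concrete with a small plain-Java model. This is a simplified sketch for illustration only, not Flink's implementation or API: `KeyedValueStateModel`, `YearCounterModel`, `setCurrentKey`, and `processElement` are hypothetical names, and no Flink classes are used. It shows why a keyed value cannot be set in `open()` (no key is current yet), why `value()` returns `null` the first time a key is seen, and the suggested pattern: keep the command-line value in a field in `open()`, then check, seed, and `update()` the state in `map()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a keyed ValueState (NOT Flink's API):
// one big map from key to value, where value()/update() only touch the entry
// for the key that is "current" while a record is being processed.
class KeyedValueStateModel<K, V> {
    private final Map<K, V> table = new HashMap<>();
    private K currentKey; // in Flink the runtime sets the key; user code never does

    void setCurrentKey(K key) {
        currentKey = key;
    }

    V value() {
        requireKey();
        return table.get(currentKey); // null the first time a key is seen
    }

    void update(V newValue) {
        requireKey();
        table.put(currentKey, newValue);
    }

    // This model fails loudly when no key is set (as during open());
    // it does not reproduce Flink's exact behavior in that case.
    private void requireKey() {
        if (currentKey == null) {
            throw new IllegalStateException("no current key: keyed state has no value to read or write here");
        }
    }
}

// Hypothetical model of the suggested structure: capture the parameter
// in open(), then seed and update the state per key in map().
class YearCounterModel {
    private KeyedValueStateModel<String, Long> yearMetric;
    private long yearTotal; // plain field: safe to set in open()

    void open(long yearTotalFromArgs) {
        yearMetric = new KeyedValueStateModel<>(); // only obtain the state handle
        yearTotal = yearTotalFromArgs;             // no key yet, so keep it in a field
    }

    // Stands in for the runtime setting the current key and then calling map().
    long processElement(String key) {
        yearMetric.setCurrentKey(key);
        return map();
    }

    private long map() {
        Long current = yearMetric.value(); // check for null before using it
        long next = (current == null) ? yearTotal + 1 : current + 1;
        yearMetric.update(next);            // write back explicitly with update()
        return next;
    }
}

class KeyedStateDemo {
    public static void main(String[] args) {
        YearCounterModel fn = new YearCounterModel();
        fn.open(100L);
        System.out.println(fn.processElement("2022")); // 101 (seeded from yearTotal)
        System.out.println(fn.processElement("2022")); // 102
        System.out.println(fn.processElement("2023")); // 101 (new key starts from null)
    }
}
```

In this model the command-line total seeds every key the first time it appears; whether a new key should start from `yearTotal` or from zero is an assumption that depends on what the key represents in the real job.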






Zhiwen Sun



On Fri, Aug 26, 2022 at 1:55 PM 曲洋  wrote:

>
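> Right, it's the latter: statAccumulator.value() is null. The value I read in the map method comes back null, even though I clearly initialized it in open(). Is this because map runs too fast and starts reading before the state has finished initializing?
> OK, I've now changed the code to check for null first.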
>
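> On 2022-08-26 11:22:43, "Hangxiang Yu" wrote:
> >open() is indeed called during initialization.
> >When you say the first call returns null, is it statAccumulator itself that is null, or statAccumulator.value()? If it's the latter, that can happen normally.
> >The way this code is written looks a bit off: after reading with value(), you should usually check the result first, and then update the value state with the update() method.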
> >
> >On Fri, Aug 26, 2022 at 10:25 AM 曲洋  wrote:
> >
> >> Hi all,
> >>
> >>
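> >> I'd like to ask a question. My Flink application stores some metrics in state, such as the total for the year. When the job goes down for whatever reason, I'd like to adjust this state directly through an input parameter, but I ran into the following problem:
> >> I overrode RichMapFunction. The yearTotal metric is passed in from the command line, and I want to initialize it into state, but I found that the state is always null the first time open() is called, so the parameter never gets in.
> >> So how should I handle this scenario, and what is the lifecycle of the open() method? I assumed it would be called when map() is first invoked, but that doesn't seem to be the case.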
> >> public static class AccumulateAmounts extends RichMapFunction<v2bean, BlueAccumulaterInitState> {
> >>     private transient ValueState<BlueAccumulaterInitState> statAccumulator;
> >>
> >>     @Override
> >>     public BlueAccumulaterInitState map(v2bean currentAccumulator) throws Exception {
> >>         BlueAccumulaterInitState stat = (statAccumulator.value() != null)
> >>                 ? statAccumulator.value() : new BlueAccumulaterInitState();
> >>         Long yearIncrement = year.equals(stat.getYear()) ? stat.getYearMetric() + 1L : 1L;
> >>         stat.setYearMetric(yearIncrement);
> >>         statAccumulator.update(stat);
> >>         return stat;
> >>     }
> >>
> >>     @Override
> >>     public void open(Configuration config) throws Exception {
> >>         ValueStateDescriptor<BlueAccumulaterInitState> descriptor =
> >>                 new ValueStateDescriptor<>(
> >>                         "total",
> >>                         TypeInformation.of(new TypeHint<BlueAccumulaterInitState>() {}));
> >>         statAccumulator = getRuntimeContext().getState(descriptor);
> >>         ExecutionConfig.GlobalJobParameters globalParams =
> >>                 getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
> >>         Configuration globConf = (Configuration) globalParams;
> >>         long yearTotal = globConf.getLong(ConfigOptions.key("year").longType().noDefaultValue());
> >>         statAccumulator.value().setYearMetric(yearTotal);
> >>     }
> >> }
> >
> >
> >
> >--
> >Best,
> >Hangxiang.
>


Re: Unsubscribe

2022-08-29 Post by Yuxin Tan
Hi, to unsubscribe, send an email with any content to user-zh-unsubscr...@flink.apache.org.

Best,
Yuxin


On Mon, Aug 29, 2022 at 14:05, hihl wrote:

>
>


Re: Unsubscribe

2022-08-29 Post by Yuxin Tan
Hi, to unsubscribe, send an email with any content to user-zh-unsubscr...@flink.apache.org.

Best,
Yuxin


On Mon, Aug 29, 2022 at 15:52, 廖启发 wrote:

> Unsubscribe
>


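Flink job fails to create a savepoint

2022-08-29 Post by casel.chen
A production Flink job failed when we manually triggered a savepoint. The job has two operators, one reading from Kafka and one writing to MongoDB, both with a parallelism of 48. After the failure, we found that of the 48 tasks of the MongoDB sink operator, 45 had completed and 3 had timed out (the timeout is set to 3 minutes). Normally a checkpoint completes in 4 seconds, and the state size is only 23.7 KB. The job logs after the failure are shown below. After the savepoint failed, the job's periodic checkpoints also all failed (3 tasks timed out in each operator). We use FsStateBackend, with Alibaba Cloud OSS as the DFS. What could be causing this?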


2022-08-29 15:38:32,617 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler [] - Failed to transfer file from TaskExecutor sqrc-session-prod-taskmanager-1-30.
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$24(TaskExecutor.java:2064) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_312]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_312]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_312]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
Caused by: org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor.
    ... 5 more
2022-08-29 15:38:32,617 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler [] - Unhandled exception.
org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$24(TaskExecutor.java:2064) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_312]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_312]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_312]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]

Unsubscribe

2022-08-29 Post by 廖启发
Unsubscribe


Unsubscribe

2022-08-29 Post by hihl