Re: Can the web UI show the reason a job failed?

2022-04-22 Thread Hangxiang Yu
You should be able to find the root cause in the JobManager log.

On Thu, Apr 21, 2022 at 5:54 PM weishishuo...@163.com 
wrote:

>
> I submitted a job that syncs data from a PostgreSQL CDC source to a MySQL
> JDBC sink. It failed after a while. Clicking the job link, the web
> UI shows the status as FAILED, but the exception message is not informative:
> ```
> 2022-04-21 17:30:50
> org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> ...
> Caused by: org.apache.flink.util.FlinkException: Execution
> d0dfc8446e24da751e93560c07f5d7f3 is unexpectedly no longer running on task
> executor 192.168.211.4:9730-fa3d22.
> at
> org.apache.flink.runtime.jobmaster.JobMaster$1.onMissingDeploymentsOf(JobMaster.java:248)
> ... 34 more
> ```
> I can't tell what the root cause is. Can it be found in the web UI, or where else should I look?
>
>
>
>
> weishishuo...@163.com
>


Re: Some question with Flink state

2022-05-23 Thread Hangxiang Yu
Hello,
States are not shared across different parallel subtasks.
BTW, English questions could be sent to u...@flink.apache.org.

Best,
Hangxiang.

On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:

>
> Hi everyone,
>    I used Flink keyed state in my project, but I ran into something that
> confuses me.
>    When I used value state with multiple parallel subtasks, the value was not what I expected.
>    So I guess each parallel subtask has its own value state; each subtask
> keeps its own value, which means the value is thread-level.
>    But when I used map state, the value was correct. I mean the map state
> was shared across all parallel subtasks.
>   Looking forward to your reply
>
>
> lxk7...@163.com
>


Re: Re: Some question with Flink state

2022-05-23 Thread Hangxiang Yu
Are you using a DataStream job? If the same key ends up on different parallel
subtasks, it may be related to the key selector you wrote (you can check the
comments on KeySelector to see whether yours meets its contract);
if convenient, please also share the logic of your key selector and of how you use state.
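For reference, a minimal sketch of a deterministic KeySelector (ClickEvent and
its getters are hypothetical names): keyBy requires the selector to return the
same key for the same record on every call, with no dependence on mutable
fields, random values, or object identity.

```java
import org.apache.flink.api.java.functions.KeySelector;

// Hypothetical event type for illustration; the key is derived purely from
// immutable record fields, so the same record always maps to the same key
// and therefore always lands on the same subtask.
public class UserItemKeySelector implements KeySelector<ClickEvent, String> {
    @Override
    public String getKey(ClickEvent event) {
        return event.getUserId() + "_" + event.getItemId();
    }
}
```

Used as stream.keyBy(new UserItemKeySelector()), this guarantees all records
with the same user/item pair are processed by the same parallel subtask.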

On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:

> OK. I saw that the emails on this list were all in English, so I asked my question in English.
>
> Let me describe my problem again. I use keyed state, specifically value state. In theory, records with the same key should be routed to the same parallel subtask. But when I run with multiple parallel subtasks, records with the same key do not seem to be handled by the same subtask. Concretely, my program accumulates, per user, the items they clicked; in the data a given item already occurs for the second time, yet the state for that item inside the program is empty, so the accumulated result ends up as 1 while the correct result should be 2. So I suspect each operator instance keeps its own copy of the value state.
>
> But when I use map state, the problem no longer seems to occur. So I'd like to understand the reason, and whether there is a way to guarantee that all records with the same key are processed by the same task.
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> States are not shared across different parallel subtasks.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone,
> >    I used Flink keyed state in my project, but I ran into something that
> > confuses me.
> >    When I used value state with multiple parallel subtasks, the value was
> > not what I expected.
> >    So I guess each parallel subtask has its own value state; each subtask
> > keeps its own value, which means the value is thread-level.
> >    But when I used map state, the value was correct. I mean the map state
> > was shared across all parallel subtasks.
> >   Looking forward to your reply
> >
> >
> > lxk7...@163.com
> >
>


Re: Is there a plan to merge Blink's Gemini into Flink?

2022-07-13 Thread Hangxiang Yu
Hello, are you migrating from the Alibaba Cloud Blink product to Flink? That migration process is covered in [1]; the introduction to and usage of Gemini in the "Realtime Compute for Apache Flink" product are covered in [2][3].
As for merging it into the community, there is no such plan in the short term.

[1] https://help.aliyun.com/document_detail/421043.html
[2] https://help.aliyun.com/document_detail/414255.html
[3] https://help.aliyun.com/document_detail/414256.html

Best,
Hangxiang.

On Wed, Jul 13, 2022 at 3:40 PM 蔡荣  wrote:

> I saw an article, "A 2.4x gap in data processing capacity? A performance
> comparison of Flink with RocksDB vs. Gemini",
> https://developer.aliyun.com/article/770793,
> and I'd like to ask: is there a plan to merge Gemini into Flink?
>
>


Re: State compatibility when modifying a Flink job

2022-07-31 Thread Hangxiang Yu
Hello, the open-source version currently supports only limited changes for DataStream jobs, such as adding or removing fields in a DataStream job [1].
The Alibaba Cloud Flink distribution additionally supports compatibility checks after SQL job changes, compatibility for most SQL operator changes, and faster state migration than the community version (job startup is almost never blocked).

Best,
Hangxiang.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#supported-data-types-for-schema-evolution
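As a quick illustration of what [1] covers, a hedged sketch with a
hypothetical state class, shown as before/after versions of the same class
(so the two snippets are not meant to compile together): a Flink POJO used as
state may gain or lose fields across a savepoint restore, and newly added
fields come back with their default values.

```java
// Version 1, as it was when the savepoint was taken:
public class UserStats {
    public long clickCount;
    public UserStats() {}       // Flink POJOs need a public no-arg constructor
}

// Version 2, restored from that savepoint: still a valid POJO, so the
// restore succeeds and viewCount starts at its default value (0).
public class UserStats {
    public long clickCount;
    public long viewCount;      // newly added field
    public UserStats() {}
}
```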

On Sat, Jul 30, 2022 at 2:30 PM tison  wrote:

> For this, you can likewise look at two resources:
>
> *
>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/
> * https://zhuanlan.zhihu.com/p/119305376
>
> In short, if you only change the parallelism, keyed state migrates fairly easily, while each kind of operator state has its own characteristics. If the logic changes significantly, you need to produce the new state manually.
>
> Best,
> tison.
>
>
m18814122325 wrote on Sat, Jul 30, 2022 at 14:21:
>
> > When a Flink job is modified due to business changes, how compatible is state restoration via a checkpoint or savepoint? Is there documentation on this?
> >
> > I found a document on SQL changes and compatibility on Alibaba Cloud: https://help.aliyun.com/document_detail/403317.html.
> > But I don't know whether it describes the Alibaba Cloud Flink distribution or the open-source version.
>


Re: Error when Flink restarts automatically

2022-08-23 Thread Hangxiang Yu
Is it a DataStream job? Could you share the part that uses state?

On Sat, Aug 20, 2022 at 3:35 PM Jason_H  wrote:

> Hi, the job had been modified, but it was started as a new job; the changes were substantial and it does not depend on the old job at all.
>
>
> | |
> Jason_H
> |
> |
> hyb_he...@163.com
> |
>  Original Message 
> | From | Michael Ran |
> | Date | 2022-08-20 15:31 |
> | To | tsreape...@gmail.com |
> | Subject | Re: Error when Flink restarts automatically |
> Was the job modified?
>
>
>
> | |
> greemqq...@163.com
> |
> |
> 邮箱:greemqq...@163.com
> |
>
>
>
>
>  Original Message 
> | From | Jason_H |
> | Date | 2022-08-19 11:52 |
> | To | Flink Chinese user mailing list |
> | Cc | |
> | Subject | Error when Flink restarts automatically |
Caused by: java.lang.RuntimeException: Error while getting state
> org.apache.flink.util.StateMigrationException: For heap backends, the new
> state serializer must not be incompatible with the old state serializer
>
> Hi all, I recently ran into a strange problem: my job reports this error when it restarts automatically. I searched around and it says the state is incompatible, but the job was started as a brand-new job, not restored from any previous checkpoint. It runs for a while and then hits this error during an automatic restart. Has anyone seen this? Is there any solution?
> To emphasize: the job is new and was not restarted from a previous job's checkpoint.
>
>
> | |
> Jason_H
> |
> |
> hyb_he...@163.com
> |



-- 
Best,
Hangxiang.


Re: A question about Flink state initialization

2022-08-25 Thread Hangxiang Yu
open() is indeed called during initialization.
When you say it is null on the first call, do you mean statAccumulator is null or statAccumulator.value() is null? The latter is normal and can be expected.
The code here looks a bit problematic: typically you fetch with value(), null-check the result first, and then write to the value state with update().
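To make that concrete, a hedged sketch of the usual fix, reusing the names
from the quoted code below as a drop-in replacement for its open/map methods
(imports and the rest of the class omitted): read the CLI parameter into a
plain field in open(), and seed the keyed state lazily inside map(), where
the framework has already set the current key. Keyed state cannot be read or
updated in open(), because no current key is set there.

```java
private transient ValueState<BlueAccumulaterInitState> statAccumulator;
private long yearTotal;

@Override
public void open(Configuration config) {
    statAccumulator = getRuntimeContext().getState(new ValueStateDescriptor<>(
            "total", TypeInformation.of(new TypeHint<BlueAccumulaterInitState>() {})));
    Configuration globConf = (Configuration)
            getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    // Read the CLI parameter into a plain field; do not touch keyed state here.
    yearTotal = globConf.getLong(ConfigOptions.key("year").longType().noDefaultValue());
}

@Override
public BlueAccumulaterInitState map(v2bean currentAccumulator) throws Exception {
    BlueAccumulaterInitState stat = statAccumulator.value();
    if (stat == null) {
        // First record for this key: seed the state from the CLI parameter.
        stat = new BlueAccumulaterInitState();
        stat.setYearMetric(yearTotal);
    }
    stat.setYearMetric(stat.getYearMetric() + 1L);
    statAccumulator.update(stat);
    return stat;
}
```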

On Fri, Aug 26, 2022 at 10:25 AM 曲洋  wrote:

> Hi all,
>
> I'd like to ask a question. My Flink application stores some metrics in
> state, e.g. a yearly total. When the job goes down for whatever reason, I
> want to be able to adjust this state directly via a command-line parameter,
> but I ran into a problem, as follows:
> I extended RichMapFunction. The yearTotal metric is passed in on the command
> line, and I want to initialize the state with it. But I found that when the
> open method is first called, the state is always null, so the parameter never gets in.
> So I'd like to ask how to handle this scenario, and also what the lifecycle
> of the open method is. I assumed it would be called the first time map runs,
> but that does not seem to be the case.
> public static class AccumulateAmounts
>         extends RichMapFunction<v2bean, BlueAccumulaterInitState> {
>     private transient ValueState<BlueAccumulaterInitState> statAccumulator;
>
>     @Override
>     public BlueAccumulaterInitState map(v2bean currentAccumulator)
>             throws Exception {
>         BlueAccumulaterInitState stat = (statAccumulator.value() != null)
>                 ? statAccumulator.value() : new BlueAccumulaterInitState();
>         Long yearIncrement = year.equals(stat.getYear())
>                 ? stat.getYearMetric() + 1L : 1L;
>         stat.setYearMetric(yearIncrement);
>
>         statAccumulator.update(stat);
>         return stat;
>     }
>
>     @Override
>     public void open(Configuration config) {
>         ValueStateDescriptor<BlueAccumulaterInitState> descriptor =
>                 new ValueStateDescriptor<>(
>                         "total",
>                         TypeInformation.of(new TypeHint<BlueAccumulaterInitState>() {}));
>         statAccumulator = getRuntimeContext().getState(descriptor);
>         ExecutionConfig.GlobalJobParameters globalParams =
>                 getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>         Configuration globConf = (Configuration) globalParams;
>         long yearTotal =
>                 globConf.getLong(ConfigOptions.key("year").longType().noDefaultValue());
>         statAccumulator.value().setYearMetric(yearTotal);
>     }
> }



-- 
Best,
Hangxiang.


Re: Restarting a job from a savepoint does not guarantee end-to-end consistency

2022-08-26 Thread Hangxiang Yu
Flink guarantees exactly-once semantics within itself; end-to-end exactly-once
semantics additionally require the source and sink to guarantee idempotence
(or transactional writes).
Which Kafka version are you using?
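For the common Kafka-to-Kafka case, a hedged sketch assuming Flink 1.14+'s
KafkaSink (broker address, topic, and prefix are placeholders): end-to-end
exactly-once needs a transactional sink, and downstream consumers must read
with isolation.level=read_committed, otherwise they will see replayed or
uncommitted records as duplicates.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker1:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        // Writes become visible only when the checkpoint completes.
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("my-job")
        .build();
```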

On Fri, Aug 26, 2022 at 4:08 PM 杨扬  wrote:

> Hi all!
> We have a Flink job whose source and sink are both Kafka.
> During a version change (no code was modified), following the commands in the official docs, we created a savepoint and stopped the job, then started the job from that savepoint.
> We now find that this does not give seamless continuity across the stop/start: some data gets duplicated.
>
> Is this because savepoints by design cannot guarantee end-to-end consistency across a stop and restart, or did we do something wrong?
>
>
>
>
>
>
>

-- 
Best,
Hangxiang.


Re: Fluctuation in aggregate values when a Flink SQL job recovers from a checkpoint

2022-10-09 Thread Hangxiang Yu
Which value drops? A particular metric?

On Mon, Oct 10, 2022 at 1:34 PM 天下五帝东  wrote:

> Hi:
> When a Flink job recovers from state, the aggregate value shows a clear dip, and it only climbs back after the checkpoint completes. Is there a good solution for this situation?



-- 
Best,
Hangxiang.


Re: Flink state recovery

2023-01-15 Thread Hangxiang Yu
Hi,
Flink currently supports state migration on restore only for DataStream jobs: adding/removing fields on POJO types, and schema changes that follow the Avro rules [1].
For other types, or for SQL jobs, the community version does not yet support restoring from the old state; you can only start stateless.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#supported-data-types-for-schema-evolution

On Thu, Jan 12, 2023 at 10:32 AM 小昌同学  wrote:

> I have a Flink job that has been running for a long time; the metrics it computes are aggregate values. Now the business wants to add fields. How do people handle this scenario?
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |



-- 
Best,
Hangxiang.


Re: Dear All, some questions about the State Processor API

2023-01-18 Thread Hangxiang Yu
1. Yes, that is exactly what the State Processor API is for.
2. As far as I can tell there is no good way to obtain the uid for SQL jobs.
One option is to enable debug logging and read it from the debug output of
StreamGraphHasherV2#generateDeterministicHash.
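For question 1, a hedged sketch of the read side against the Flink 1.13 State
Processor API (DataSet-based); the uid "operator-b" and the ValueState name
"count" are assumptions for illustration. The transformed data can then be
attached to a new savepoint via Savepoint.create(...).withOperator(...).write(...).

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadOperatorBState {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(
                env, "hdfs:///savepoints/savepoint-xxx", new HashMapStateBackend());
        // Read keyed state from the operator with the assumed uid "operator-b".
        DataSet<Tuple2<String, Long>> counts =
                savepoint.readKeyedState("operator-b", new CountReader());
        counts.print();
    }

    static class CountReader extends KeyedStateReaderFunction<String, Tuple2<String, Long>> {
        ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Tuple2<String, Long>> out)
                throws Exception {
            out.collect(Tuple2.of(key, count.value()));
        }
    }
}
```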

On Wed, Jan 18, 2023 at 2:39 PM ying lin  wrote:

> Flink version: 1.13.6
> I've recently been doing a PoC on reading and writing RocksDB state via the State Processor API, and I have some questions:
>
> 1. Given an existing Flink program of the form Kafka Source A -> custom
> stateful operator B -> HDFS File Sink, the state mainly lives in the Kafka
> source operator A and the stateful operator B.
> Can I use the State Processor API to read, transform, and rewrite the state
> of these two operators A and B into a new savepoint, then start a new job
> from that savepoint, thereby migrating the original job's state to the new
> job? Is this approach feasible?
>
> 2. I see that the State Processor API always requires an operator uid. For a
> Flink SQL job, how do I determine the operator's uid? The main scenario: we
> have some Flink SQL jobs that may be reworked later, and there is some
> existing state we don't want to discard, but calling the State Processor API
> requires specifying a uid.
>


-- 
Best,
Hangxiang.


Re: CheckpointedFunction and KeyedState

2023-05-05 Thread Hangxiang Yu
Hi, initializing the state in initializeState is fine, but try not to access
KeyedState in initializeState or snapshotState; it is best done in the actual
function, e.g. the flatMap here.
The reason is that KeyedState access is bound to the current key: before
processing each element, the framework implicitly sets the current key, which
guarantees that every KeyedState operation applies to one well-defined KV pair.
In initializeState and snapshotState there is no such implicit set by the
framework, so you would effectively be updating the value of some indeterminate
key. If you really must do it there, you would need to obtain the KeyContext
and set the key yourself, but that is not recommended.
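To make this concrete, a minimal sketch of the recommended pattern, reusing
the names from the quoted code below (the rest of the class is unchanged):
create the state handle in initializeState, but only read and update it
inside flatMap, where the current key has already been set for the incoming
record. This also removes the need to mirror the value via snapshotState.

```java
@Override
public void initializeState(FunctionInitializationContext context) {
    // Creating the handle here is fine; just don't call value()/update() yet.
    lastTemperatureState = context.getKeyedStateStore().getState(
            new ValueStateDescriptor<>("lastTemperature", Double.class));
}

@Override
public void flatMap(Tuple2<String, Double> sensor,
        Collector<Tuple3<String, Double, Double>> out) throws Exception {
    // Safe: the framework set the current key (sensor.f0) before this call.
    Double last = lastTemperatureState.value();
    double temperature = sensor.f1;
    if (last != null) {
        double diff = Math.abs(temperature - last);
        if (diff > threshold) {
            out.collect(Tuple3.of(sensor.f0, temperature, diff));
        }
    }
    // Persist per key; no extra work needed in snapshotState.
    lastTemperatureState.update(temperature);
}
```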

On Fri, May 5, 2023 at 10:58 PM sjf0115  wrote:

> The initializeState method of the CheckpointedFunction interface provides
> access to a FunctionInitializationContext, which exposes not only the
> OperatorStateStore but also the KeyedStateStore. CheckpointedFunction is
> commonly used to operate on OperatorState, but KeyedState can also be
> obtained with the following code:
> ```java
> context.getKeyedStateStore().getState(stateDescriptor);
> ```
> I'd like to ask about operating on KeyedState via CheckpointedFunction, as in the code below. Is this implementation recommended? Would it cause any problems?
> ```java
> public static class TemperatureAlertFlatMapFunction extends
>         RichFlatMapFunction<Tuple2<String, Double>, Tuple3<String, Double, Double>>
>         implements CheckpointedFunction {
>     // temperature-difference alert threshold
>     private double threshold;
>     // last temperature
>     private ValueState<Double> lastTemperatureState;
>     private Double lastTemperature;
>
>     public TemperatureAlertFlatMapFunction(double threshold) {
>         this.threshold = threshold;
>     }
>
>     @Override
>     public void flatMap(Tuple2<String, Double> sensor,
>             Collector<Tuple3<String, Double, Double>> out) throws Exception {
>         String sensorId = sensor.f0;
>         // current temperature
>         double temperature = sensor.f1;
>         // first reported temperature?
>         if (Objects.equals(lastTemperature, null)) {
>             // remember the current temperature
>             lastTemperature = temperature;
>             return;
>         }
>         double diff = Math.abs(temperature - lastTemperature);
>         // remember the current temperature
>         lastTemperature = temperature;
>         if (diff > threshold) {
>             // emit when the temperature change exceeds the threshold
>             out.collect(Tuple3.of(sensorId, temperature, diff));
>         }
>     }
>
>     @Override
>     public void snapshotState(FunctionSnapshotContext context) throws Exception {
>         // after getting the latest temperature, update the state holding it
>         if (!Objects.equals(lastTemperature, null)) {
>             lastTemperatureState.update(lastTemperature);
>         }
>     }
>
>     @Override
>     public void initializeState(FunctionInitializationContext context) throws Exception {
>         ValueStateDescriptor<Double> stateDescriptor =
>                 new ValueStateDescriptor<>("lastTemperature", Double.class);
>         lastTemperatureState = context.getKeyedStateStore().getState(stateDescriptor);
>         if (context.isRestored()) {
>             lastTemperature = lastTemperatureState.value();
>         }
>     }
> }
> ```
>
>

-- 
Best,
Hangxiang.


Re: Failed to initialize delegation token receiver s3

2023-05-09 Thread Hangxiang Yu
Hi, this should be the known issue FLINK-31839, which has already been fixed in 1.17.1; see:
https://issues.apache.org/jira/browse/FLINK-31839

On Sat, May 6, 2023 at 5:00 PM maker_d...@foxmail.com <
maker_d...@foxmail.com> wrote:

> Flink version: flink-1.17.0
> Kubernetes application mode
>
> Delegation tokens are already disabled in flink-conf:
> security.delegation.tokens.enabled: false
>
> The program was originally developed on 1.13 and worked fine; after upgrading Flink to 1.17.0 it fails to start.
> Initially, with delegation tokens still enabled, the JobManager could not start; after disabling delegation
> tokens the JobManager starts normally, but the TaskManager reports the following error:
>
> 2023-05-06 16:52:45,720 INFO
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository
> [] - Delegation token receiver s3 loaded and initialized
> 2023-05-06 16:52:45,722 INFO
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository
> [] - Delegation token receiver s3 loaded and initialized
> 2023-05-06 16:52:45,723 ERROR
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository
> [] - Failed to initialize delegation token receiver s3
> java.lang.IllegalStateException: Delegation token receiver with service
> name {} has multiple implementations [s3]
> at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.lambda$loadReceivers$0(DelegationTokenReceiverRepository.java:75)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at java.util.Iterator.forEachRemaining(Unknown Source) ~[?:?]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.loadReceivers(DelegationTokenReceiverRepository.java:98)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.(DelegationTokenReceiverRepository.java:60)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManagerRunnerServices(TaskManagerRunner.java:245)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:293)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:486)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$5(TaskManagerRunner.java:530)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:530)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner.runTaskManagerSecurely(KubernetesTaskExecutorRunner.java:66)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner.main(KubernetesTaskExecutorRunner.java:46)
> [flink-dist-1.17.0.jar:1.17.0]
> 2023-05-06 16:52:45,729 ERROR
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] -
> Terminating TaskManagerRunner with exit code 1.
> org.apache.flink.util.FlinkException: Failed to start the
> TaskManagerRunner.
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:488)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$5(TaskManagerRunner.java:530)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:530)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner.runTaskManagerSecurely(KubernetesTaskExecutorRunner.java:66)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner.main(KubernetesTaskExecutorRunner.java:46)
> [flink-dist-1.17.0.jar:1.17.0]
> Caused by: org.apache.flink.util.FlinkRuntimeException:
> java.lang.IllegalStateException: Delegation token receiver with service
> name {} has multiple implementations [s3]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.lambda$loadReceivers$0(DelegationTokenReceiverRepository.java:93)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at java.util.Iterator.forEachRemaining(Unknown Source) ~[?:?]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.loadReceivers(DelegationTokenReceiverRepository.java:98)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.(Delega

Re: Flink broadcast state cleanup policy not taking effect

2023-05-14 Thread Hangxiang Yu
Hi, operator state such as broadcast state currently does not support TTL
settings; see the description of State TTL here:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl

On Mon, May 15, 2023 at 11:05 AM lxk  wrote:

> Flink version: 1.14
> I broadcast a stream with a relatively small data volume, and the main
> stream is matched against this broadcast stream.
> In the main program I configured a state expiration policy:
>    SingleOutputStreamOperator<AdvertiseClick> baiduStream =
>            env.addSource(adBaiduClick)
>                    .map(data -> JSON.parseObject(data, AdvertiseClick.class))
>                    .name("BaiDuAdClick");
>    MapStateDescriptor<String, AdvertiseClick> baiduInfoMap = new
>            MapStateDescriptor<>("advertiseInfo", String.class, AdvertiseClick.class);
>    StateTtlConfig ttlConfig = StateTtlConfig
>            .newBuilder(Time.days(7))
>            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>            .cleanupFullSnapshot()
>            .cleanupIncrementally(200, true)
>            .build();
>    baiduInfoMap.enableTimeToLive(ttlConfig);
> In the BroadcastProcessFunction I also configured the cleanup policy:
>    public void open(Configuration parameters) throws Exception {
>        jedisClusterSink = Ad_MyRedisUtil.getJedisClient();
>        baiduInfoDesc = new MapStateDescriptor<String,
>                AdvertiseClick>("advertiseInfo", String.class, AdvertiseClick.class);
>        StateTtlConfig ttlConfig = StateTtlConfig
>                .newBuilder(Time.days(7))
>                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>                .cleanupFullSnapshot()
>                .cleanupIncrementally(200, true)
>                .build();
>        baiduInfoDesc.enableTimeToLive(ttlConfig);
>    }
> However, judging by the current checkpoint size, the cleanup policy does not
> seem to be taking effect: the program has been running for 14 days, and the
> overall checkpoint keeps growing.
>
>
> https://pic2.imgdb.cn/item/64619fef0d2dde54d4c6.jpg
>
>
>
>
> Expiration policies on my other states all take effect, so I don't know why it seems not to work for the broadcast stream, or whether I'm using it the wrong way. I'd appreciate any help.



-- 
Best,
Hangxiang.


Re: Re: Flink broadcast state cleanup policy not taking effect

2023-05-15 Thread Hangxiang Yu
Hi, you can refer to this ticket, which discusses adding TTL to broadcast state; it seems the discussion did not go further at the time:
https://issues.apache.org/jira/browse/FLINK-13721
If convenient, could you also share your use case and the behavior you observed under the ticket? You could also vote for the issue there.
I will help take a look as well.
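Regarding manual cleanup: until a TTL exists for broadcast state, one hedged
workaround (type and accessor names follow the quoted code; Result and
getClickTime() are hypothetical) is to prune stale entries whenever a new
broadcast element arrives. Keying the cutoff to the broadcast element's own
timestamp, rather than wall-clock time, keeps the pruning deterministic
across parallel instances, which broadcast state requires.

```java
// Assumes: import java.time.Duration; import java.util.*; plus the Flink
// BroadcastState API already used by the quoted function.
@Override
public void processBroadcastElement(AdvertiseClick value, Context ctx,
        Collector<Result> out) throws Exception {
    BroadcastState<String, AdvertiseClick> state = ctx.getBroadcastState(baiduInfoDesc);
    state.put(value.getId(), value);

    // Manually expire entries older than 7 days relative to this element's
    // timestamp. Collect keys first so the state is not mutated mid-iteration.
    long cutoff = value.getClickTime() - Duration.ofDays(7).toMillis();
    List<String> stale = new ArrayList<>();
    for (Map.Entry<String, AdvertiseClick> entry : state.entries()) {
        if (entry.getValue().getClickTime() < cutoff) {
            stale.add(entry.getKey());
        }
    }
    for (String key : stale) {
        state.remove(key);
    }
}
```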

On Mon, May 15, 2023 at 1:41 PM lxk  wrote:

> Seen this way, broadcast streams seem unsuitable for production use, since the state grows without bound. Does the community plan to add a TTL feature for this?
> Or, when using a broadcast stream, is there any way to clean up the state manually?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2023-05-15 11:28:54, "Hangxiang Yu" wrote:
> >Hi, operator state such as broadcast state currently does not support TTL
> >settings; see the description of State TTL here:
> >https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl
> >
> >On Mon, May 15, 2023 at 11:05 AM lxk  wrote:
> >
> >> Flink version: 1.14
> >> I broadcast a stream with a relatively small data volume, and the main
> >> stream is matched against this broadcast stream.
> >> In the main program I configured a state expiration policy:
> >>    SingleOutputStreamOperator<AdvertiseClick> baiduStream =
> >>            env.addSource(adBaiduClick)
> >>                    .map(data -> JSON.parseObject(data, AdvertiseClick.class))
> >>                    .name("BaiDuAdClick");
> >>    MapStateDescriptor<String, AdvertiseClick> baiduInfoMap = new
> >>            MapStateDescriptor<>("advertiseInfo", String.class, AdvertiseClick.class);
> >>    StateTtlConfig ttlConfig = StateTtlConfig
> >>            .newBuilder(Time.days(7))
> >>            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >>            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> >>            .cleanupFullSnapshot()
> >>            .cleanupIncrementally(200, true)
> >>            .build();
> >>    baiduInfoMap.enableTimeToLive(ttlConfig);
> >> In the BroadcastProcessFunction I also configured the cleanup policy:
> >>    public void open(Configuration parameters) throws Exception {
> >>        jedisClusterSink = Ad_MyRedisUtil.getJedisClient();
> >>        baiduInfoDesc = new MapStateDescriptor<String,
> >>                AdvertiseClick>("advertiseInfo", String.class, AdvertiseClick.class);
> >>        StateTtlConfig ttlConfig = StateTtlConfig
> >>                .newBuilder(Time.days(7))
> >>                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >>                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> >>                .cleanupFullSnapshot()
> >>                .cleanupIncrementally(200, true)
> >>                .build();
> >>        baiduInfoDesc.enableTimeToLive(ttlConfig);
> >>    }
> >> However, judging by the current checkpoint size, the cleanup policy does
> >> not seem to be taking effect: the program has been running for 14 days,
> >> and the overall checkpoint keeps growing.
> >>
> >> https://pic2.imgdb.cn/item/64619fef0d2dde54d4c6.jpg
> >>
> >> Expiration policies on my other states all take effect, so I don't know
> >> why it seems not to work for the broadcast stream, or whether I'm using
> >> it the wrong way. I'd appreciate any help.
> >
> >
> >
> >--
> >Best,
> >Hangxiang.
>


-- 
Best,
Hangxiang.


Re: RocksDB memory overuse with Flink on YARN

2023-06-07 Thread Hangxiang Yu
Hi, RocksDB's memory use is currently not strictly bounded; see this ticket:
https://issues.apache.org/jira/browse/FLINK-15532
To locate the memory usage, first look at some coarse-grained metrics:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rocksdb-native-metrics
To further pin down the detailed RocksDB memory usage inside a single
instance, you may need a malloc profiling tool such as jemalloc's jeprof:
https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling
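To narrow things down, a hedged sketch of enabling a few of those coarse
native metrics; the keys below are from the linked config page and normally
go in flink-conf.yaml, shown here via the Configuration API for a local test.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// Approximate memtable and block cache usage per RocksDB instance.
conf.setBoolean("state.backend.rocksdb.metrics.cur-size-all-mem-tables", true);
conf.setBoolean("state.backend.rocksdb.metrics.block-cache-usage", true);
conf.setBoolean("state.backend.rocksdb.metrics.block-cache-pinned-usage", true);

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);
```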

On Wed, Jun 7, 2023 at 4:58 PM crazy <2463829...@qq.com.invalid> wrote:

> Hi all,
>      We have an application on Flink 1.13.5 on YARN with the RocksDB state
> backend. After running for a while, the job exceeds its memory budget;
> increasing the overhead a bit seems to alleviate it. Is there an issue that
> describes this kind of problem? And how can I locate which part of memory
> is over the limit? Thanks.
>
>
> crazy
> 2463829...@qq.com
>
>
>



-- 
Best,
Hangxiang.


Re: Looking for per-operator latency metrics for a Flink job

2023-06-12 Thread Hangxiang Yu
[<source_id>.[<source_subtask_index>.]]<operator_id>.<operator_subtask_index>.latency
should meet this need; different granularities can also be configured:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/metrics/#io
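Note that latency markers are disabled by default; a minimal sketch of turning
them on (interval in milliseconds; keep it coarse in production, since markers
add overhead):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Emit a LatencyMarker from each source every 1000 ms; the per-operator
// latency histograms then appear under the metric pattern above.
env.getConfig().setLatencyTrackingInterval(1000);
```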

On Mon, Jun 12, 2023 at 5:05 PM casel.chen  wrote:

> I want to measure the latency of data as it passes through each operator of a Flink job. Can the current open-source community version do this?



-- 
Best,
Hangxiang.


Re: Migrating Flink SQL job state across storage systems

2023-08-01 Thread Hangxiang Yu
Hi, as I understand it there are two options:
1. Restore from one storage cluster while snapshotting to the other, i.e. set [1] to the HDFS address and [2] to the new object storage address.
2. Keep starting and stopping the job against the HDFS cluster, but point the savepoint directory [3] at object storage.

As for the State Processor API, it is indeed hard to operate on SQL jobs at
the moment: you can only get the uids and similar information from the logs,
and you have to understand the state the SQL actually produces before you can use it.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#execution-savepoint-path
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#state-checkpoints-dir
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#state-savepoints-dir

On Sat, Jul 29, 2023 at 11:09 AM casel.chen  wrote:

> We need to migrate our Flink SQL jobs currently running on Hadoop YARN to
> Kubernetes, and to switch the state storage medium from HDFS to object
> storage, so that jobs can restore from their previous savepoints and the
> upgrade is transparent to users.
> Because the content of Flink state files includes absolute paths, this
> cannot be done by simply copying the files physically.
>
> I checked the docs: reading state with the Flink State Processor API
> currently requires passing a uid and the Flink state type, but the uids of
> Flink SQL jobs are generated automatically and we have no way to know the
> state types either. Is there an API that iterates over all the state saved
> under a directory and stores it into a directory on another file system?
> The State Processor API feels better suited to jobs written with the Stream
> API; SQL jobs can hardly be handled. Is that right?



-- 
Best,
Hangxiang.


user-zh@flink.apache.org

2023-09-06 Thread Hangxiang Yu
Hi,
https://flink-learning.org.cn/article/detail/c1db8bc157c72069979e411cd99714fd
This article offers some theory and worked examples on how Flink computes
RocksDB write buffer and block cache memory; you may find it helpful.
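As a rough worked example under the managed-memory mode (defaults from memory,
so please verify against your 1.13.5 setup): with 1 GB of managed memory for a
slot, all RocksDB instances in that slot share one block cache whose capacity
is tied to that 1 GB; within it, about managed_size x
state.backend.rocksdb.memory.write-buffer-ratio (default 0.5), i.e. roughly
512 MB, is budgeted for write buffers (memtables), and about managed_size x
state.backend.rocksdb.memory.high-prio-pool-ratio (default 0.1), i.e. roughly
102 MB, is reserved for index and filter blocks. In unmanaged mode
(state.backend.rocksdb.memory.managed: false) these bounds do not apply, and
each column family sizes its own write buffers and cache from the
RocksDB-level options.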

On Fri, Sep 1, 2023 at 2:56 PM crazy <2463829...@qq.com.invalid> wrote:

> Hi all,
>       With Flink 1.13.5 and a RocksDB-based state backend, how are the
> upper bound of the block cache and the total upper bound of the write
> buffers calculated, in terms of actual memory use, under managed and
> unmanaged modes?
>       Thanks!
>       感谢!
>
>
> crazy
> 2463829...@qq.com
>
>
>



-- 
Best,
Hangxiang.


Re: 1.17.1 - NPE during interval join

2023-09-24 Thread Hangxiang Yu
Hi, is this a SQL job or a DataStream job? Could you share the key SQL or code that reproduces it?

On Sat, Sep 23, 2023 at 3:59 PM Phoes Huang  wrote:

> Hi,
>
> I hit this problem running locally on a single machine during development. Has anyone encountered and solved it?
>
> 2023-09-23 13:52:03.989 INFO
> [flink-akka.actor.default-dispatcher-9][Execution.java:1445] - Interval
> Join (19/20)
> (ff8e25fb94208d3c27f549a1e24757ea_e8388ada9c03cfdb1446bb3ccfbd461b_18_0)
> switched from RUNNING to FAILED on d569c5db-6882-496b-9e92-8a40bb631784 @
> localhost (dataPort=-1).
> java.lang.NullPointerException: null
> at
> org.apache.flink.streaming.api.operators.TimerSerializer.serialize(TimerSerializer.java:149)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.api.operators.TimerSerializer.serialize(TimerSerializer.java:39)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.lambda$logRemoval$1(ChangelogKeyGroupedPriorityQueue.java:153)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.AbstractStateChangeLogger.lambda$serialize$4(AbstractStateChangeLogger.java:184)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.AbstractStateChangeLogger.serializeRaw(AbstractStateChangeLogger.java:193)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.AbstractStateChangeLogger.serialize(AbstractStateChangeLogger.java:178)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.AbstractStateChangeLogger.log(AbstractStateChangeLogger.java:151)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.AbstractStateChangeLogger.valueElementRemoved(AbstractStateChangeLogger.java:125)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.logRemoval(ChangelogKeyGroupedPriorityQueue.java:153)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.poll(ChangelogKeyGroupedPriorityQueue.java:69)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:301)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:602)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:609)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:618)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitWatermark(StreamTwoInputProcessorFactory.java:268)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:115)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:148)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
> ~[flink-streaming-java-1

Re: Cannot find meta data file '_metadata' in directory

2023-09-30 Thread Hangxiang Yu
Hi,
How did you specify the checkpoint path you restored from?

It seems that you are trying to restore from an incomplete or failed
checkpoint.

On Thu, Sep 28, 2023 at 6:09 PM rui chen  wrote:

> When we use 1.13.2, we get the following error:
> FileNotFoundException: Cannot find meta data file '_metadata' in directory
> 'hdfs://xx/f408dbe327f9e5053e76d7b5323d6e81/chk-173'.
>


-- 
Best,
Hangxiang.