你好,
请问一下为什么要设置128并行度,这个数值有点太大了,出于什么考虑设置的
在 2021-08-03 14:02:53,"Chenyu Zheng" 写道:
开发者您好,
我正在尝试在Kubernetes上部署Flink 1.12.2,使用的是native
application部署模式。但是在测试中发现,当将作业并行度调大之后,各种timeout时有发生。根据监控看,JM和TM容器的cpu和内存都没有使用到k8s给分配的量。
在尝试调大akka.ask.timeout至1分钟,和heartbeat.timeout至2分钟之后,各种超时现象得以缓解。
Taskmanager timeout:
java.util.concurrent.CompletionException:
org.apache.flink.client.deployment.application.ApplicationExecutionException:
Could not execute application.
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
~[?:1.8.0_282]
更正,这个是akka timeout exception
java.util.concurrent.CompletionException:
org.apache.flink.client.deployment.application.ApplicationExecutionException:
Could not execute application.
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
~[?:1.8.0_282
AKKA timeout
java.util.concurrent.CompletionException:
org.apache.flink.client.deployment.application.ApplicationExecutionException:
Could not execute application.
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
~[?:1.8.0_282]
开发者您好,
我正在尝试在Kubernetes上部署Flink 1.12.2, 使用的是native
application部署模式。但是在测试中发现,当将作业并行度调大之后,各种timeout时有发生。根据监控看,JM和TM容器的cpu和内存都没有使用到k8s给分配的量。
在尝试调大akka.ask.timeout至1分钟,和heartbeat.timeout至2分钟之后,各种超时现象得以缓解。
我的问题是,当设置较大并行度(比如128)时,akka超时和心跳超时的各种现象都是正常的吗?如果不正常,需要用什么方式去troubleshot问题的根源呢?另外单纯一味调大各个组件的超时时
非常感谢,受益良多。
best regards
在 2021-08-02 17:33:14,"Tony Wei" 写道:
>你好,
>
>如果我沒有理解錯你的應用場景的話,你想達成的結果應該是類似這篇討論 [1] 裡提到的問題對吧?
>從最新的 flink 文檔 [2] 中來看應該無法透過你期望的 on duplicate key 語句來實現,
>或許可以嘗試在 SELECT 語句上達成,舉例來說你可以在原有的 select 語句之外多添加 group by,如下:
>
>insert into t select a, last_value(b ignore nul
谢谢你,受教了
Caizhi Weng 于 2021年8月2日周一 19:28写道:
> Hi!
>
> shimin huang 说的可能是原本的 at least once 的 checkpoint 机制,这种 checkpoint 原本就是不对齐的。
>
> Flink 1.13 完善了 exactly once 条件下的不对齐 checkpoint 机制,因此这是能保证 exactly once
> 的。实现原理简单来说就是把还没处理的数据一起写到 state 里,下次恢复的时候把这些还没处理的数据也恢复出来接着处理。
>
> 张锴 于2021年8月2日周一 下午7:20写道
Hi!
shimin huang 说的可能是原本的 at least once 的 checkpoint 机制,这种 checkpoint 原本就是不对齐的。
Flink 1.13 完善了 exactly once 条件下的不对齐 checkpoint 机制,因此这是能保证 exactly once
的。实现原理简单来说就是把还没处理的数据一起写到 state 里,下次恢复的时候把这些还没处理的数据也恢复出来接着处理。
张锴 于2021年8月2日周一 下午7:20写道:
> 这个原理能说明一下吗,咋做到的
>
> 东东 于2021年8月2日周一 下午7:16写道:
>
> > 对
这个原理能说明一下吗,咋做到的
东东 于2021年8月2日周一 下午7:16写道:
> 对于每一个subtask,边界仍然是清晰的,所以精确一次可以保证,只不过ck会变大。
>
> 在 2021-08-02 18:53:11,"张锴" 写道:
> >flink最新特性中有非对齐检查点的特性,可以用来解决一些反压下的任务,但如果用了这个,还能保证精确一次吗?对齐的检查点有清晰的快照N~N +
> >1之间的边界,这个会将数据混在一起,如何在恢复的时候保证精确一次?
>
对于每一个subtask,边界仍然是清晰的,所以精确一次可以保证,只不过ck会变大。
在 2021-08-02 18:53:11,"张锴" 写道:
>flink最新特性中有非对齐检查点的特性,可以用来解决一些反压下的任务,但如果用了这个,还能保证精确一次吗?对齐的检查点有清晰的快照N~N +
>1之间的边界,这个会将数据混在一起,如何在恢复的时候保证精确一次?
从topic B实时写到hive,这个job需要配置 isolation.level 为
read_committed,否则会把还没有提交甚至是已经终止的事务消息读出来,这样就很难不出现重复了。
在 2021-08-02 19:00:13,"Jim Chen" 写道:
>我不太懂,下游的isolation.level是不是read_committed是啥意思。
>我是把topic A中的partitionId和offset写到消息体中,然后flink程序,把消息写到下游的topic B中。将topic
>B实时写到hive上,然后在hive表中,根据partitionId和offse
我不太懂,下游的isolation.level是不是read_committed是啥意思。
我是把topic A中的partitionId和offset写到消息体中,然后flink程序,把消息写到下游的topic B中。将topic
B实时写到hive上,然后在hive表中,根据partitionId和offset去重,发现有重复消费了
东东 于2021年8月2日周一 下午6:20写道:
> 下游如何发现重复数据的,下游的isolation.level是不是read_committed
>
>
> 在 2021-08-02 18:14:27,"Jim Chen" 写道:
> >Hi
不可以,会存在重复消费的问题,如果buffer没有对齐的话,job重启,那么这些buffer的数据就会清空,然后相关的subtask会重新消费一遍。
张锴 于2021年8月2日周一 下午6:53写道:
> flink最新特性中有非对齐检查点的特性,可以用来解决一些反压下的任务,但如果用了这个,还能保证精确一次吗?对齐的检查点有清晰的快照N~N +
> 1之间的边界,这个会将数据混在一起,如何在恢复的时候保证精确一次?
>
flink最新特性中有非对齐检查点的特性,可以用来解决一些反压下的任务,但如果用了这个,还能保证精确一次吗?对齐的检查点有清晰的快照N~N +
1之间的边界,这个会将数据混在一起,如何在恢复的时候保证精确一次?
下游如何发现重复数据的,下游的isolation.level是不是read_committed
在 2021-08-02 18:14:27,"Jim Chen" 写道:
>Hi 刘建刚,
>我使用了stop with savepoint,但是还是发现,下游有重复数据。
>停止命令:
>/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
>-yid application_1625497885855_703064 \
>-p
>hdfs://ztcluster/flink_realtime_warehouse/checkpoi
Hi 刘建刚,
我使用了stop with savepoint,但是还是发现,下游有重复数据。
停止命令:
/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
-yid application_1625497885855_703064 \
-p
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
\
-d 55e7ebb6fa38faaba61b4b9a7cd89827
重启命令:
/home/datadev/flink-1
可以自定义 KafkaAppender,然后可以从 System.getProperty("log.file")
获取你要的信息维度数据,比如这个可以提取到作业 application id,container_id,是 jobmanager 还是
taskmanager,还可以根据配置只提取想要的级别日志,最后将打的依赖放到 lib 目录下即可
yujianbo <15205029...@163.com> 于2021年6月15日周二 下午7:34写道:
> 最新详细配置,可以看看我的博客:
> https://blog.csdn.net/weixin_44500374/article/
補充一下,從代碼上來看, last_value 原本的實現似乎就是獲取 last "non-null" value 了。
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LastValueAggFunction.java
Tony Wei 於 2021年8月2日 週一 下午5:33寫道:
> 你好,
>
> 如果我沒有理解錯你的應用場景的話,你想達
你好,
如果我沒有理解錯你的應用場景的話,你想達成的結果應該是類似這篇討論 [1] 裡提到的問題對吧?
從最新的 flink 文檔 [2] 中來看應該無法透過你期望的 on duplicate key 語句來實現,
或許可以嘗試在 SELECT 語句上達成,舉例來說你可以在原有的 select 語句之外多添加 group by,如下:
insert into t select a, last_value(b ignore nulls) as b, last_value(c
> ignore nulls) as c from $(original_select_statement) gro
你好,我们用的1.11版本。
需求:table t 有三个字段(a,b,c)
我们的场景只想根据主键a更新部分字段b,其余的字段c原有的值保持不变,
例如mysql 支持:
insert into t(a,b) select 1,2 on duplicate key update b=2;
主键重复的时候只更新字段b,字段c的值不变。
但是flink sql 目前只支持全字段更新:insert into t(a,b,c) select 1,2,3 。
我在sql-client测试了一下:insert into t(a,b) select 1,2 on duplicate key up
cancel with savepoint是之前的接口了,会造成kafka数据的重复。新的stop with
savepoint则会在做savepoint的时候,不再发送数据,从而避免了重复数据,哭啼可以参考
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
Jim Chen 于2021年8月2日周一 下午2:33写道:
> 我是通过savepoint的方式重启的,命令如下:
>
> cancel command:
>
> /home/datadev/flink-1.12.
用的什么版本这个应该已经在1.13中修复了https://issues.apache.org/jira/browse/FLINK-18726
不行的话可以在ddl中限制列的数量
--
发件人:Ye Chen
发送时间:2021年8月2日(星期一) 11:37
收件人:user-zh ; silence
主 题:Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
你好,我试了一下,如果
需求:现有table t 三个字段
CREATE TABLE t (
abigint,
bbigint,
cbigint,
PRIMARY KEY (a) NOT ENFORCED
) WITH (
...
);
我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变,
例如mysql 支持:insert into t(a,b) select 1,2 on duplicate key update b=2;
主键重复的时候只更新字段b,字段c的值不变。但是flink sql 目前只支持全字段更新:insert into t(a
hive 1.1.0
-- --
??:
"user-zh"
退订
25 matches
Mail list logo