Re:eventTime语义一些设备的数据总是迟到被丢弃
你好, 设备数据迟到多久?maxOutofOrderness设置一个合适的值,让迟到的数据到达窗口后再计算。 best regards 在 2021-08-03 15:15:37,"zwoi" <318666...@qq.com.INVALID> 写道: >hi > 我的设备数据是这样的, 设备id id(设备的唯一标识), 时间戳 time,要处理的指标 value, > 在eventTime语义下watermark 生成方式为new >Watermark(Math.max(time, currentMaxTimestamp) -maxOutofOrderness), > 我需要对设备数据 做 keyby(id) >分组后再计算,但总有几个设备数据迟到,导致这几个设备数据就一直计算不到,请问有什么解决办法吗?
Re:几个Flink 1.12. 2超时问题
你好, 请问一下为什么要设置128并行度,这个数值有点太大了,出于什么考虑设置的 在 2021-08-03 14:02:53,"Chenyu Zheng" 写道: 开发者您好, 我正在尝试在Kubernetes上部署Flink 1.12.2,使用的是native application部署模式。但是在测试中发现,当将作业并行度调大之后,各种timeout时有发生。根据监控看,JM和TM容器的cpu和内存都没有使用到k8s给分配的量。 在尝试调大akka.ask.timeout至1分钟,和heartbeat.timeout至2分钟之后,各种超时现象得以缓解。 我的问题是,当设置较大并行度(比如128)时,akka超时和心跳超时的各种现象都是正常的吗?如果不正常,需要用什么方式去troubleshot问题的根源呢?另外单纯一味调大各个组件的超时时间,会带来什么负面作用呢? 附件中有akka超时的jobmanager日志,TaskManager心跳超时日志稍后会发上来。 谢谢!
Re:Re: 回复:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
非常感谢,受益良多。 best regards 在 2021-08-02 17:33:14,"Tony Wei" 写道: >你好, > >如果我沒有理解錯你的應用場景的話,你想達成的結果應該是類似這篇討論 [1] 裡提到的問題對吧? >從最新的 flink 文檔 [2] 中來看應該無法透過你期望的 on duplicate key 語句來實現, >或許可以嘗試在 SELECT 語句上達成,舉例來說你可以在原有的 select 語句之外多添加 group by,如下: > >insert into t select a, last_value(b ignore nulls) as b, last_value(c >> ignore nulls) as c from $(original_select_statement) group by a; > > >不過目前 last_value 似乎不支持 ignore nulls,你可以考慮自己實現一個 UDAF 來達成。 >另外,這樣的做法也會造成 flink state 不斷增長 (由於 group by 的緣故),所以需要多加小心,比如適當的配置 state ttl。 > >best regards, > >[1] >https://stackoverflow.com/questions/48144641/mysql-using-on-duplicate-key-update-coalesce >[2] >https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/insert/#syntax > >Ye Chen 於 2021年8月2日 週一 下午4:08寫道: > >> 你好,我们用的1.11版本。 >> >> 需求:table t 有三个字段(a,b,c) >> 我们的场景只想根据主键a更新部分字段b,其余的字段c原有的值保持不变, >> >> 例如mysql 支持: >> insert into t(a,b) select 1,2 on duplicate key update b=2; >> 主键重复的时候只更新字段b,字段c的值不变。 >> 但是flink sql 目前只支持全字段更新:insert into t(a,b,c) select 1,2,3 。 >> 我在sql-client测试了一下:insert into t(a,b) select 1,2 on duplicate key update >> b=2; 会报错 不支持 on duplicate key update >> 同时也测试了一下:insert into t(a,b) select 1,2 也会报错,字段数量不匹配; >> [ERROR] Could not execute SQL statement. Reason: >> org.apache.flink.table.api.ValidationException: Column types of query >> result and sink for registered table 'default_catalog.default_database.t' >> do not match. >> Cause: Different number of columns. >> 我查看了https://issues.apache.org/jira/browse/FLINK-18726 >> 使用最新版的1.13 sql-client测试了一下 insert into t(a,b) select 1,2 >> ,相比1.11版本报错,1.13可以执行成功.但是发现和文档中描述的一样,字段c会插入null。如果原有的一条数据是1,2,3执行sql后会变成1,2,null,会造成字段c的数据丢失,这是不允许的。 >> 请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理? >> >> 在 2021-08-02 15:39:09,"silence" 写道: >> >用的什么版本这个应该已经在1.13中修复了https://issues.apache.org/jira/browse/FLINK-18726 >> >不行的话可以在ddl中限制列的数量 >> > >> > >> >-- >> >发件人:Ye Chen >> >发送时间:2021年8月2日(星期一) 11:37 >> >收件人:user-zh ; silence >> >主 题:Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ? >> > >> >你好,我试了一下,如果表的ddl是三个字段,但是insert只指定两个字段的话,会报错: >> >[ERROR] Could not execute SQL statement. Reason: >> >org.apache.flink.table.api.ValidationException: Column types of query >> result and sink for registered table 'default_catalog.default_database.t' >> do not match. >> >Cause: Different number of columns. >> >我们的需求是想根据主键更新部分字段 >> >- >> >需求:现有table >> >CREATE TABLE t ( >> > abigint, >> > bbigint, >> > cbigint, >> > PRIMARY KEY (a) NOT ENFORCED >> >) WITH ( >> >... >> >); >> >我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变, >> >例如mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key >> update b='4';主键重复的时候只更新字段b,字段c的值不变。 >> >我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 >> >请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理? >> > >> > >> >在 2021-08-02 10:47:55,"silence" 写道: >> >>如果只想更新部分字段的话可以试下 >> >>insert into t(a,b) select a,b from x >> >> >> >> >> >>-- >> >>发件人:Ye Chen >> >>发送时间:2021年7月30日(星期五) 17:57 >> >>收件人:user-zh >> >>主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ? >> >> >> >>现有table >> >>CREATE TABLE t ( >> >> abigint, >> >> bbigint, >> >> cbigint, >> >> PRIMARY KEY (a) NOT ENFORCED >> >>) WITH ( >> >>... >> >>); >> >> >> >> >> >>我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如 >> >>mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key >> update b='4'; >> >>主键重复的时候只更新字段b,字段c的值不变 >> >> >> >> >> >>我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 >> >>请问这种部分字段更新的场景 使用flink sql应该怎么处理? >> >> >> >> >> > >> > >> > >>
Re:回复:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
你好,我们用的1.11版本。 需求:table t 有三个字段(a,b,c) 我们的场景只想根据主键a更新部分字段b,其余的字段c原有的值保持不变, 例如mysql 支持: insert into t(a,b) select 1,2 on duplicate key update b=2; 主键重复的时候只更新字段b,字段c的值不变。 但是flink sql 目前只支持全字段更新:insert into t(a,b,c) select 1,2,3 。 我在sql-client测试了一下:insert into t(a,b) select 1,2 on duplicate key update b=2; 会报错 不支持 on duplicate key update 同时也测试了一下:insert into t(a,b) select 1,2 也会报错,字段数量不匹配; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'default_catalog.default_database.t' do not match. Cause: Different number of columns. 我查看了https://issues.apache.org/jira/browse/FLINK-18726 使用最新版的1.13 sql-client测试了一下 insert into t(a,b) select 1,2 ,相比1.11版本报错,1.13可以执行成功.但是发现和文档中描述的一样,字段c会插入null。如果原有的一条数据是1,2,3执行sql后会变成1,2,null,会造成字段c的数据丢失,这是不允许的。 请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理? 在 2021-08-02 15:39:09,"silence" 写道: >用的什么版本这个应该已经在1.13中修复了https://issues.apache.org/jira/browse/FLINK-18726 >不行的话可以在ddl中限制列的数量 > > >------ >发件人:Ye Chen >发送时间:2021年8月2日(星期一) 11:37 >收件人:user-zh ; silence >主 题:Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ? > >你好,我试了一下,如果表的ddl是三个字段,但是insert只指定两个字段的话,会报错: >[ERROR] Could not execute SQL statement. Reason: >org.apache.flink.table.api.ValidationException: Column types of query result >and sink for registered table 'default_catalog.default_database.t' do not >match. >Cause: Different number of columns. >我们的需求是想根据主键更新部分字段 >- >需求:现有table >CREATE TABLE t ( > abigint, > bbigint, > cbigint, > PRIMARY KEY (a) NOT ENFORCED >) WITH ( >... >); >我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变, >例如mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key update >b='4';主键重复的时候只更新字段b,字段c的值不变。 >我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 >请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理? > > >在 2021-08-02 10:47:55,"silence" 写道: >>如果只想更新部分字段的话可以试下 >>insert into t(a,b) select a,b from x >> >> >>-- >>发件人:Ye Chen >>发送时间:2021年7月30日(星期五) 17:57 >>收件人:user-zh >>主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ? >> >>现有table >>CREATE TABLE t ( >> abigint, >> bbigint, >> cbigint, >> PRIMARY KEY (a) NOT ENFORCED >>) WITH ( >>... >>); >> >> >>我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如 >>mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key update >>b='4'; >>主键重复的时候只更新字段b,字段c的值不变 >> >> >>我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 >>请问这种部分字段更新的场景 使用flink sql应该怎么处理? >> >> > > >
Re:Re: 回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
需求:现有table t 三个字段 CREATE TABLE t ( abigint, bbigint, cbigint, PRIMARY KEY (a) NOT ENFORCED ) WITH ( ... ); 我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变, 例如mysql 支持:insert into t(a,b) select 1,2 on duplicate key update b=2; 主键重复的时候只更新字段b,字段c的值不变。但是flink sql 目前只支持全字段更新:insert into t(a,b,c) select 1,2,3 我在sql-client测试了一下:insert into t(a,b) select 1,2 on duplicate key update b=2; 会报错 不支持 on duplicate key update同时也测试了一下:insert into t(a,b) select 1,2 也会报错,字段数量不匹配;[ERROR] Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'default_catalog.default_database.t' do not match.Cause: Different number of columns.请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理? 在 2021-08-02 14:35:04,"Shengkai Fang" 写道: >Flink 暂时不支持这个功能,可能需要自己改一下 jdbc connector 相关的代码. > >但是这个报错很奇怪..你 sql 咋写的 > >Ye Chen 于2021年8月2日周一 上午11:37写道: > >> 你好,我试了一下,如果表的ddl是三个字段,但是insert只指定两个字段的话,会报错: >> [ERROR] Could not execute SQL statement. Reason: >> org.apache.flink.table.api.ValidationException: Column types of query >> result and sink for registered table 'default_catalog.default_database.t' >> do not match. >> Cause: Different number of columns. >> 我们的需求是想根据主键更新部分字段 >> >> - >> >> 需求:现有table >> CREATE TABLE t ( >> abigint, >> bbigint, >> cbigint, >> PRIMARY KEY (a) NOT ENFORCED >> ) WITH ( >> ... >> ); >> 我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变, >> 例如mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key >> update b='4';主键重复的时候只更新字段b,字段c的值不变。 >> 我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 >> 请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理? >> >> >> >> >> >> >> >> 在 2021-08-02 10:47:55,"silence" 写道: >> >如果只想更新部分字段的话可以试下 >> >insert into t(a,b) select a,b from x >> > >> > >> >-- >> >发件人:Ye Chen >> >发送时间:2021年7月30日(星期五) 17:57 >> >收件人:user-zh >> >主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ? >> > >> >现有table >> >CREATE TABLE t ( >> > abigint, >> > bbigint, >> > cbigint, >> > PRIMARY KEY (a) NOT ENFORCED >> >) WITH ( >> >... >> >); >> > >> > >> >我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如 >> >mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key >> update b='4'; >> >主键重复的时候只更新字段b,字段c的值不变 >> > >> > >> >我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 >> >请问这种部分字段更新的场景 使用flink sql应该怎么处理? >> > >> > >>
Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
你好,我试了一下,如果表的ddl是三个字段,但是insert只指定两个字段的话,会报错: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'default_catalog.default_database.t' do not match. Cause: Different number of columns. 我们的需求是想根据主键更新部分字段 - 需求:现有table CREATE TABLE t ( abigint, bbigint, cbigint, PRIMARY KEY (a) NOT ENFORCED ) WITH ( ... ); 我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变, 例如mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key update b='4';主键重复的时候只更新字段b,字段c的值不变。 我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理? 在 2021-08-02 10:47:55,"silence" 写道: >如果只想更新部分字段的话可以试下 >insert into t(a,b) select a,b from x > > >------ >发件人:Ye Chen >发送时间:2021年7月30日(星期五) 17:57 >收件人:user-zh >主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ? > >现有table >CREATE TABLE t ( > abigint, > bbigint, > cbigint, > PRIMARY KEY (a) NOT ENFORCED >) WITH ( >... >); > > >我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如 >mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key update >b='4'; >主键重复的时候只更新字段b,字段c的值不变 > > >我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 >请问这种部分字段更新的场景 使用flink sql应该怎么处理? > >
Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
你好,upsert是全字段更新,必须指定所有的字段值。我们的需求是想根据主键更新部分字段,其余字段不变。 现有table CREATE TABLE t ( abigint, bbigint, cbigint, PRIMARY KEY (a) NOT ENFORCED ) WITH ( ... ); 我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变,例如 mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key update b='4';主键重复的时候只更新字段b,字段c的值不变。 我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 请问这种部分字段更新的场景 使用flink sql应该怎么处理? 在 2021-08-02 10:08:28,"silence" 写道: >你在你的sink ddl定义了主键会自动的按主键进行upsert的 >参考https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#idempotent-writes > > >------ >发件人:Ye Chen >发送时间:2021年7月30日(星期五) 17:57 >收件人:user-zh >主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ? > >现有table >CREATE TABLE t ( > abigint, > bbigint, > cbigint, > PRIMARY KEY (a) NOT ENFORCED >) WITH ( >... >); > > >我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如 >mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key update >b='4'; >主键重复的时候只更新字段b,字段c的值不变 > > >我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 >请问这种部分字段更新的场景 使用flink sql应该怎么处理? > >
场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
现有table CREATE TABLE t ( abigint, bbigint, cbigint, PRIMARY KEY (a) NOT ENFORCED ) WITH ( ... ); 我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如 mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key update b='4'; 主键重复的时候只更新字段b,字段c的值不变 我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 请问这种部分字段更新的场景 使用flink sql应该怎么处理?
Re:Re:SqlValidatorException: No match found for function signature prod()
应该是继承scalaFunction ? 在 2021-02-22 10:25:31,"xiaoyue" <18242988...@163.com> 写道: >捞一下自己,在线等大佬们的回复 _(:з」∠)_ > > > > > > > >在 2021-02-20 13:14:18,"xiaoyue" <18242988...@163.com> 写道: > >我在使用flinksql1.11的udaf时出现SqlValidatorException: No match found for function >signature prod(),请求大佬帮忙看看_(:з」∠)_ > >以下是代码: >- >... > stableEnv.createTemporarySystemFunction("prod", > ProductAggregateFunction.class); > Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as > yldrate from queryData group by pf_id"); >... >- >@FunctionHint( >input = @DataTypeHint("Double"), >output = @DataTypeHint("Double") >) >public class ProductAggregateFunction extends AggregateFunctionProduct> { > > >@Override >public Double getValue(Product acc) { >return acc.prod; >} >@Override >public Product createAccumulator() { >return new Product(); >} >public void accumulate(Product acc, Double iValue) { >acc.prod *= iValue; >} >public void retract(Product acc, Double iValue) { >acc.prod /= iValue; >} >public void merge(Product acc, Iterable it) { >for (Product p : it) { >accumulate(acc, p.prod); >} >} >public void resetAccumulator(Product acc) { >acc.prod = 1D; >} >} > > > > > >
Re:flink生成大宽表
可以用多流join,但是数据延迟会导致join不上,可以侧输出处理下,看业务需求 在 2021-02-22 11:05:46,"liujian" <13597820...@qq.com> 写道: >Hi: > 大家好,有3张实时的表,相互关联可以形成大宽表,如何一张都会更新,那么我该如何实现流处理,我目标表放到kudu上 > > 我的理解: > 直接使用jdbc-connecter将三张表读取,然后join,再写入,会不会有什么问题
Re:Flink SQL并发度问题
并发度的设置有优先级,客户端级别小于算子级别,所以上游source算子单独设置并发度会生效,而下游仍然是客户端级别的并发度。 在 2021-02-20 18:23:52,"guaishushu1...@163.com" 写道: > 这几天研究了flink table 转化为stream node 的源码,发现是某个算子的并发度取决于上一个算子的并发度。 >但是在实际测试过程中发现使用window aggregate 语句时候 该算子的并发度和上游的source不一致 和我cli 命令配置的并发度一致 >这是为什么呢? > > >guaishushu1...@163.com
自定义partition,使用遇到问题,附代码
各位好,我想实现自定义partition,继承了 FlinkKafkaPartitioner 后,使用会报错,简化的代码如下。 //自定义partition public class customPartitioner extends FlinkKafkaPartitioner { @Override public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) { return 0; } } DataStream stream = 。。。 FlinkKafkaProducer myProducer = new FlinkKafkaProducer<>( "test_topic", new SimpleStringSchema(), properties, new customPartitioner() ); stream.addSink(myProducer); //上面的代码,编辑器中编译FlinkKafkaProducer会报错,【Error:(55, 49) java: 无法推断org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>的类型参数】 //去掉new customPartitioner(),不使用自定义partition,FlinkKafkaProducer就不报错,感觉是构造函数对应不上,但是查看构造函数源码有这个构造函数 查看FlinkKafkaProducer源码如下,我上面的写法有问题么? public FlinkKafkaProducer( String topicId, SerializationSchema serializationSchema, Properties producerConfig, Optional> customPartitioner) { this( topicId, serializationSchema, producerConfig, customPartitioner.orElse(null), Semantic.AT_LEAST_ONCE, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); }
Re:多流join的场景如何优化
双流join或者多流join从技术上是可以实现你这个场景的,网上有很多成熟的案例。 但是要考虑具体的业务需求,比如数据是否能在规定时间到达,未到达如何处理,如果因为多流join造成数据缺失或者延迟,对业务影响比较大的话还不如继续用维表。 在 2021-01-26 11:30:56,"hl9...@126.com" 写道: >请教各位大佬,我现在有个多流join计算的场景,不知道该如何进行优化。 > >电商业务有3个kafka消息源,消息结构描述如下(只列举主要字段): >market_act(营销活动): >{act_id:营销活动id,start_time:活动开始时间,end_time:活动结束时间,shop_id:活动的门店} >new_member(新增会员): {member_id:新会员id,act_id:吸引会员的营销活动id,create_time:新会员生成时间} >orders(订单):{order_no:订单号,amt:订单金额,member_id:会员id,create_time:订单时间,shop_id:下单的门店} > >需求:按活动统计活动期间新会员产生的订单金额 >伪sql: >select act_id,count(1) as order_num,sum(amt) as order_amt >from orders t1 >inner join new_member t2 on t1.member_id=t2.member_id >inner join market_act t3 on t2.act_id=t3.act_id >where t1.create_time between t3.start_time and t3.end_time ; > >目前做法: >将 market_act 和 new_member 两个维表消息放到redis缓存, >flink接orders 消息,在flatmap中读取redis维表信息,判断当前订单是否属于某个有效的活动, >是则输出{act_id,order_no,amt,member_id},然后sink到db。 > >我感觉这种做法没有充分发挥flink流的特性,有没有办法在flink里面直接join这3个流,搭配状态,进行计算? > > > >hl9...@126.com
Re:Flink 并行度问题
@jacob hi, TaskManager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,并且 Slot 只对内存隔离,没有对 CPU 隔离。 而slot 和并行度的关系是:Slot 是指 TaskManager 最大能并发执行的能力,parallelism 是指 TaskManager 实际使用的并发能力。 个人见解,并行度的设置一般无需考虑CPU。 在 2021-01-22 16:18:32,"Jacob" <17691150...@163.com> 写道: >使用Flink以来,一直有一个问题困扰着。 > > >Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。 > >比如Flink消费kafka >topic时,并行度数量往往都建议设置topic分区的个数,意在让每个并行度消费一个分区,达到性能最优。那也就是说一个并行度代表一个消费线程,同时也表示一个slot,又由于在Flink中一个并行度表示一个CPU,那么是不是可以理解为一个CPU就是一个线程。 > > >如果FLink 以standalone的模式部署在一台机器上,这台机器有4个CPU,每个CPU有6个核,那么该集群的最大并行度是不是就是 4 ? > >在我的认识中,一个核表示一个线程,但在上面的理解中,好像是一个CPU就是一个线程,这是否矛盾呢? > > > > >- >Thanks! >Jacob >-- >Sent from: http://apache-flink.147419.n8.nabble.com/
Re:flink消费kafka出现GC问题
@nick 我们之前也遇到的过GC导致任务挂掉问题,后来排查发现flink代码写的有问题,在@close方法中关闭数据库连接,但是事实上@close方法未起作用,导致资源未释放OOM。 hope this can help you At 2021-01-22 14:18:51, "nick" wrote: >org.apache.flink.runtime.JobException: Recovery is suppressed by >NoRestartBackoffTimeStrategy > at >org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) >~[flink-dist_2.11-1.12.0.jar:1.12.0] > at >org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) >~[flink-dist_2.11-1.12.0.jar:1.12.0] > at >org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224) >~[flink-dist_2.11-1.12.0.jar:1.12.0] > at >org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217) >~[flink-dist_2.11-1.12.0.jar:1.12.0] > at >org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208) >~[flink-dist_2.11-1.12.0.jar:1.12.0] > at >org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610) >~[flink-dist_2.11-1.12.0.jar:1.12.0] > at >org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) >~[flink-dist_2.11-1.12.0.jar:1.12.0] > at >org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419) >~[flink-dist_2.11-1.12.0.jar:1.12.0] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >~[?:1.8.0_271] > at >sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >~[?:1.8.0_271] > at >sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >~[?:1.8.0_271] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_271] > at >org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286) >~[flink-dist_2.11-1.12.0.jar:1.12.0] > at >org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201) >~[flink-dist_2.11-1.12.0.jar:1.12.0] > at >org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) >~[flink-dist_2.11-1.12.0.jar:1.12.0] > at >org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) >~[flink-dist_2.11-1.12.0.jar:1.12.0] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at akka.actor.ActorCell.invoke(ActorCell.scala:561) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at akka.dispatch.Mailbox.run(Mailbox.scala:225) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at >akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >[flink-dist_2.11-1.12.0.jar:1.12.0] > at >akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >[flink-dist_2.11-1.12.0.jar:1.12.0] >Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded > at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57) ~[?:1.8.0_271] > at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[?:1.8.0_271] > at >org.apache.flink.kafka.shaded.org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) >~[?:?] > at >org.apache.flink.kafka.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112) >~[?:?] > at