Re:eventTime语义一些设备的数据总是迟到被丢弃

2021-08-03 Thread Ye Chen
你好,
设备数据迟到多久?maxOutofOrderness设置一个合适的值,让迟到的数据到达窗口后再计算。



best regards







在 2021-08-03 15:15:37,"zwoi" <318666...@qq.com.INVALID> 写道:
>hi
>   我的设备数据是这样的, 设备id id(设备的唯一标识), 时间戳 time,要处理的指标 value,
>   在eventTime语义下watermark 生成方式为new 
>Watermark(Math.max(time, currentMaxTimestamp) -maxOutofOrderness),
>   我需要对设备数据 做 keyby(id) 
>分组后再计算,但总有几个设备数据迟到,导致这几个设备数据就一直计算不到,请问有什么解决办法吗?


Re:几个Flink 1.12. 2超时问题

2021-08-03 Thread Ye Chen
你好,
请问一下为什么要设置128并行度,这个数值有点太大了,出于什么考虑设置的






在 2021-08-03 14:02:53,"Chenyu Zheng"  写道:

开发者您好,

 

我正在尝试在Kubernetes上部署Flink 1.12.2,使用的是native 
application部署模式。但是在测试中发现,当将作业并行度调大之后,各种timeout时有发生。根据监控看,JM和TM容器的cpu和内存都没有使用到k8s给分配的量。

 

在尝试调大akka.ask.timeout至1分钟,和heartbeat.timeout至2分钟之后,各种超时现象得以缓解。

 

我的问题是,当设置较大并行度(比如128)时,akka超时和心跳超时的各种现象都是正常的吗?如果不正常,需要用什么方式去troubleshot问题的根源呢?另外单纯一味调大各个组件的超时时间,会带来什么负面作用呢?

 

附件中有akka超时的jobmanager日志,TaskManager心跳超时日志稍后会发上来。

 

谢谢!

 

Re:Re: 回复:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-02 Thread Ye Chen
非常感谢,受益良多。

best regards











在 2021-08-02 17:33:14,"Tony Wei"  写道:
>你好,
>
>如果我沒有理解錯你的應用場景的話,你想達成的結果應該是類似這篇討論 [1] 裡提到的問題對吧?
>從最新的 flink 文檔 [2] 中來看應該無法透過你期望的 on duplicate key 語句來實現,
>或許可以嘗試在 SELECT 語句上達成,舉例來說你可以在原有的 select 語句之外多添加 group by,如下:
>
>insert into t select a, last_value(b ignore nulls) as b, last_value(c
>> ignore nulls) as c from $(original_select_statement) group by a;
>
>
>不過目前 last_value 似乎不支持 ignore nulls,你可以考慮自己實現一個 UDAF 來達成。
>另外,這樣的做法也會造成 flink state 不斷增長 (由於 group by 的緣故),所以需要多加小心,比如適當的配置 state ttl。
>
>best regards,
>
>[1]
>https://stackoverflow.com/questions/48144641/mysql-using-on-duplicate-key-update-coalesce
>[2]
>https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/insert/#syntax
>
>Ye Chen  於 2021年8月2日 週一 下午4:08寫道:
>
>> 你好,我们用的1.11版本。
>>
>> 需求:table t 有三个字段(a,b,c)
>> 我们的场景只想根据主键a更新部分字段b,其余的字段c原有的值保持不变,
>>
>> 例如mysql 支持:
>> insert into t(a,b) select 1,2 on duplicate key update b=2;
>> 主键重复的时候只更新字段b,字段c的值不变。
>> 但是flink sql 目前只支持全字段更新:insert into t(a,b,c) select 1,2,3 。
>> 我在sql-client测试了一下:insert into t(a,b) select 1,2 on duplicate key update
>> b=2;  会报错 不支持 on duplicate key update
>> 同时也测试了一下:insert into t(a,b) select 1,2  也会报错,字段数量不匹配;
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.table.api.ValidationException: Column types of query
>> result and sink for registered table 'default_catalog.default_database.t'
>> do not match.
>> Cause: Different number of columns.
>> 我查看了https://issues.apache.org/jira/browse/FLINK-18726
>> 使用最新版的1.13  sql-client测试了一下 insert into t(a,b) select 1,2
>> ,相比1.11版本报错,1.13可以执行成功.但是发现和文档中描述的一样,字段c会插入null。如果原有的一条数据是1,2,3执行sql后会变成1,2,null,会造成字段c的数据丢失,这是不允许的。
>> 请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理?
>>
>> 在 2021-08-02 15:39:09,"silence"  写道:
>> >用的什么版本这个应该已经在1.13中修复了https://issues.apache.org/jira/browse/FLINK-18726
>> >不行的话可以在ddl中限制列的数量
>> >
>> >
>> >--
>> >发件人:Ye Chen 
>> >发送时间:2021年8月2日(星期一) 11:37
>> >收件人:user-zh ; silence 
>> >主 题:Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
>> >
>> >你好,我试了一下,如果表的ddl是三个字段,但是insert只指定两个字段的话,会报错:
>> >[ERROR] Could not execute SQL statement. Reason:
>> >org.apache.flink.table.api.ValidationException: Column types of query
>> result and sink for registered table 'default_catalog.default_database.t'
>> do not match.
>> >Cause: Different number of columns.
>> >我们的需求是想根据主键更新部分字段
>> >-
>> >需求:现有table
>> >CREATE TABLE t (
>> > abigint,
>> > bbigint,
>> > cbigint,
>> >  PRIMARY KEY (a) NOT ENFORCED
>> >) WITH (
>> >...
>> >);
>> >我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变,
>> >例如mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key
>> update b='4';主键重复的时候只更新字段b,字段c的值不变。
>> >我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
>> >请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理?
>> >
>> >
>> >在 2021-08-02 10:47:55,"silence"  写道:
>> >>如果只想更新部分字段的话可以试下
>> >>insert into t(a,b) select a,b from x
>> >>
>> >>
>> >>--
>> >>发件人:Ye Chen 
>> >>发送时间:2021年7月30日(星期五) 17:57
>> >>收件人:user-zh 
>> >>主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
>> >>
>> >>现有table
>> >>CREATE TABLE t (
>> >> abigint,
>> >> bbigint,
>> >> cbigint,
>> >>  PRIMARY KEY (a) NOT ENFORCED
>> >>) WITH (
>> >>...
>> >>);
>> >>
>> >>
>> >>我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如
>> >>mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key
>> update b='4';
>> >>主键重复的时候只更新字段b,字段c的值不变
>> >>
>> >>
>> >>我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
>> >>请问这种部分字段更新的场景 使用flink sql应该怎么处理?
>> >>
>> >>
>> >
>> >
>> >
>>


Re:回复:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-02 Thread Ye Chen
你好,我们用的1.11版本。

需求:table t 有三个字段(a,b,c)
我们的场景只想根据主键a更新部分字段b,其余的字段c原有的值保持不变,

例如mysql 支持:
insert into t(a,b) select 1,2 on duplicate key update b=2;  
主键重复的时候只更新字段b,字段c的值不变。
但是flink sql 目前只支持全字段更新:insert into t(a,b,c) select 1,2,3 。
我在sql-client测试了一下:insert into t(a,b) select 1,2 on duplicate key update b=2;  
会报错 不支持 on duplicate key update
同时也测试了一下:insert into t(a,b) select 1,2  也会报错,字段数量不匹配;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Column types of query result 
and sink for registered table 'default_catalog.default_database.t' do not match.
Cause: Different number of columns.
我查看了https://issues.apache.org/jira/browse/FLINK-18726
使用最新版的1.13  sql-client测试了一下 insert into t(a,b) select 1,2 
,相比1.11版本报错,1.13可以执行成功.但是发现和文档中描述的一样,字段c会插入null。如果原有的一条数据是1,2,3执行sql后会变成1,2,null,会造成字段c的数据丢失,这是不允许的。
请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理?

在 2021-08-02 15:39:09,"silence"  写道:
>用的什么版本这个应该已经在1.13中修复了https://issues.apache.org/jira/browse/FLINK-18726
>不行的话可以在ddl中限制列的数量
>
>
>------
>发件人:Ye Chen 
>发送时间:2021年8月2日(星期一) 11:37
>收件人:user-zh ; silence 
>主 题:Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
>
>你好,我试了一下,如果表的ddl是三个字段,但是insert只指定两个字段的话,会报错:
>[ERROR] Could not execute SQL statement. Reason:
>org.apache.flink.table.api.ValidationException: Column types of query result 
>and sink for registered table 'default_catalog.default_database.t' do not 
>match.
>Cause: Different number of columns.
>我们的需求是想根据主键更新部分字段
>-
>需求:现有table 
>CREATE TABLE t (
> abigint,
> bbigint,
> cbigint,
>  PRIMARY KEY (a) NOT ENFORCED
>) WITH (
>...
>);
>我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变,
>例如mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key update 
>b='4';主键重复的时候只更新字段b,字段c的值不变。
>我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
>请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理?
>
>
>在 2021-08-02 10:47:55,"silence"  写道:
>>如果只想更新部分字段的话可以试下
>>insert into t(a,b) select a,b from x
>>
>>
>>--
>>发件人:Ye Chen 
>>发送时间:2021年7月30日(星期五) 17:57
>>收件人:user-zh 
>>主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
>>
>>现有table 
>>CREATE TABLE t (
>> abigint,
>> bbigint,
>> cbigint,
>>  PRIMARY KEY (a) NOT ENFORCED
>>) WITH (
>>...
>>);
>>
>>
>>我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如
>>mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key update 
>>b='4';
>>主键重复的时候只更新字段b,字段c的值不变
>>
>>
>>我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
>>请问这种部分字段更新的场景 使用flink sql应该怎么处理?
>>
>>
>
>
>


Re:Re: 回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-02 Thread Ye Chen
需求:现有table t 三个字段
CREATE TABLE t (
 abigint,
 bbigint,
 cbigint,
 PRIMARY KEY (a) NOT ENFORCED
) WITH (
...
);
我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变,
例如mysql 支持:insert into t(a,b) select 1,2 on duplicate key update b=2;  
主键重复的时候只更新字段b,字段c的值不变。但是flink sql 目前只支持全字段更新:insert into t(a,b,c) select 1,2,3 
我在sql-client测试了一下:insert into t(a,b) select 1,2 on duplicate key update b=2;  
会报错 不支持 on duplicate key update同时也测试了一下:insert into t(a,b) select 1,2  
也会报错,字段数量不匹配;[ERROR] Could not execute SQL statement. 
Reason:org.apache.flink.table.api.ValidationException: Column types of query 
result and sink for registered table 'default_catalog.default_database.t' do 
not match.Cause: Different number of columns.请问这种根据主键更新部分字段的场景 使用flink 
sql应该怎么处理?















在 2021-08-02 14:35:04,"Shengkai Fang"  写道:
>Flink 暂时不支持这个功能,可能需要自己改一下 jdbc connector 相关的代码.
>
>但是这个报错很奇怪..你 sql 咋写的
>
>Ye Chen  于2021年8月2日周一 上午11:37写道:
>
>> 你好,我试了一下,如果表的ddl是三个字段,但是insert只指定两个字段的话,会报错:
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.table.api.ValidationException: Column types of query
>> result and sink for registered table 'default_catalog.default_database.t'
>> do not match.
>> Cause: Different number of columns.
>> 我们的需求是想根据主键更新部分字段
>>
>> -
>>
>> 需求:现有table
>> CREATE TABLE t (
>>  abigint,
>>  bbigint,
>>  cbigint,
>>   PRIMARY KEY (a) NOT ENFORCED
>> ) WITH (
>> ...
>> );
>> 我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变,
>> 例如mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key
>> update b='4';主键重复的时候只更新字段b,字段c的值不变。
>> 我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
>> 请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理?
>>
>>
>>
>>
>>
>>
>>
>> 在 2021-08-02 10:47:55,"silence"  写道:
>> >如果只想更新部分字段的话可以试下
>> >insert into t(a,b) select a,b from x
>> >
>> >
>> >--
>> >发件人:Ye Chen 
>> >发送时间:2021年7月30日(星期五) 17:57
>> >收件人:user-zh 
>> >主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
>> >
>> >现有table
>> >CREATE TABLE t (
>> > abigint,
>> > bbigint,
>> > cbigint,
>> >  PRIMARY KEY (a) NOT ENFORCED
>> >) WITH (
>> >...
>> >);
>> >
>> >
>> >我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如
>> >mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key
>> update b='4';
>> >主键重复的时候只更新字段b,字段c的值不变
>> >
>> >
>> >我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
>> >请问这种部分字段更新的场景 使用flink sql应该怎么处理?
>> >
>> >
>>


Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-01 Thread Ye Chen
你好,我试了一下,如果表的ddl是三个字段,但是insert只指定两个字段的话,会报错:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Column types of query result 
and sink for registered table 'default_catalog.default_database.t' do not match.
Cause: Different number of columns.
我们的需求是想根据主键更新部分字段

-

需求:现有table 
CREATE TABLE t (
 abigint,
 bbigint,
 cbigint,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
...
);
我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变,
例如mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key update 
b='4';主键重复的时候只更新字段b,字段c的值不变。
我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理?







在 2021-08-02 10:47:55,"silence"  写道:
>如果只想更新部分字段的话可以试下
>insert into t(a,b) select a,b from x
>
>
>------
>发件人:Ye Chen 
>发送时间:2021年7月30日(星期五) 17:57
>收件人:user-zh 
>主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
>
>现有table 
>CREATE TABLE t (
> abigint,
> bbigint,
> cbigint,
>  PRIMARY KEY (a) NOT ENFORCED
>) WITH (
>...
>);
>
>
>我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如
>mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key update 
>b='4';
>主键重复的时候只更新字段b,字段c的值不变
>
>
>我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
>请问这种部分字段更新的场景 使用flink sql应该怎么处理?
>
>


Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-01 Thread Ye Chen
你好,upsert是全字段更新,必须指定所有的字段值。我们的需求是想根据主键更新部分字段,其余字段不变。






现有table 
CREATE TABLE t (
 abigint,
 bbigint,
 cbigint,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
...
);
我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变,例如
mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key update 
b='4';主键重复的时候只更新字段b,字段c的值不变。
我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
请问这种部分字段更新的场景 使用flink sql应该怎么处理?





在 2021-08-02 10:08:28,"silence"  写道:
>你在你的sink ddl定义了主键会自动的按主键进行upsert的
>参考https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#idempotent-writes
>
>
>------
>发件人:Ye Chen 
>发送时间:2021年7月30日(星期五) 17:57
>收件人:user-zh 
>主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
>
>现有table 
>CREATE TABLE t (
> abigint,
> bbigint,
> cbigint,
>  PRIMARY KEY (a) NOT ENFORCED
>) WITH (
>...
>);
>
>
>我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如
>mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key update 
>b='4';
>主键重复的时候只更新字段b,字段c的值不变
>
>
>我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
>请问这种部分字段更新的场景 使用flink sql应该怎么处理?
>
>


场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-07-30 Thread Ye Chen
现有table 
CREATE TABLE t (
 abigint,
 bbigint,
 cbigint,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
...
);


我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如
mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key update 
b='4';
主键重复的时候只更新字段b,字段c的值不变


我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
请问这种部分字段更新的场景 使用flink sql应该怎么处理?



Re:Re:SqlValidatorException: No match found for function signature prod()

2021-02-21 Thread Ye Chen
应该是继承scalaFunction ?

















在 2021-02-22 10:25:31,"xiaoyue" <18242988...@163.com> 写道:
>捞一下自己,在线等大佬们的回复 _(:з」∠)_
>
>
>
>
>
>
>
>在 2021-02-20 13:14:18,"xiaoyue" <18242988...@163.com> 写道:
>
>我在使用flinksql1.11的udaf时出现SqlValidatorException: No match found for function 
>signature prod(),请求大佬帮忙看看_(:з」∠)_
>
>以下是代码:
>-
>...
>  stableEnv.createTemporarySystemFunction("prod", 
> ProductAggregateFunction.class);
>  Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as 
> yldrate from queryData group by pf_id");
>...
>-
>@FunctionHint(
>input = @DataTypeHint("Double"),
>output = @DataTypeHint("Double")
>)
>public class ProductAggregateFunction extends AggregateFunctionProduct> {
>
>
>@Override
>public Double getValue(Product acc) {
>return acc.prod;
>}
>@Override
>public Product createAccumulator() {
>return new Product();
>}
>public void accumulate(Product acc, Double iValue) {
>acc.prod *= iValue;
>}
>public void retract(Product acc, Double iValue) {
>acc.prod /= iValue;
>}
>public void merge(Product acc, Iterable it) {
>for (Product p : it) {
>accumulate(acc, p.prod);
>}
>}
>public void resetAccumulator(Product acc) {
>acc.prod = 1D;
>}
>}
>
>
>
>
>
> 


Re:flink生成大宽表

2021-02-21 Thread Ye Chen
可以用多流join,但是数据延迟会导致join不上,可以侧输出处理下,看业务需求

















在 2021-02-22 11:05:46,"liujian" <13597820...@qq.com> 写道:
>Hi:
>  大家好,有3张实时的表,相互关联可以形成大宽表,如何一张都会更新,那么我该如何实现流处理,我目标表放到kudu上
> 
>  我的理解:
>   直接使用jdbc-connecter将三张表读取,然后join,再写入,会不会有什么问题


Re:Flink SQL并发度问题

2021-02-20 Thread Ye Chen
并发度的设置有优先级,客户端级别小于算子级别,所以上游source算子单独设置并发度会生效,而下游仍然是客户端级别的并发度。

















在 2021-02-20 18:23:52,"guaishushu1...@163.com"  写道:
> 这几天研究了flink table 转化为stream node 的源码,发现是某个算子的并发度取决于上一个算子的并发度。
>但是在实际测试过程中发现使用window aggregate 语句时候 该算子的并发度和上游的source不一致 和我cli 命令配置的并发度一致 
>这是为什么呢?
>
>
>guaishushu1...@163.com


自定义partition,使用遇到问题,附代码

2021-02-19 Thread Ye Chen
各位好,我想实现自定义partition,继承了 FlinkKafkaPartitioner 后,使用会报错,简化的代码如下。
//自定义partition
public class customPartitioner extends FlinkKafkaPartitioner {
@Override
public int partition(String record, byte[] key, byte[] value, String 
targetTopic, int[] partitions) {
return 0;
}
}


DataStream stream = 。。。
FlinkKafkaProducer myProducer = new FlinkKafkaProducer<>(
"test_topic",
new SimpleStringSchema(),
properties,
new customPartitioner()
);
stream.addSink(myProducer);


//上面的代码,编辑器中编译FlinkKafkaProducer会报错,【Error:(55, 49) java: 
无法推断org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>的类型参数】
//去掉new 
customPartitioner(),不使用自定义partition,FlinkKafkaProducer就不报错,感觉是构造函数对应不上,但是查看构造函数源码有这个构造函数




查看FlinkKafkaProducer源码如下,我上面的写法有问题么?
public FlinkKafkaProducer(
String topicId,
SerializationSchema serializationSchema,
Properties producerConfig,
Optional> customPartitioner) {
this(
topicId,
serializationSchema,
producerConfig,
customPartitioner.orElse(null),
Semantic.AT_LEAST_ONCE,
DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}





Re:多流join的场景如何优化

2021-01-25 Thread Ye Chen
双流join或者多流join从技术上是可以实现你这个场景的,网上有很多成熟的案例。
但是要考虑具体的业务需求,比如数据是否能在规定时间到达,未到达如何处理,如果因为多流join造成数据缺失或者延迟,对业务影响比较大的话还不如继续用维表。

















在 2021-01-26 11:30:56,"hl9...@126.com"  写道:
>请教各位大佬,我现在有个多流join计算的场景,不知道该如何进行优化。
>
>电商业务有3个kafka消息源,消息结构描述如下(只列举主要字段):
>market_act(营销活动): 
>{act_id:营销活动id,start_time:活动开始时间,end_time:活动结束时间,shop_id:活动的门店}
>new_member(新增会员): {member_id:新会员id,act_id:吸引会员的营销活动id,create_time:新会员生成时间}
>orders(订单):{order_no:订单号,amt:订单金额,member_id:会员id,create_time:订单时间,shop_id:下单的门店}
>
>需求:按活动统计活动期间新会员产生的订单金额
>伪sql: 
>select act_id,count(1) as order_num,sum(amt) as order_amt 
>from orders t1 
>inner join new_member t2 on t1.member_id=t2.member_id
>inner join market_act t3 on t2.act_id=t3.act_id 
>where t1.create_time between t3.start_time and t3.end_time ;
>
>目前做法:
>将 market_act 和 new_member 两个维表消息放到redis缓存,
>flink接orders 消息,在flatmap中读取redis维表信息,判断当前订单是否属于某个有效的活动,
>是则输出{act_id,order_no,amt,member_id},然后sink到db。
>
>我感觉这种做法没有充分发挥flink流的特性,有没有办法在flink里面直接join这3个流,搭配状态,进行计算?
>
>
>
>hl9...@126.com


Re:Flink 并行度问题

2021-01-22 Thread Ye Chen
@jacob
hi, TaskManager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,并且 Slot 只对内存隔离,没有对 CPU 隔离。
而slot 和并行度的关系是:Slot 是指 TaskManager 最大能并发执行的能力,parallelism 是指 TaskManager 
实际使用的并发能力。
个人见解,并行度的设置一般无需考虑CPU。







在 2021-01-22 16:18:32,"Jacob" <17691150...@163.com> 写道:
>使用Flink以来,一直有一个问题困扰着。
>
>
>Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。
>
>比如Flink消费kafka
>topic时,并行度数量往往都建议设置topic分区的个数,意在让每个并行度消费一个分区,达到性能最优。那也就是说一个并行度代表一个消费线程,同时也表示一个slot,又由于在Flink中一个并行度表示一个CPU,那么是不是可以理解为一个CPU就是一个线程。
>
>
>如果FLink 以standalone的模式部署在一台机器上,这台机器有4个CPU,每个CPU有6个核,那么该集群的最大并行度是不是就是 4 ?
>
>在我的认识中,一个核表示一个线程,但在上面的理解中,好像是一个CPU就是一个线程,这是否矛盾呢?
>
>
>
>
>-
>Thanks!
>Jacob
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:flink消费kafka出现GC问题

2021-01-21 Thread Ye Chen
@nick  
我们之前也遇到的过GC导致任务挂掉问题,后来排查发现flink代码写的有问题,在@close方法中关闭数据库连接,但是事实上@close方法未起作用,导致资源未释放OOM。

hope this can help you














At 2021-01-22 14:18:51, "nick"  wrote:
>org.apache.flink.runtime.JobException: Recovery is suppressed by
>NoRestartBackoffTimeStrategy
>   at
>org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at
>org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at
>org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at
>org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at
>org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at
>org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at
>org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at
>org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>~[?:1.8.0_271]
>   at
>sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>~[?:1.8.0_271]
>   at
>sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>~[?:1.8.0_271]
>   at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_271]
>   at
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at
>org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>~[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at
>akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>   at
>akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>[flink-dist_2.11-1.12.0.jar:1.12.0]
>Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57) ~[?:1.8.0_271]
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[?:1.8.0_271]
>   at
>org.apache.flink.kafka.shaded.org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
>~[?:?]
>   at
>org.apache.flink.kafka.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
>~[?:?]
>   at