从kafka sink 到hbase丢失数据

2021-04-07 文章
Dear 开发者:


目前发现从kafka sink到 hbase 会丢数据,相同的sql ,如果用jdbc方式 来sink 则不会丢失数据,具体建表sql  和任务sql 如下
flink 版本 1.12


源表: 使用canal-json 接入
create table rt_ods.ods_za_log_member_base_info(
MemberId bigint COMMENT '用户ID',
NickName string COMMENT '用户昵称',
proctime as PROCTIME()
) WITH (
'properties.bootstrap.servers' = ':9092',
'connector' = 'kafka',
'format' = 'canal-json',
'topic' = 'ods_user_info',
'properties.group.id' = 'rt_ods-ods_za_log_member_base_info',
'scan.startup.mode' = 'group-offsets'
)


sink 表: hbase 
create table rt_dwd.dwd_za_log_user_info(
memberid STRING,
f1 ROW < 
nick_name string >,
PRIMARY KEY (memberid) NOT ENFORCED
) WITH (
'sink.buffer-flush.max-size' = '0',
'connector' = 'hbase-1.4',
'zookeeper.quorum' = '10.51.3.48:2181',
'sink.buffer-flush.interval' = '3s',
'sink.buffer-flush.max-rows' = '100',
'table-name' = 'dwd_user_info',
'sink.parallelism' = '5'
)


具体sql :
insert into rt_dwd.dwd_za_log_user_info
select
  cast(t1.MemberId as VARCHAR)
  ,ROW(
  t1.NickName
  )
  from rt_ods.ods_za_log_member_base_info t1
 where t1.MemberId is not null 
;


发现sink到 hbase 的时候会丢失数据,如果sink到mysql 则数据写入完整,mysql 建表语句
create table rt_dwd.dwd_za_log_user_info_mysql(
memberid string COMMENT '主键ID',
nick_name string COMMENT '昵称',
primary key(memberid) NOT ENFORCED
) WITH (
'password' = '',
'connector' = 'jdbc',
'sink.buffer-flush.interval' = '3s',
'sink.buffer-flush.max-rows' = '100',
'table-name' = 'dwd_user_info',
'url' = 'jdbc:mysql://x',
'username' = 'xxx'
)


补充:
数据源是百分表, 将这一百张表的binlog数据 以canal-json的格式打入kafka,然后消费并sink 到hbase
不知道是否是hbase sink的参数调整有问题,我把   
 'sink.buffer-flush.interval' = '3s',
'sink.buffer-flush.max-rows' = '100', 都设置成0 也无用



Flink-SQL-Client 啥时候支持GateWay Mode

2020-12-18 文章
Dear developer:


想问下flink-sql client 啥时候支持GateWay Mode呢?


就好像Spark 有spark thrift server ,我们可以通过jdbc方式调用


我在  Apache Flink Home / Flink Improvement Proposals 这个里面看到是有 GateWay Mode 
的计划的,19年7月提的,


想问下这个的具体进度,最近几个版本会有规划吗?


GateWay Mode这个模式很符合业务实际场景, 这样我们可以通过JDBC/Rest API的方式调用,提交SQL;


望各位大神可以透露下进度或者规划

Re:Re: 关于flink-sql 元数据问题

2020-12-14 文章






Hi ,
 的确tableEnv.execute 和tableEnv.executeSql 这两个方法不该一起用
 现在会报另一个错,去掉tableEnv.execute 方法,
代码如下:


final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_RESTART, 
Time.of(DURING_RESTART, TimeUnit.SECONDS)));
env.enableCheckpointing(CHECKPOINT_INTERVAL);
StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env,bsSettings);
Catalog catalog = new 
HiveCatalog(CATALOG_NAME,DEFAULTDATABASE,HIVECONFDIR);
tableEnv.registerCatalog(CATALOG_NAME,catalog);
tableEnv.useCatalog("myhive");
tableEnv.executeSql("create table .");


No jobs included in application


目前的场景是想把flink-sql 建表的操作规范到我们自己的平台上,不想通过SQL-client 
或者代码的方式提交建表,这样我们自己可以做一些类似于建表规范,元数据统一管理等一些功能;用户通过在平台上建表,然后调用flink的api来实现建表操作; 
现在已经使用的是Hive-catalog,只是我们关注的是建表操作放到我们自己的平台上;


所以想采用上述代码的方式,通过平台调用,直接建表,但是现在这个应用提交是不成功的(要么报没有算子,要么报没有Jobs);
然后就算上述方式提交成功,好像也没法知道我的这个表建成功与否,只知道应用提交成功没有,不像HTTP请求有对应返回值,好像没有类似的 Rest 
Api的方式来做这个事情













在 2020-12-14 11:42:35,"Rui Li"  写道:
>Hi,
>
>调用tableEnv.executeSql("create table
>.")以后表就已经创建了,不需要再调用tableEnv.execute。execute方法已经deprecate,建议统一使用executeSql哈
>
>On Fri, Dec 11, 2020 at 7:23 PM JasonLee <17610775...@163.com> wrote:
>
>> hi
>> Flink SQL 建的表支持用 hive 的 catalog 来管理元数据,是否可以满足你的需求 ?
>>
>>
>>
>> -
>> Best Wishes
>> JasonLee
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>
>
>-- 
>Best regards!
>Rui Li


关于flink-sql 元数据问题

2020-12-11 文章
开发者好:
   目前想把flink-sql 建表的操作集成到我们自己的平台中,但是发现一个比较尴尬的现象,如果使用Table api 
,应用中只有create 语句的话,那么应用执行会报错,报没有定义算子:The main method caused an error: No 
operators defined in streaming topology. Cannot generate StreamGraph. 
但是,这个表却创建成功了,代码如下:


final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_RESTART, 
Time.of(DURING_RESTART, TimeUnit.SECONDS)));
env.enableCheckpointing(CHECKPOINT_INTERVAL);
StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env,bsSettings);
Catalog catalog = new 
HiveCatalog(CATALOG_NAME,DEFAULTDATABASE,HIVECONFDIR);
tableEnv.registerCatalog(CATALOG_NAME,catalog);
tableEnv.useCatalog("myhive");
tableEnv.executeSql("create table .");
tableEnv.execute("test-job");


加上目前flink-sql 没有show create table 功能,在hive中的元数据没法把已经建的表很好的管理,所以想集成到我们自己的元数据管理平台中


想问下各位大佬,如果只有建表操作,有没有对应的API,单独调用flink-sql 的建表操作 ,上述操作除非加个insert 
操作就不报错,但是只是为了建表,加个insert也不行
各位的元数据管理都是咋实现的呢?



关于dataStream 转成kafka流表 新建的表是否可以保存到已存在的catalog里面

2020-12-07 文章
Hi 开发者好:
目前有此场景:
业务已经有埋点上报的数据在kafka ,现在想使用对应的kafka数据 转换成 table, 由于这个 kafka的数据格式不是json格式,所以没法直接使用 
flink-sql 的kafka-connector 用sql DDL的方式建kafka 表;


现在在尝试对kafka的数据做一次 map转换,返回 json格式,然后再基于此json格式 创建 kafka 流表


但是这种做法好像无法把表的元数据持久化,即在其他session无法读取到建的这个表;
是否有对应的方法,通过对kafka流做一定转换 ,然后再转换成 可以持久化到对应 catalog 上的 kafka 表呢


或者是否可以使用 flink-sql 的kafka-connector  
,然后把kafka的数据做一定解析,然后转成指定schema建表呢?元数据能保存到指定catalog上的那种(现在的kafka上的数据是字符串,使用指定的分隔符分割各个字段)



Re:Re:union all 丢失部分数据

2020-11-04 文章









flink 版本是1.11的版本了








在 2020-11-05 00:02:12,"hailongwang" <18868816...@163.com> 写道:
>Hi liaobiao,
>
>
>你的 flink 版本是什么呢?
>根据你的 SQL,如果是版本是 <= 1.10 的话,会根据 MetaDataHander 识别出你的 group by 后面的 key 作为 upsert 
>key,这样就会产生覆盖的情况。
>你看下结果是否是这种情况的?
>
>
>Best,
>Hailong Wang
>
>
>
>
>在 2020-11-04 17:20:23,"夜思流年梦"  写道:
>>开发者好:
>>   目前有此场景:算不同部门的总收入和所有部门的总收入,打算把两部分SQL union all  ,但是实际情况发现 union 
>> all的时候会丢一部分数据,要么是各个部门的数据少了,要么是所有部门的总收入少了
>>   如果把union  all 的两段SQL 分别独立出来,插入同一张表,那么数据就是正常的,不知道是否是bug还是使用方法不对
>>
>>
>>
>>
>>原sql :
>>
>>
>>insert into dws_
>>
>>
>>select 
>>0 as id
>>,cast (DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH') as bigint) as ftime
>>,case 
>>when dept_name like '%XX%' then 'X1'
>>when dept_name = 'xXX' then 'X2'
>>else 'X3' end as paytype
>>,count(orderid) as paynum_h 
>>,round(sum(amt)) as paymoney_h 
>>from dwd_XXX
>>where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')
>>group by
>>DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH'), 
>>case 
>>when dept_name like '%XX%' then 'X1'
>>when dept_name = 'xXX' then 'X2'
>>else 'X3' end ;
>>
>>
>>
>>
>>union all
>>
>>
>>
>>
>>select 0 as id
>>,cast (DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH') as int) as ftime
>>,'all' as paytype
>>,count(orderid) as paynum_h  
>>,round(sum(amt)) as paymoney_h  
>>from dwd_XXX
>>where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')
>>group by DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH') ;


Re:回复: union all 丢失部分数据

2020-11-04 文章






哦,不好意思,因为我把这条SQL是分成两段了,所以在恢复原SQL的时候没有把 分号去掉 ;


当时在union的时候,中间是不会有分号的,不然也提不上去











在 2020-11-05 10:00:01,"史 正超" <792790...@qq.com> 写道:
>你的union all上面,也就是insert into 的第一条select 末尾有 分号 ‘;’,, 
>这样的话我感觉第二条select是不会执行的。还有另一个问题是你把分号去掉,我感觉数据会被 覆盖的。因为我最近也有使用union all的场景。
>我觉得你应该这样组装 你的sql :
>```sql
>
>Insert into xxx
>Select
>   d1,
>   d2,
>   count(1)
>From (
>   Select * from a
>   Union all
>   Select * from b,
>)
>Group by d1, d2
>
>```
>
>发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用
>
>发件人: 夜思流年梦<mailto:liaobiao...@163.com>
>发送时间: 2020年11月4日 18:21
>收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org>
>主题: union all 丢失部分数据
>
>开发者好:
>   目前有此场景:算不同部门的总收入和所有部门的总收入,打算把两部分SQL union all  ,但是实际情况发现 union 
> all的时候会丢一部分数据,要么是各个部门的数据少了,要么是所有部门的总收入少了
>   如果把union  all 的两段SQL 分别独立出来,插入同一张表,那么数据就是正常的,不知道是否是bug还是使用方法不对
>
>
>
>
>原sql :
>
>
>insert into dws_
>
>
>select
>0 as id
>,cast (DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH') as bigint) as ftime
>,case
>when dept_name like '%XX%' then 'X1'
>when dept_name = 'xXX' then 'X2'
>else 'X3' end as paytype
>,count(orderid) as paynum_h
>,round(sum(amt)) as paymoney_h
>from dwd_XXX
>where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')
>group by
>DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH'),
>case
>when dept_name like '%XX%' then 'X1'
>when dept_name = 'xXX' then 'X2'
>else 'X3' end ;
>
>
>
>
>union all
>
>
>
>
>select 0 as id
>,cast (DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH') as int) as ftime
>,'all' as paytype
>,count(orderid) as paynum_h
>,round(sum(amt)) as paymoney_h
>from dwd_XXX
>where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')
>group by DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH') ;
>


TUMBLE函数不支持 回撤流

2020-11-03 文章



这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;







原sql 

select 0 as id

, HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime

,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then 
memberid else NULL end) as paynum_h 

,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')  
then real_product else 0 end)) as paymoney_h  

from dwd_XXX

where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')

group by TUMBLE(proctime ,interval '1' HOUR);


报错:
 org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
support consuming update and delete changes which is produced by node 
TableSourceScan
发现把kafka建表语句改成 json格式就可以


数据源不是flink-mysql-cdc得来的


是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),


 'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XX',
  'topic' = 'ODS_XXX',  
  'scan.startup.mode' = 'group-offsets',
  'format' = 'canal-json');


上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
建kafka表的格式,使用的changelog-json:


WITH (
  'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XXX',
  'topic' = 'DWD_XXX',  
  'scan.startup.mode' = 'group-offsets',
  'format' = 'changelog-json');  











在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道:
>Hi,
>能贴一下完整的sql吗,数据源是CDC的数据吗?
>
>> 2020年10月30日 下午2:48,夜思流年梦  写道:
>> 
>> 开发者你好:
>> 现有此场景:
>> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
>> select 
>> 
>>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
>> 
>>> ,sum(amt) as paymoney_h  
>> 
>>> from 
>> 
>>> group by TUMBLE(write_time,interval '1' HOUR);
>> 
>> 
>> 报错:
>> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
>> support consuming update and delete changes which is produced by node 
>> TableSourceScan
>> 
>> 
>> 
>> 
>> 发现把kafka建表语句改成 json格式就可以
>> 
>> 
>> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 





 

Re:Re: TUMBLE函数不支持 回撤流

2020-10-30 文章
原sql

select 0 as id

, HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime

,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then 
memberid else NULL end) as paynum_h 

,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')  
then real_product else 0 end)) as paymoney_h  

from dwd_XXX

where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')

group by TUMBLE(proctime ,interval '1' HOUR);


数据源不是flink-mysql-cdc得来的


是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),


 'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XX',
  'topic' = 'ODS_XXX',  
  'scan.startup.mode' = 'group-offsets',
  'format' = 'canal-json');


上面这个dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
建kafka表的格式,使用的changelog-json:


WITH (
  'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XXX',
  'topic' = 'DWD_XXX',  
  'scan.startup.mode' = 'group-offsets',
  'format' = 'changelog-json');  











在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道:
>Hi,
>能贴一下完整的sql吗,数据源是CDC的数据吗?
>
>> 2020年10月30日 下午2:48,夜思流年梦  写道:
>> 
>> 开发者你好:
>> 现有此场景:
>> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
>> select 
>> 
>>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
>> 
>>> ,sum(amt) as paymoney_h  
>> 
>>> from 
>> 
>>> group by TUMBLE(write_time,interval '1' HOUR);
>> 
>> 
>> 报错:
>> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
>> support consuming update and delete changes which is produced by node 
>> TableSourceScan
>> 
>> 
>> 
>> 
>> 发现把kafka建表语句改成 json格式就可以
>> 
>> 
>> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 


TUMBLE函数不支持 回撤流

2020-10-30 文章
开发者你好:
现有此场景:
求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
select 

> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime

> ,sum(amt) as paymoney_h  

> from 

> group by TUMBLE(write_time,interval '1' HOUR);


报错:
org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support 
consuming update and delete changes which is produced by node TableSourceScan




发现把kafka建表语句改成 json格式就可以


现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊











Re:Re: 关于flink-sql 维表join问题

2020-10-28 文章
批处理的确是可以解决这类问题,只不过不是实时的了,主要是想使用flink-sql解决这类报表问题;
另外问一句, flink-sql 有打算解决这类问题吗?我感觉这个场景还挺多的呢
维表 left join 一张流表, 维表全量数据关联流表,既能获取到实时流表的统计数据,又能保证维表的数据是一个实时更新(或者定期更新)的状态

















在 2020-10-27 17:24:05,"Jark Wu"  写道:
>我觉得这个更像是一个周期性调度的批处理需求。因为你流处理,只能一直读取员工表的增量,没法每天读个全量。
>是不是用 flink batch + 调度更好一点呢?
>
>Best,
>Jark
>
>On Tue, 27 Oct 2020 at 16:08, 夜思流年梦  wrote:
>
>> 目前在准备搞实时数仓:碰到一个问题:
>> 比如统计一个所有员工所有的业绩的报表,这个报表需要关联1个员工维表,4个业绩相关流表;
>> 如果是正常SQL的话是这样join :
>>
>>
>> 维表 left join  流表  1
>> left join 流表 2
>> left join 流表 3
>> left join 流表 4
>>
>>
>> 因为flink-sql 的temporal join 不支持 维表在左边 left join 流表,
>>
>>
>> 故只能 流表在左,维表在右来join
>> 即:select  * from  table a  left join dim_XXX  FOR SYSTEM_TIME AS OF
>> a.proctime as c on a.memberId=c.rowkey
>>
>>
>> 但是这个存在的问题是那些今天没有业绩的员工就没有统计数据,如果只是join一张流表,那么我可以把没有数据的员工在出报表时补空数据,现在的情况是要join4
>> 张流表,那么必须得四张流表都有数据的员工才会有数据,这个就是问题了:最终的报表只有4个流表都有数据的员工。
>>
>>
>> 上次问过一次,上次回答的是双流join,双流join的问题也是一样,只有两者都有数据才会得到最终结果,更何况是员工维表,基本上变化很少。因为有点找不到上次那个邮件了,所以再问一下,这种场景(维表在左
>> left join 流表)有没有比较好的解决方案
>>
>>
>>
>>
>>
>>
>>
>>


关于flink-sql 维表join问题

2020-10-27 文章
目前在准备搞实时数仓:碰到一个问题:
比如统计一个所有员工所有的业绩的报表,这个报表需要关联1个员工维表,4个业绩相关流表; 
如果是正常SQL的话是这样join :


维表 left join  流表  1
left join 流表 2
left join 流表 3
left join 流表 4


因为flink-sql 的temporal join 不支持 维表在左边 left join 流表,


故只能 流表在左,维表在右来join  
即:select  * from  table a  left join dim_XXX  FOR SYSTEM_TIME AS OF a.proctime 
as c on a.memberId=c.rowkey 


但是这个存在的问题是那些今天没有业绩的员工就没有统计数据,如果只是join一张流表,那么我可以把没有数据的员工在出报表时补空数据,现在的情况是要join4 
张流表,那么必须得四张流表都有数据的员工才会有数据,这个就是问题了:最终的报表只有4个流表都有数据的员工。


上次问过一次,上次回答的是双流join,双流join的问题也是一样,只有两者都有数据才会得到最终结果,更何况是员工维表,基本上变化很少。因为有点找不到上次那个邮件了,所以再问一下,这种场景(维表在左
 left join 流表)有没有比较好的解决方案









flinksql 不支持 % 运算

2020-10-26 文章
flink 版本1.11
目前flink-sql 好像不支持取余运算,会报错:
比如:SELECT * FROM Orders WHERE a % 2 = 0
Percent remainder '%' is not allowed under the current SQL conformance level


看了下flink 的issue ,已经有人碰到过了,说是要1.12版本修复




想问下:如果再1.11版本,flink-sql 要怎么操作才能支持 % 运算呢? 可以通过修改配置文件来实现么?比如flink-conf.yaml

关于flink-sql count/sum 数据如何每天重新计算

2020-10-19 文章
现有此场景:
计算每天员工的业绩(只计算当天的)


现在我用flink-sql 的方式,insert into  select current_date, count(1) ,worker from XX  
where writeTime>=current_date  group by worker;  
把数据按天分区的方式先把数据sink到mysql


但是发现落地到mysql的数据把前几天的数据都给算进来了,如何只算今天的数据?
另外还有一个疑惑,如何既计算当天数据,又包含了本月的所有数据?



Re:Re: 关于flink-sql Join Temporal Tables join 维表问题

2020-10-12 文章



你好,我最开始也考虑用双流join,但是双流join 
就会碰到一个问题,就是结果集只会包含今天有订单的员工数据,那么没有订单的员工数据是不会体现到结果集的。主要是需要所有员工今天的订单数量;














在 2020-10-12 15:37:51,"Jark Wu"  写道:
>我理解楼主的场景不是 temporal join 的场景,而是双流 join
>的场景,因为任何一条流的变化,都希望触发对结果的更新,所以讲员工作为右边维度表是不行的。
>
>如果是我理解的这样的话,你可以用 flink-cdc-connectors [1] 去对接员工和订单两个 binlog 流,然后直接
>join,然后聚合订单数。伪代码如下:
>
>create table users (
>  user_id bigint,
>  ...
>) with (
>  connector = mysql-cdc
>  ...
>);
>
>create table orders (
>  order_id bigint,
>  user_id bigint,
>  ...
>) with (
>  connector = mysql-cdc
>  ...
>);
>
>select user_id, count(*) as order_num
>from (select * from users left join orders on users.user_id =
>orders.user_id)
>group by user_id;
>
>
>[1]: https://github.com/ververica/flink-cdc-connectors
>
>On Mon, 12 Oct 2020 at 15:17, caozhen  wrote:
>
>>
>> 我理解这个场景下  员工维表在右边没啥问题。
>>
>> join过程中需要去员工维表拿哪些字段?
>>
>>
>>
>> 夜思流年梦 wrote
>> > 现在有一个场景: 一个员工维表,一个订单表(监听mysql binlog的一个流表),想实时计算出所有员工的订单数;
>> > 目前flink-sql 支持Join Temporal Tables  ,但是官方文档上是这么说的:仅支持带有处理时间的 temporal
>> > tables 的 inner 和 left join。
>> > 而这个场景必须是维表在左边,但实际情况是维表在左边无法进行left join :会报错:ClassCastException:
>> > org.apache.calcite.rel.logical.LogicalProject cannot be cast to
>> > org.apache.calcite.rel.core.TableScan;(如果是流表在左边,然后用temporal join
>> > 一个维表那么没有问题,即:left join 维表 FOR SYSTEM_TIME AS OF proctime)
>> > 想问下各位,碰到这类情况,大家是怎么处理的
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: 关于flink-sql Join Temporal Tables join 维表问题

2020-10-12 文章
是这样子的,比如要统计所有员工的今天的订单数量,如果是订单表left join 员工表的话,那么今天没有订单数量的就无法出现在结果表集合中;
把员工表放在左边left join 订单表的话那么就是所有员工的今天订单数量都会 出现

















在 2020-10-12 15:17:07,"caozhen"  写道:
>
>我理解这个场景下  员工维表在右边没啥问题。
>
>join过程中需要去员工维表拿哪些字段?
>
>
>
>夜思流年梦 wrote
>> 现在有一个场景: 一个员工维表,一个订单表(监听mysql binlog的一个流表),想实时计算出所有员工的订单数;
>> 目前flink-sql 支持Join Temporal Tables  ,但是官方文档上是这么说的:仅支持带有处理时间的 temporal
>> tables 的 inner 和 left join。
>> 而这个场景必须是维表在左边,但实际情况是维表在左边无法进行left join :会报错:ClassCastException:
>> org.apache.calcite.rel.logical.LogicalProject cannot be cast to
>> org.apache.calcite.rel.core.TableScan;(如果是流表在左边,然后用temporal join
>> 一个维表那么没有问题,即:left join 维表 FOR SYSTEM_TIME AS OF proctime)
>> 想问下各位,碰到这类情况,大家是怎么处理的
>
>
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


关于flink-sql Join Temporal Tables join 维表问题

2020-10-11 文章
现在有一个场景: 一个员工维表,一个订单表(监听mysql binlog的一个流表),想实时计算出所有员工的订单数;
目前flink-sql 支持Join Temporal Tables  ,但是官方文档上是这么说的:仅支持带有处理时间的 temporal tables 的 
inner 和 left join。
而这个场景必须是维表在左边,但实际情况是维表在左边无法进行left join :会报错:ClassCastException: 
org.apache.calcite.rel.logical.LogicalProject cannot be cast to 
org.apache.calcite.rel.core.TableScan;(如果是流表在左边,然后用temporal join 
一个维表那么没有问题,即:left join 维表 FOR SYSTEM_TIME AS OF proctime)
想问下各位,碰到这类情况,大家是怎么处理的