从kafka sink 到hbase丢失数据
Dear 开发者: 目前发现从kafka sink到 hbase 会丢数据,相同的sql ,如果用jdbc方式 来sink 则不会丢失数据,具体建表sql 和任务sql 如下 flink 版本 1.12 源表: 使用canal-json 接入 create table rt_ods.ods_za_log_member_base_info( MemberId bigint COMMENT '用户ID', NickName string COMMENT '用户昵称', proctime as PROCTIME() ) WITH ( 'properties.bootstrap.servers' = ':9092', 'connector' = 'kafka', 'format' = 'canal-json', 'topic' = 'ods_user_info', 'properties.group.id' = 'rt_ods-ods_za_log_member_base_info', 'scan.startup.mode' = 'group-offsets' ) sink 表: hbase create table rt_dwd.dwd_za_log_user_info( memberid STRING, f1 ROW < nick_name string >, PRIMARY KEY (memberid) NOT ENFORCED ) WITH ( 'sink.buffer-flush.max-size' = '0', 'connector' = 'hbase-1.4', 'zookeeper.quorum' = '10.51.3.48:2181', 'sink.buffer-flush.interval' = '3s', 'sink.buffer-flush.max-rows' = '100', 'table-name' = 'dwd_user_info', 'sink.parallelism' = '5' ) 具体sql : insert into rt_dwd.dwd_za_log_user_info select cast(t1.MemberId as VARCHAR) ,ROW( t1.NickName ) from rt_ods.ods_za_log_member_base_info t1 where t1.MemberId is not null ; 发现sink到 hbase 的时候会丢失数据,如果sink到mysql 则数据写入完整,mysql 建表语句 create table rt_dwd.dwd_za_log_user_info_mysql( memberid string COMMENT '主键ID', nick_name string COMMENT '昵称', primary key(memberid) NOT ENFORCED ) WITH ( 'password' = '', 'connector' = 'jdbc', 'sink.buffer-flush.interval' = '3s', 'sink.buffer-flush.max-rows' = '100', 'table-name' = 'dwd_user_info', 'url' = 'jdbc:mysql://x', 'username' = 'xxx' ) 补充: 数据源是百分表, 将这一百张表的binlog数据 以canal-json的格式打入kafka,然后消费并sink 到hbase 不知道是否是hbase sink的参数调整有问题,我把 'sink.buffer-flush.interval' = '3s', 'sink.buffer-flush.max-rows' = '100', 都设置成0 也无用
Flink-SQL-Client 啥时候支持GateWay Mode
Dear developer: 想问下flink-sql client 啥时候支持GateWay Mode呢? 就好像Spark 有spark thrift server ,我们可以通过jdbc方式调用 我在 Apache Flink Home / Flink Improvement Proposals 这个里面看到是有 GateWay Mode 的计划的,19年7月提的, 想问下这个的具体进度,最近几个版本会有规划吗? GateWay Mode这个模式很符合业务实际场景, 这样我们可以通过JDBC/Rest API的方式调用,提交SQL; 望各位大神可以透露下进度或者规划
Re:Re: 关于flink-sql 元数据问题
Hi , 的确tableEnv.execute 和tableEnv.executeSql 这两个方法不该一起用 现在会报另一个错,去掉tableEnv.execute 方法, 代码如下: final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_RESTART, Time.of(DURING_RESTART, TimeUnit.SECONDS))); env.enableCheckpointing(CHECKPOINT_INTERVAL); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,bsSettings); Catalog catalog = new HiveCatalog(CATALOG_NAME,DEFAULTDATABASE,HIVECONFDIR); tableEnv.registerCatalog(CATALOG_NAME,catalog); tableEnv.useCatalog("myhive"); tableEnv.executeSql("create table ."); No jobs included in application 目前的场景是想把flink-sql 建表的操作规范到我们自己的平台上,不想通过SQL-client 或者代码的方式提交建表,这样我们自己可以做一些类似于建表规范,元数据统一管理等一些功能;用户通过在平台上建表,然后调用flink的api来实现建表操作; 现在已经使用的是Hive-catalog,只是我们关注的是建表操作放到我们自己的平台上; 所以想采用上述代码的方式,通过平台调用,直接建表,但是现在这个应用提交是不成功的(要么报没有算子,要么报没有Jobs); 然后就算上述方式提交成功,好像也没法知道我的这个表建成功与否,只知道应用提交成功没有,不像HTTP请求有对应返回值,好像没有类似的 Rest Api的方式来做这个事情 在 2020-12-14 11:42:35,"Rui Li" 写道: >Hi, > >调用tableEnv.executeSql("create table >.")以后表就已经创建了,不需要再调用tableEnv.execute。execute方法已经deprecate,建议统一使用executeSql哈 > >On Fri, Dec 11, 2020 at 7:23 PM JasonLee <17610775...@163.com> wrote: > >> hi >> Flink SQL 建的表支持用 hive 的 catalog 来管理元数据,是否可以满足你的需求 ? >> >> >> >> - >> Best Wishes >> JasonLee >> -- >> Sent from: http://apache-flink.147419.n8.nabble.com/ >> > > >-- >Best regards! >Rui Li
关于flink-sql 元数据问题
开发者好: 目前想把flink-sql 建表的操作集成到我们自己的平台中,但是发现一个比较尴尬的现象,如果使用Table api ,应用中只有create 语句的话,那么应用执行会报错,报没有定义算子:The main method caused an error: No operators defined in streaming topology. Cannot generate StreamGraph. 但是,这个表却创建成功了,代码如下: final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_RESTART, Time.of(DURING_RESTART, TimeUnit.SECONDS))); env.enableCheckpointing(CHECKPOINT_INTERVAL); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,bsSettings); Catalog catalog = new HiveCatalog(CATALOG_NAME,DEFAULTDATABASE,HIVECONFDIR); tableEnv.registerCatalog(CATALOG_NAME,catalog); tableEnv.useCatalog("myhive"); tableEnv.executeSql("create table ."); tableEnv.execute("test-job"); 加上目前flink-sql 没有show create table 功能,在hive中的元数据没法把已经建的表很好的管理,所以想集成到我们自己的元数据管理平台中 想问下各位大佬,如果只有建表操作,有没有对应的API,单独调用flink-sql 的建表操作 ,上述操作除非加个insert 操作就不报错,但是只是为了建表,加个insert也不行 各位的元数据管理都是咋实现的呢?
关于dataStream 转成kafka流表 新建的表是否可以保存到已存在的catalog里面
Hi 开发者好: 目前有此场景: 业务已经有埋点上报的数据在kafka ,现在想使用对应的kafka数据 转换成 table, 由于这个 kafka的数据格式不是json格式,所以没法直接使用 flink-sql 的kafka-connector 用sql DDL的方式建kafka 表; 现在在尝试对kafka的数据做一次 map转换,返回 json格式,然后再基于此json格式 创建 kafka 流表 但是这种做法好像无法把表的元数据持久化,即在其他session无法读取到建的这个表; 是否有对应的方法,通过对kafka流做一定转换 ,然后再转换成 可以持久化到对应 catalog 上的 kafka 表呢 或者是否可以使用 flink-sql 的kafka-connector ,然后把kafka的数据做一定解析,然后转成指定schema建表呢?元数据能保存到指定catalog上的那种(现在的kafka上的数据是字符串,使用指定的分隔符分割各个字段)
Re:Re:union all 丢失部分数据
flink 版本是1.11的版本了 在 2020-11-05 00:02:12,"hailongwang" <18868816...@163.com> 写道: >Hi liaobiao, > > >你的 flink 版本是什么呢? >根据你的 SQL,如果是版本是 <= 1.10 的话,会根据 MetaDataHander 识别出你的 group by 后面的 key 作为 upsert >key,这样就会产生覆盖的情况。 >你看下结果是否是这种情况的? > > >Best, >Hailong Wang > > > > >在 2020-11-04 17:20:23,"夜思流年梦" 写道: >>开发者好: >> 目前有此场景:算不同部门的总收入和所有部门的总收入,打算把两部分SQL union all ,但是实际情况发现 union >> all的时候会丢一部分数据,要么是各个部门的数据少了,要么是所有部门的总收入少了 >> 如果把union all 的两段SQL 分别独立出来,插入同一张表,那么数据就是正常的,不知道是否是bug还是使用方法不对 >> >> >> >> >>原sql : >> >> >>insert into dws_ >> >> >>select >>0 as id >>,cast (DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH') as bigint) as ftime >>,case >>when dept_name like '%XX%' then 'X1' >>when dept_name = 'xXX' then 'X2' >>else 'X3' end as paytype >>,count(orderid) as paynum_h >>,round(sum(amt)) as paymoney_h >>from dwd_XXX >>where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') >>group by >>DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH'), >>case >>when dept_name like '%XX%' then 'X1' >>when dept_name = 'xXX' then 'X2' >>else 'X3' end ; >> >> >> >> >>union all >> >> >> >> >>select 0 as id >>,cast (DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH') as int) as ftime >>,'all' as paytype >>,count(orderid) as paynum_h >>,round(sum(amt)) as paymoney_h >>from dwd_XXX >>where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') >>group by DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH') ;
Re:回复: union all 丢失部分数据
哦,不好意思,因为我把这条SQL是分成两段了,所以在恢复原SQL的时候没有把 分号去掉 ; 当时在union的时候,中间是不会有分号的,不然也提不上去 在 2020-11-05 10:00:01,"史 正超" <792790...@qq.com> 写道: >你的union all上面,也就是insert into 的第一条select 末尾有 分号 ‘;’,, >这样的话我感觉第二条select是不会执行的。还有另一个问题是你把分号去掉,我感觉数据会被 覆盖的。因为我最近也有使用union all的场景。 >我觉得你应该这样组装 你的sql : >```sql > >Insert into xxx >Select > d1, > d2, > count(1) >From ( > Select * from a > Union all > Select * from b, >) >Group by d1, d2 > >``` > >发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 > >发件人: 夜思流年梦<mailto:liaobiao...@163.com> >发送时间: 2020年11月4日 18:21 >收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org> >主题: union all 丢失部分数据 > >开发者好: > 目前有此场景:算不同部门的总收入和所有部门的总收入,打算把两部分SQL union all ,但是实际情况发现 union > all的时候会丢一部分数据,要么是各个部门的数据少了,要么是所有部门的总收入少了 > 如果把union all 的两段SQL 分别独立出来,插入同一张表,那么数据就是正常的,不知道是否是bug还是使用方法不对 > > > > >原sql : > > >insert into dws_ > > >select >0 as id >,cast (DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH') as bigint) as ftime >,case >when dept_name like '%XX%' then 'X1' >when dept_name = 'xXX' then 'X2' >else 'X3' end as paytype >,count(orderid) as paynum_h >,round(sum(amt)) as paymoney_h >from dwd_XXX >where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') >group by >DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH'), >case >when dept_name like '%XX%' then 'X1' >when dept_name = 'xXX' then 'X2' >else 'X3' end ; > > > > >union all > > > > >select 0 as id >,cast (DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH') as int) as ftime >,'all' as paytype >,count(orderid) as paynum_h >,round(sum(amt)) as paymoney_h >from dwd_XXX >where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') >group by DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH') ; >
TUMBLE函数不支持 回撤流
这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性; 原sql select 0 as id , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then memberid else NULL end) as paynum_h ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then real_product else 0 end)) as paymoney_h from dwd_XXX where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') group by TUMBLE(proctime ,interval '1' HOUR); 报错: org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan 发现把kafka建表语句改成 json格式就可以 数据源不是flink-mysql-cdc得来的 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表), 'connector' = 'kafka', 'properties.group.id' = 'XX', 'properties.bootstrap.servers' = 'XX', 'topic' = 'ODS_XXX', 'scan.startup.mode' = 'group-offsets', 'format' = 'canal-json'); 上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的, 建kafka表的格式,使用的changelog-json: WITH ( 'connector' = 'kafka', 'properties.group.id' = 'XX', 'properties.bootstrap.servers' = 'XXX', 'topic' = 'DWD_XXX', 'scan.startup.mode' = 'group-offsets', 'format' = 'changelog-json'); 在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道: >Hi, >能贴一下完整的sql吗,数据源是CDC的数据吗? > >> 2020年10月30日 下午2:48,夜思流年梦 写道: >> >> 开发者你好: >> 现有此场景: >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流 >> select >> >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime >> >>> ,sum(amt) as paymoney_h >> >>> from >> >>> group by TUMBLE(write_time,interval '1' HOUR); >> >> >> 报错: >> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't >> support consuming update and delete changes which is produced by node >> TableSourceScan >> >> >> >> >> 发现把kafka建表语句改成 json格式就可以 >> >> >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊 >> >> >> >> >> >> >> >> >>
Re:Re: TUMBLE函数不支持 回撤流
原sql select 0 as id , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then memberid else NULL end) as paynum_h ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') then real_product else 0 end)) as paymoney_h from dwd_XXX where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd') group by TUMBLE(proctime ,interval '1' HOUR); 数据源不是flink-mysql-cdc得来的 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表), 'connector' = 'kafka', 'properties.group.id' = 'XX', 'properties.bootstrap.servers' = 'XX', 'topic' = 'ODS_XXX', 'scan.startup.mode' = 'group-offsets', 'format' = 'canal-json'); 上面这个dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的, 建kafka表的格式,使用的changelog-json: WITH ( 'connector' = 'kafka', 'properties.group.id' = 'XX', 'properties.bootstrap.servers' = 'XXX', 'topic' = 'DWD_XXX', 'scan.startup.mode' = 'group-offsets', 'format' = 'changelog-json'); 在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道: >Hi, >能贴一下完整的sql吗,数据源是CDC的数据吗? > >> 2020年10月30日 下午2:48,夜思流年梦 写道: >> >> 开发者你好: >> 现有此场景: >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流 >> select >> >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime >> >>> ,sum(amt) as paymoney_h >> >>> from >> >>> group by TUMBLE(write_time,interval '1' HOUR); >> >> >> 报错: >> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't >> support consuming update and delete changes which is produced by node >> TableSourceScan >> >> >> >> >> 发现把kafka建表语句改成 json格式就可以 >> >> >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊 >> >> >> >> >> >> >> >> >>
TUMBLE函数不支持 回撤流
开发者你好: 现有此场景: 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流 select > HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime > ,sum(amt) as paymoney_h > from > group by TUMBLE(write_time,interval '1' HOUR); 报错: org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan 发现把kafka建表语句改成 json格式就可以 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
Re:Re: 关于flink-sql 维表join问题
批处理的确是可以解决这类问题,只不过不是实时的了,主要是想使用flink-sql解决这类报表问题; 另外问一句, flink-sql 有打算解决这类问题吗?我感觉这个场景还挺多的呢 维表 left join 一张流表, 维表全量数据关联流表,既能获取到实时流表的统计数据,又能保证维表的数据是一个实时更新(或者定期更新)的状态 在 2020-10-27 17:24:05,"Jark Wu" 写道: >我觉得这个更像是一个周期性调度的批处理需求。因为你流处理,只能一直读取员工表的增量,没法每天读个全量。 >是不是用 flink batch + 调度更好一点呢? > >Best, >Jark > >On Tue, 27 Oct 2020 at 16:08, 夜思流年梦 wrote: > >> 目前在准备搞实时数仓:碰到一个问题: >> 比如统计一个所有员工所有的业绩的报表,这个报表需要关联1个员工维表,4个业绩相关流表; >> 如果是正常SQL的话是这样join : >> >> >> 维表 left join 流表 1 >> left join 流表 2 >> left join 流表 3 >> left join 流表 4 >> >> >> 因为flink-sql 的temporal join 不支持 维表在左边 left join 流表, >> >> >> 故只能 流表在左,维表在右来join >> 即:select * from table a left join dim_XXX FOR SYSTEM_TIME AS OF >> a.proctime as c on a.memberId=c.rowkey >> >> >> 但是这个存在的问题是那些今天没有业绩的员工就没有统计数据,如果只是join一张流表,那么我可以把没有数据的员工在出报表时补空数据,现在的情况是要join4 >> 张流表,那么必须得四张流表都有数据的员工才会有数据,这个就是问题了:最终的报表只有4个流表都有数据的员工。 >> >> >> 上次问过一次,上次回答的是双流join,双流join的问题也是一样,只有两者都有数据才会得到最终结果,更何况是员工维表,基本上变化很少。因为有点找不到上次那个邮件了,所以再问一下,这种场景(维表在左 >> left join 流表)有没有比较好的解决方案 >> >> >> >> >> >> >> >>
关于flink-sql 维表join问题
目前在准备搞实时数仓:碰到一个问题: 比如统计一个所有员工所有的业绩的报表,这个报表需要关联1个员工维表,4个业绩相关流表; 如果是正常SQL的话是这样join : 维表 left join 流表 1 left join 流表 2 left join 流表 3 left join 流表 4 因为flink-sql 的temporal join 不支持 维表在左边 left join 流表, 故只能 流表在左,维表在右来join 即:select * from table a left join dim_XXX FOR SYSTEM_TIME AS OF a.proctime as c on a.memberId=c.rowkey 但是这个存在的问题是那些今天没有业绩的员工就没有统计数据,如果只是join一张流表,那么我可以把没有数据的员工在出报表时补空数据,现在的情况是要join4 张流表,那么必须得四张流表都有数据的员工才会有数据,这个就是问题了:最终的报表只有4个流表都有数据的员工。 上次问过一次,上次回答的是双流join,双流join的问题也是一样,只有两者都有数据才会得到最终结果,更何况是员工维表,基本上变化很少。因为有点找不到上次那个邮件了,所以再问一下,这种场景(维表在左 left join 流表)有没有比较好的解决方案
flinksql 不支持 % 运算
flink 版本1.11 目前flink-sql 好像不支持取余运算,会报错: 比如:SELECT * FROM Orders WHERE a % 2 = 0 Percent remainder '%' is not allowed under the current SQL conformance level 看了下flink 的issue ,已经有人碰到过了,说是要1.12版本修复 想问下:如果再1.11版本,flink-sql 要怎么操作才能支持 % 运算呢? 可以通过修改配置文件来实现么?比如flink-conf.yaml
关于flink-sql count/sum 数据如何每天重新计算
现有此场景: 计算每天员工的业绩(只计算当天的) 现在我用flink-sql 的方式,insert into select current_date, count(1) ,worker from XX where writeTime>=current_date group by worker; 把数据按天分区的方式先把数据sink到mysql 但是发现落地到mysql的数据把前几天的数据都给算进来了,如何只算今天的数据? 另外还有一个疑惑,如何既计算当天数据,又包含了本月的所有数据?
Re:Re: 关于flink-sql Join Temporal Tables join 维表问题
你好,我最开始也考虑用双流join,但是双流join 就会碰到一个问题,就是结果集只会包含今天有订单的员工数据,那么没有订单的员工数据是不会体现到结果集的。主要是需要所有员工今天的订单数量; 在 2020-10-12 15:37:51,"Jark Wu" 写道: >我理解楼主的场景不是 temporal join 的场景,而是双流 join >的场景,因为任何一条流的变化,都希望触发对结果的更新,所以讲员工作为右边维度表是不行的。 > >如果是我理解的这样的话,你可以用 flink-cdc-connectors [1] 去对接员工和订单两个 binlog 流,然后直接 >join,然后聚合订单数。伪代码如下: > >create table users ( > user_id bigint, > ... >) with ( > connector = mysql-cdc > ... >); > >create table orders ( > order_id bigint, > user_id bigint, > ... >) with ( > connector = mysql-cdc > ... >); > >select user_id, count(*) as order_num >from (select * from users left join orders on users.user_id = >orders.user_id) >group by user_id; > > >[1]: https://github.com/ververica/flink-cdc-connectors > >On Mon, 12 Oct 2020 at 15:17, caozhen wrote: > >> >> 我理解这个场景下 员工维表在右边没啥问题。 >> >> join过程中需要去员工维表拿哪些字段? >> >> >> >> 夜思流年梦 wrote >> > 现在有一个场景: 一个员工维表,一个订单表(监听mysql binlog的一个流表),想实时计算出所有员工的订单数; >> > 目前flink-sql 支持Join Temporal Tables ,但是官方文档上是这么说的:仅支持带有处理时间的 temporal >> > tables 的 inner 和 left join。 >> > 而这个场景必须是维表在左边,但实际情况是维表在左边无法进行left join :会报错:ClassCastException: >> > org.apache.calcite.rel.logical.LogicalProject cannot be cast to >> > org.apache.calcite.rel.core.TableScan;(如果是流表在左边,然后用temporal join >> > 一个维表那么没有问题,即:left join 维表 FOR SYSTEM_TIME AS OF proctime) >> > 想问下各位,碰到这类情况,大家是怎么处理的 >> >> >> >> >> >> -- >> Sent from: http://apache-flink.147419.n8.nabble.com/
Re:Re: 关于flink-sql Join Temporal Tables join 维表问题
是这样子的,比如要统计所有员工的今天的订单数量,如果是订单表left join 员工表的话,那么今天没有订单数量的就无法出现在结果表集合中; 把员工表放在左边left join 订单表的话那么就是所有员工的今天订单数量都会 出现 在 2020-10-12 15:17:07,"caozhen" 写道: > >我理解这个场景下 员工维表在右边没啥问题。 > >join过程中需要去员工维表拿哪些字段? > > > >夜思流年梦 wrote >> 现在有一个场景: 一个员工维表,一个订单表(监听mysql binlog的一个流表),想实时计算出所有员工的订单数; >> 目前flink-sql 支持Join Temporal Tables ,但是官方文档上是这么说的:仅支持带有处理时间的 temporal >> tables 的 inner 和 left join。 >> 而这个场景必须是维表在左边,但实际情况是维表在左边无法进行left join :会报错:ClassCastException: >> org.apache.calcite.rel.logical.LogicalProject cannot be cast to >> org.apache.calcite.rel.core.TableScan;(如果是流表在左边,然后用temporal join >> 一个维表那么没有问题,即:left join 维表 FOR SYSTEM_TIME AS OF proctime) >> 想问下各位,碰到这类情况,大家是怎么处理的 > > > > > >-- >Sent from: http://apache-flink.147419.n8.nabble.com/
关于flink-sql Join Temporal Tables join 维表问题
现在有一个场景: 一个员工维表,一个订单表(监听mysql binlog的一个流表),想实时计算出所有员工的订单数; 目前flink-sql 支持Join Temporal Tables ,但是官方文档上是这么说的:仅支持带有处理时间的 temporal tables 的 inner 和 left join。 而这个场景必须是维表在左边,但实际情况是维表在左边无法进行left join :会报错:ClassCastException: org.apache.calcite.rel.logical.LogicalProject cannot be cast to org.apache.calcite.rel.core.TableScan;(如果是流表在左边,然后用temporal join 一个维表那么没有问题,即:left join 维表 FOR SYSTEM_TIME AS OF proctime) 想问下各位,碰到这类情况,大家是怎么处理的