Hi,
That approach requires CDC, but in our current design management has ruled out CDC and wants to solve this with Flink SQL alone.


Jason_H
hyb_he...@163.com
---- Replied Message ----
From: 任召金 <renzhao...@100.me>
Date: 11/15/2022 09:52
To: user-zh <user-zh@flink.apache.org>
Subject: Re: flinksql join
Hello, you could try turning the MySQL data into a stream via CDC and then inner-joining it with the main stream. Keep an eye on the state TTL.
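
A minimal sketch of what this suggestion could look like, written as SQL strings in the same style as the original post. Everything here is an assumption for illustration: the table name dob_dim_account_cdc, the column types, the connection values, and the 7-day TTL are placeholders, and the option keys assume the Flink CDC 'mysql-cdc' connector is on the classpath. The point is only that the dimension table becomes a changelog stream and the lookup join (FOR SYSTEM_TIME AS OF) becomes a regular join, so unmatched facts stay in join state until the account row shows up.

```java
// Sketch only: SQL strings for a regular (stateful) join against a CDC-backed
// dimension table. Nothing here talks to Flink; the strings would be handed to
// a TableEnvironment by the caller.
final class CdcJoinSketch {

    // Hypothetical CDC-backed copy of the MySQL account table. The option keys
    // follow the 'mysql-cdc' connector; every value is a made-up placeholder.
    static final String DIM_ACCOUNT_CDC_DDL =
        "CREATE TABLE dob_dim_account_cdc (\n" +
        "  ACCT_CODE STRING,\n" +
        "  CORP_ID STRING,\n" +
        "  PRIMARY KEY (ACCT_CODE) NOT ENFORCED\n" +
        ") WITH (\n" +
        "  'connector' = 'mysql-cdc',\n" +
        "  'hostname' = 'localhost',\n" +
        "  'port' = '3306',\n" +
        "  'username' = 'flink',\n" +
        "  'password' = 'secret',\n" +
        "  'database-name' = 'dim',\n" +
        "  'table-name' = 'account'\n" +
        ")";

    // Regular joins keep both inputs in state, so an idle-state retention time
    // should be configured. The value is an assumption, not a recommendation.
    static final String STATE_TTL_KEY = "table.exec.state.ttl";
    static final String STATE_TTL_VALUE = "7 d";

    // Same aggregation as the original query, but with regular inner joins
    // instead of lookup joins.
    static final String JOIN_QUERY =
        "SELECT\n" +
        "  ta.gmtStatistical AS gmtStatistical,\n" +
        "  ta.paymentMethod AS paymentMethod,\n" +
        "  tb.CORP_ID AS outCorpId,\n" +
        "  tc.CORP_ID AS inCorpId,\n" +
        "  SUM(ta.tradeAmt) AS tranAmount,\n" +
        "  SUM(ta.tradeCnt) AS tranNum\n" +
        "FROM dws_a2a_trade_year_index ta\n" +
        "JOIN dob_dim_account_cdc tb ON ta.outAcctCode = tb.ACCT_CODE\n" +
        "JOIN dob_dim_account_cdc tc ON ta.inAcctCode = tc.ACCT_CODE\n" +
        "GROUP BY ta.gmtStatistical, ta.paymentMethod, tb.CORP_ID, tc.CORP_ID";

    public static void main(String[] args) {
        System.out.println(DIM_ACCOUNT_CDC_DDL);
        System.out.println("SET '" + STATE_TTL_KEY + "' = '" + STATE_TTL_VALUE + "';");
        System.out.println(JOIN_QUERY);
    }
}
```

In a real job the DDL and the query would be passed to TableEnvironment.executeSql, and the TTL would be set on the table environment's configuration. Note the trade-off behind the TTL warning: a fact that waits longer than the TTL for its account row is dropped from state and will never be joined.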
------------------ Original ------------------
From: "Jason_H" <hyb_he...@163.com>
Date: Tue, Nov 15, 2022 09:46 AM
To: "Flink Chinese mailing list" <user-zh@flink.apache.org>
Subject: Re: flinksql join

Hi,
I would like to handle this case with the existing Flink SQL join: when the dimension table is updated late, the fact records should wait in state.
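
To make the requested behavior concrete, here is a plain-Java simulation of "facts wait in state until the dimension row arrives", replayed with the numbers from the original question. It does not use Flink at all, and BufferedJoinSketch and its methods are hypothetical names invented for this illustration. It is a sketch of the semantics being asked for, not of how Flink implements any join.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink involved) of buffering unmatched facts
// until their dimension row arrives. All names are hypothetical.
final class BufferedJoinSketch {
    // account -> company, filled as dimension rows arrive
    private final Map<String, String> dim = new HashMap<>();
    // account -> amounts of facts still waiting for a dimension row
    private final Map<String, List<Long>> pending = new HashMap<>();
    // company -> {total amount, record count}
    private final Map<String, long[]> totals = new HashMap<>();

    // A fact either joins immediately or is parked in the pending buffer.
    void onFact(String account, long amount) {
        String corp = dim.get(account);
        if (corp == null) {
            pending.computeIfAbsent(account, k -> new ArrayList<>()).add(amount);
        } else {
            accumulate(corp, amount);
        }
    }

    // A new dimension row releases every fact that was waiting for it.
    void onDimension(String account, String corp) {
        dim.put(account, corp);
        List<Long> waiting = pending.remove(account);
        if (waiting != null) {
            for (long amount : waiting) {
                accumulate(corp, amount);
            }
        }
    }

    private void accumulate(String corp, long amount) {
        long[] t = totals.computeIfAbsent(corp, k -> new long[2]);
        t[0] += amount;
        t[1] += 1;
    }

    long amountOf(String corp) {
        long[] t = totals.get(corp);
        return t == null ? 0 : t[0];
    }

    long countOf(String corp) {
        long[] t = totals.get(corp);
        return t == null ? 0 : t[1];
    }

    public static void main(String[] args) {
        BufferedJoinSketch join = new BufferedJoinSketch();
        join.onFact("1111", 100);          // account unknown, buffered
        join.onFact("1111", 100);          // account unknown, buffered
        join.onDimension("1111", "AAAA");  // late insert releases both facts
        join.onFact("1111", 100);          // joins immediately
        System.out.println("AAAA " + join.amountOf("AAAA") + " " + join.countOf("AAAA"));
        // prints "AAAA 300 3"
    }
}
```

The sketch ignores two things a real job has to decide: how long unmatched facts may wait before they are discarded, and what happens to already-emitted totals when an account later moves to a different company.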


Jason_H
hyb_he...@163.com
---- Replied Message ----
From: RS <tinyshr...@163.com>
Date: 11/15/2022 09:07
To: user-zh@flink.apache.org <user-zh@flink.apache.org>
Subject: Re:flinksql join
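Hi,
As I understand it, it is expected that dimension rows inserted later are not joined with earlier records.
To get AAAA=3, you would have to manually re-run the historical data and then update the existing results.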


Thanks






On 2022-11-11 11:10:03, "Jason_H" <hyb_he...@163.com> wrote:


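Hi all,
I am using Flink SQL to implement a dimension-table join. The data source is Kafka (transaction data) and the dimension table is in MySQL (accounts). I have run into the following problem: when a record arrives from Kafka and its account is not found in the dimension table, I insert the account manually, and the next record then matches the account information. However, the accumulated output is missing the records that did not match. For example: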
Kafka input:
Account  Amount  Count
1111     100     1      -> not matched
1111     100     1      -> not matched
1111     100     1      -> matched

Dimension table:
Account  Company
2222     BBBB
1111     AAAA     -> account inserted later

Actual output:
Company  Amount  Count
AAAA     100     1


Expected output:
Company  Amount  Count
AAAA     300     3





The SQL is as follows:
String sql2 = "insert into dws_b2b_trade_year_index\n" +
"WITH temp AS (\n" +
"select \n" +
"  ta.gmtStatistical as gmtStatistical,\n" +
"  ta.paymentMethod as paymentMethod,\n" +
"  tb.CORP_ID as outCorpId,\n" +
"  tc.CORP_ID as inCorpId,\n" +
"  sum(ta.tradeAmt) as tranAmount,\n" +
"  sum(ta.tradeCnt) as tranNum \n" +
"from dws_a2a_trade_year_index ta \n" +
"left join dob_dim_account for system_time as of ta.proc as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
"left join dob_dim_account for system_time as of ta.proc as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
"group by \n" +
" ta.gmtStatistical, \n" +
" ta.paymentMethod, \n" +
" tb.CORP_ID, \n" +
" tc.CORP_ID \n" +
") \n" +
"SELECT \n" +
"   DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
"   gmtStatistical, \n" +
"   paymentMethod, \n" +
"   outCorpId, \n" +
"   inCorpId, \n" +
"   tranAmount, \n" +
"   tranNum \n" +
"FROM temp";

Jason_H
hyb_he...@163.com
