|
> |
> hyb_he...@163.com
> |
> Replied Message
> | From | 任召金 |
> | Date | 11/15/2022 09:52 |
> | To | user-zh |
> | Subject | Re: flinksql join |
> hello,你可以试下,将mysql的数据通过CDC变成流数据,然后跟主流inner join,注意状态的TTL
>
>
> --Original
hi,你好
这种方式,需要使用cdc,但是我们的现在方案里领导不考虑使用cdc,只想用flinksql去解决这个问题
| |
Jason_H
|
|
hyb_he...@163.com
|
Replied Message
| From | 任召金 |
| Date | 11/15/2022 09:52 |
| To | user-zh |
| Subject | Re: flinksql join |
hello,你可以试下,将mysql的数据通过CDC变成流数据,然后跟主流inner join,注意状态的TTL
hi,你好
我想基于现有的flinksql的join实现这种情况,当维表更新慢的时候,事实数据会放在状态中等待。
| |
Jason_H
|
|
hyb_he...@163.com
|
Replied Message
| From | RS |
| Date | 11/15/2022 09:07 |
| To | user-zh@flink.apache.org |
| Subject | Re:flinksql join |
Hi,
我的理解是后插入的维表数据,关联不到是正常现象,
如果要实现=3的话,应该要手动重新跑历史数据,然后更新现有数据,
hi,你好
我想基于现有的flinksql的join实现这种情况,当维表更新慢的时候,事实数据会放在状态中等待。
| |
Jason
|
|
hyb_he...@163.com
|
Replied Message
| From | RS |
| Date | 11/15/2022 09:07 |
| To | user-zh@flink.apache.org |
| Subject | Re:flinksql join |
Hi,
我的理解是后插入的维表数据,关联不到是正常现象,
如果要实现=3的话,应该要手动重新跑历史数据,然后更新现有数据,
+
"SUPER_ORG_ID string, \n" +
"IS_OUTSIDE BIGINT \n" +
") \n" +
"WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = '***',\n" +
" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'username' = 'root',\n&quo
+
"SUPER_ORG_ID string, \n" +
"IS_OUTSIDE BIGINT \n" +
") \n" +
"WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = '***',\n" +
" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'username' = 'root',\n" +
" 'passwor
ver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
//" 'lookup.cache.ttl' = '1s', \n" +
" 'table-name' = 'dob_dim_account' \n" +
//"
用普通的 join, 不要用 lookup join
Zhiwen Sun
On Fri, Nov 11, 2022 at 11:10 AM Jason_H wrote:
>
>
> hi,大家好
>
> 我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下:
> kakfa输入:
> 账号 金额 笔数
> 100 1
hi,大家好
我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下:
kakfa输入:
账号 金额 笔数
100 1 -> 未匹配
100 1 -> 未匹配
100 1 -> 匹配上
维表
账号 企业
-> 后插入的账号信息
实际输出结果
企业
和你join使用的 left table RowKind 保持一致。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
请教各位一下,我使用 FlinkSQL 编写任务时,kafka source -> MySQL sink 不设置主键,查看了一下 request
mode 是 [INSERT] ,也就是普通的 append 流,这很正常。
但是当我关联上维表后,发现 request mode 变成了 [INSERT, UPDATE_BEFORE, UPDATE_AFTER,
DELETE],这时异常报错会要求我给 sink 表设置主键,当我设置上主键后就会变成了 upsert 流。
upsert流底层实现原理是 INSERT INTO ... DUPLICATE KEY
Thanks, it works.
wangl...@geekplus.com.cn
Sender: sunfulin
Send Time: 2020-03-12 14:19
Receiver: user-zh; wanglei2
cc: jinhai.me
Subject: Re:Re: Re: flinkSQL join表的历史信息保存在哪里保存多久
这样来用:
StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max);
在 2020-03-12 14:11:31,"
ecutionEnvironment env =
>StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>StreamQueryConfig qConfig = tableEnv.queryConfig();
>
>
>
>wangl...@geekplus.com.cn
>
>
>Sender: jinhai wang
>Send Time: 2020-03-12 13:44
>
();
wangl...@geekplus.com.cn
Sender: jinhai wang
Send Time: 2020-03-12 13:44
Receiver: user-zh@flink.apache.org
Subject: Re: flinkSQL join表的历史信息保存在哪里保存多久
应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming
应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入:
两个从 kafka 创建的表:
tableA: key valueA
tableB: key
两个从 kafka 创建的表:
tableA: key valueA
tableB: key valueB
用 flink sql 提交job 运行: select tableA.key, tableA.valueA,tableB.valueB from
tableA join tableB on tableA.key = tableB.key;
这两个表的历史数据在 flink 中存在哪里?存多久呢?
比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗?
谢谢,
王磊
16 matches
Mail list logo