dob_dim_account 维表如果使用 jdbc 的 connector, flink 会在初始化的时候一次性读取所有的数据,
后续数据库中更新并不会触发 flink 计算。
要解决这个问题, dob_dim_account 需要变成流表。
Zhiwen Sun
On Thu, Nov 17, 2022 at 1:56 PM Jason_H wrote:
> hi,你好
> 这种方式,需要使用cdc,但是我们的现在方案里领导不考虑使用cdc,只想用flinksql去解决这个问题
>
>
> | |
> Jason_H
>
用普通的 join, 不要用 lookup join
Zhiwen Sun
On Fri, Nov 11, 2022 at 11:10 AM Jason_H wrote:
>
>
> hi,大家好
>
> 我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下:
> kakfa输入:
>
谢谢,有具体的思路嘛?
比如我需要先写入 jdbc 后再发送消息
是自定义一个 DynamicTableSink , 里面有 JdbcDynamicTableSink KafkaDynamicSink ,
还是说继承 JdbcDynamicTableSink , 自定义的类里面再去 new KafkaDynamicSink?
初看起来没办法知道什么时候 db 写入了。要知道什么时候写入,要去自定义 TableInsertOrUpdateStatementExecutor
Zhiwen Sun
On Tue, Oct 18, 2022 at 5:56 PM 悟空
不用 interval join 用普通的流 join 。时间只要不是 proctime 或者 eventtime 就行。
Zhiwen Sun
On Sat, Oct 15, 2022 at 9:46 AM 余列冰 wrote:
> Hi!
>
> 我在使用Deduplicate之后进行Interval Join出现问题。我使用的Flink版本是1.15
>
> 我希望使用Flink的Interval Join进行双流关联,并且我的第一个表需要去重。以下是我的示例代码。
> ```sql
> CREATE TEMPORARY TA
好的,谢谢大家,之前也想过这个方案,复用/继承 JdbcDynamicTableSink 相关代码自定义 connector 。
Zhiwen Sun
On Fri, Oct 14, 2022 at 10:08 AM yidan zhao wrote:
> 在一个自定义sink中实现先写database,再发消息。
>
> 或者2个都是自定义的,但是不能通过sink,因为sink后就没数据了。通过process,第一个process完成写入database后,后续process发送消息。
>
> Shuo Cheng 于2022年1
实际业务的确是这样的。
state 永不过期, 要全量的数据计算,全量的数据放到 state 里面。
目前看来只有等 flink table store 了。
Zhiwen Sun
On Fri, Sep 23, 2022 at 8:29 AM casel.chen wrote:
>
> 我这里只是举了一个例子表示Flink用于OLAP实时关联场景会遇到的一个问题,实际业务中确实会出现两张关联表都需要更新情况,不管哪一边更新数据业务都想获取到最新关联结果,而不是旧的关联状态。引出我想问的另一个问题是如果查询模式固定,Flink实时关联是否能取代OLAP系统例如
就正常,而使用
AggregateFunction> 就会 NPE。
我怀疑使用 ListView 时,无法正常获得 TypeInference。
Zhiwen Sun
On Wed, Sep 7, 2022 at 11:46 PM Xuyang wrote:
> Hi,
> 理论上来说,在你的case中,会先通过createAccumulator方法创建一个ListView作为acc,然后,每一个输入的row都会触发accumulate方法,将数据更新到刚才的acc中,最终通过getValue方法拿到当前acc的值。
>
>
>
regateFunction extends AggregateFunction
我想请教下大家,为什么需要在外层包裹一个 MyAccumulator 呢, 我实际测下来, 直接时用
AggregateFunction> 在 getValue 的时候会报空指针异常
Flink 版本: 1.13.1
谢谢。
Zhiwen Sun
你应该没有正确理解 state 的使用
我们一般在程序里面是用的是 KeyedState , 也就是和 key 伴随的。
基于上面,所以 open() 里面只能对 state 进行初始化, 但是没有办法设置 state 的 value,因为这时候没有 key ;
另外一方面,也不会在 map() 的时候去 new state (可以认为 state 是一个大的 Map,你 map 的时候只是操作其中的一个
key)。
回到你的需求,你应该在 open() 的时候保存相关信息到类变量里面,当 map() 的时候再去 update state。
Zhiwen Sun
我猜测是 watermark 的问题, 看楼主的设置, watermark 是 -2s ,也就是说, order header 流,有数据晚了 2s
,就会被丢弃。
楼主之前看的也是 订单明细比订单主表晚几秒, 这只是同一个订单的数据生成时间差异。 如果是这样的话,使用一般的 inner join + ttl
就可以满足需求了。
BTW: watermark 我觉得很难使用好,实际使用场景非常有限。
Zhiwen Sun
On Wed, Jun 15, 2022 at 11:43 AM Shengkai Fang wrote:
> > 我在sql inner
'OK', 'order-name-1', 131)
> ON DUPLICATE KEY UPDATE `order_id`=VALUES(`order_id`),
> `proctime`=VALUES(`proctime`), `order_status`=VALUES(`order_status`),
> `order_name`=VALUES(`order_name`), `total`=VALUES(`total`)
Zhiwen Sun
On Mon, Mar 7, 2022 at 5:07 PM 黑色 wrote:
> 你看一下底层的源码实
不用那么复杂,正常的 insert select group by 即可, 一分钟写一次 mysql 就行。
参考 JDBC sink [1] 中的 sink.buffer-flush.interval 和 sink.buffer-flush.max-rows
参数
[1] :
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/
Zhiwen Sun
On Thu, Dec 23, 2021 at 8:15 AM casel.chen wrote
看看 task manager 的 jvm 内存, jstack 情况 ?
Zhiwen Sun
On Tue, Oct 26, 2021 at 7:22 PM mayifan wrote:
> 非常感谢大佬的答复:
>
> 目前从任务来看的话总共存在三个任务,其中两个异常任务分别使用了1到2个MapState,过期时间均为1天或3天。
>
> 正常运行的任务使用了MapState及ListState各4个,过期时间为60min-120min。
>
> 异常任务在产生异
先通过 json 或者 raw format 消费原始 canal kafka , 过滤掉 delete 的数据写入到一个新的 kafka
,然后你再基于新的 kafka 建一个 canal-json 的表来落地。
Zhiwen Sun
On Wed, Jul 7, 2021 at 10:51 PM JasonLee <17610775...@163.com> wrote:
> hi
>
>
> 最后一个字段 type 就是操作的类型, 过滤掉 DELETE 就行了.
>
>
> Best
> JasonLee
&g
parquet 相关依赖增加了吗?
Zhiwen Sun
On Sun, Jun 27, 2021 at 3:57 PM Wei JI10 季伟
wrote:
> Hi:
>在使用flink sql connector的filesytem时,指定format为parquet。抛出异常信息
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any format factory for identifier 'parquet' in t
HADOOP_CLASSPATH 设置了吗?
Zhiwen Sun
On Fri, Jun 18, 2021 at 9:47 AM yangpengyi <963087...@qq.com.invalid> wrote:
> 环境: FLINK 1.12 & CDH6.1.1
> 问题:
>
> 利用yarn-per-job提交时,在初始化hdfs客户端时出错。看起来应该是hadoop版本的兼容问题,不过从堆栈看应该使用
退订是发邮件到 user-zh-unsubscr...@flink.apache.org
Zhiwen Sun
On Tue, Jun 8, 2021 at 10:21 PM happiless wrote:
> 您好,麻烦邮件退订一下
>
>
> 发自我的iPhone
不需要 mapreduce 相关库吧。
我看我的 job 里加载到 classpath 的也没有 mapreduce。
Zhiwen Sun
On Wed, Jun 2, 2021 at 11:56 AM datayangl wrote:
> flink1.11.2 启动yarn-session之后发现,有部分类路径始终没有加载到class_path中去
> 环境变量配置如下:
> <
> http://apache-flink.147419.n8.nabble.com/file/t919/66604010-2A08-4A68-8478-7
]
[INFO] BUILD SUCCESS
[INFO]
Zhiwen Sun
On Fri, May 28, 2021 at 11:12 AM Zhiwen Sun wrote:
> 谢谢,看了下,junit 的确依赖 org.hamcrest ,而且相关版本都没问题。
>
> 那这个报错的原因是什么呢? 什么地方导致 hamcrest 被 exclude 了?然后手动增加了dependency 就好了?
谢谢,看了下,junit 的确依赖 org.hamcrest ,而且相关版本都没问题。
那这个报错的原因是什么呢? 什么地方导致 hamcrest 被 exclude 了?然后手动增加了dependency 就好了? 代码拉下来没修改过。
Zhiwen Sun
On Fri, May 28, 2021 at 10:58 AM Shuo Cheng wrote:
> Hi, org.hamcrest 是 junit 的依赖
>
> On Fri, May 28, 2021 at 10:28 AM Zhiwen Sun wrote:
>
> &g
: mvn clean install -DskipTests -Dfast -Dscala-2.12
目前能够正常编译了,我看 release 版本支持 scala-2.11 的。是我的环境有问题吗?
Zhiwen Sun
On Fri, May 28, 2021 at 10:28 AM Zhiwen Sun wrote:
> 才编译到 Test utils : Junit 模块,就报错了
>
> maven 版本: 3.2.5
> jdk 版本:1.8.0_251
> flink 版本: flink 1.12.2
> 执行的命令
utils : Junit . FAILURE [
0.283 s]
而且我看 flink-test-utils-parent/pom.xml 和 flink-test-utils-junit/pom.xml
的确没加 org.hamcrest 相关依赖啊。
请问大家下,原因是什么呢?
Zhiwen Sun
utils : Junit . FAILURE [
0.283 s]
看起来是缺少 org.hamcrest 相关依赖
我看 flink-test-utils-parent/pom.xml 和 flink-test-utils-junit/pom.xml
的确没加 org.hamcrest 相关依赖, 不知道这个是怎么工作的。
请问大家下,原因是什么呢?
Zhiwen Sun
23 matches
Mail list logo