Re: flinksql join

2022-11-16 Thread Zhiwen Sun
dob_dim_account 维表如果使用 jdbc 的 connector, flink 会在初始化的时候一次性读取所有的数据, 后续数据库中更新并不会触发 flink 计算。 要解决这个问题, dob_dim_account 需要变成流表。 Zhiwen Sun On Thu, Nov 17, 2022 at 1:56 PM Jason_H wrote: > hi,你好 > 这种方式,需要使用cdc,但是我们的现在方案里领导不考虑使用cdc,只想用flinksql去解决这个问题 > > > | | > Jason_H >

Re: flinksql join

2022-11-10 Thread Zhiwen Sun
用普通的 join, 不要用 lookup join Zhiwen Sun On Fri, Nov 11, 2022 at 11:10 AM Jason_H wrote: > > > hi,大家好 > > 我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下: > kakfa输入: >

Re: Flink SQL 中同时写入多个 sink 时,是否能够保证先后次序

2022-10-19 Thread Zhiwen Sun
谢谢,有具体的思路嘛? 比如我需要先写入 jdbc 后再发送消息 是自定义一个 DynamicTableSink , 里面有 JdbcDynamicTableSink KafkaDynamicSink , 还是说继承 JdbcDynamicTableSink , 自定义的类里面再去 new KafkaDynamicSink? 初看起来没办法知道什么时候 db 写入了。要知道什么时候写入,要去自定义 TableInsertOrUpdateStatementExecutor Zhiwen Sun On Tue, Oct 18, 2022 at 5:56 PM 悟空

Re: Flink 1.15 Deduplicate之后Interval Join出错

2022-10-17 Thread Zhiwen Sun
不用 interval join 用普通的流 join 。时间只要不是 proctime 或者 eventtime 就行。 Zhiwen Sun On Sat, Oct 15, 2022 at 9:46 AM 余列冰 wrote: > Hi! > > 我在使用Deduplicate之后进行Interval Join出现问题。我使用的Flink版本是1.15 > > 我希望使用Flink的Interval Join进行双流关联,并且我的第一个表需要去重。以下是我的示例代码。 > ```sql > CREATE TEMPORARY TA

Re: Flink SQL 中同时写入多个 sink 时,是否能够保证先后次序

2022-10-13 Thread Zhiwen Sun
好的,谢谢大家,之前也想过这个方案,复用/继承 JdbcDynamicTableSink 相关代码自定义 connector 。 Zhiwen Sun On Fri, Oct 14, 2022 at 10:08 AM yidan zhao wrote: > 在一个自定义sink中实现先写database,再发消息。 > > 或者2个都是自定义的,但是不能通过sink,因为sink后就没数据了。通过process,第一个process完成写入database后,后续process发送消息。 > > Shuo Cheng 于2022年1

Re: Re: flink实时双流驱动join问题

2022-09-23 Thread Zhiwen Sun
实际业务的确是这样的。 state 永不过期, 要全量的数据计算,全量的数据放到 state 里面。 目前看来只有等 flink table store 了。 Zhiwen Sun On Fri, Sep 23, 2022 at 8:29 AM casel.chen wrote: > > 我这里只是举了一个例子表示Flink用于OLAP实时关联场景会遇到的一个问题,实际业务中确实会出现两张关联表都需要更新情况,不管哪一边更新数据业务都想获取到最新关联结果,而不是旧的关联状态。引出我想问的另一个问题是如果查询模式固定,Flink实时关联是否能取代OLAP系统例如

Re: 关于 UDAF 里面 ListView 的疑问

2022-09-07 Thread Zhiwen Sun
就正常,而使用 AggregateFunction> 就会 NPE。 我怀疑使用 ListView 时,无法正常获得 TypeInference。 Zhiwen Sun On Wed, Sep 7, 2022 at 11:46 PM Xuyang wrote: > Hi, > 理论上来说,在你的case中,会先通过createAccumulator方法创建一个ListView作为acc,然后,每一个输入的row都会触发accumulate方法,将数据更新到刚才的acc中,最终通过getValue方法拿到当前acc的值。 > > >

关于 UDAF 里面 ListView 的疑问

2022-09-07 Thread Zhiwen Sun
regateFunction extends AggregateFunction 我想请教下大家,为什么需要在外层包裹一个 MyAccumulator 呢, 我实际测下来, 直接时用 AggregateFunction> 在 getValue 的时候会报空指针异常 Flink 版本: 1.13.1 谢谢。 Zhiwen Sun

Re: Re: 关于Flink state初始化的问题

2022-08-29 Thread Zhiwen Sun
你应该没有正确理解 state 的使用 我们一般在程序里面是用的是 KeyedState , 也就是和 key 伴随的。 基于上面,所以 open() 里面只能对 state 进行初始化, 但是没有办法设置 state 的 value,因为这时候没有 key ; 另外一方面,也不会在 map() 的时候去 new state (可以认为 state 是一个大的 Map,你 map 的时候只是操作其中的一个 key)。 回到你的需求,你应该在 open() 的时候保存相关信息到类变量里面,当 map() 的时候再去 update state。 Zhiwen Sun

Re: Re:Re: Re: Flink 使用interval join数据丢失疑问

2022-06-14 Thread Zhiwen Sun
我猜测是 watermark 的问题, 看楼主的设置, watermark 是 -2s ,也就是说, order header 流,有数据晚了 2s ,就会被丢弃。 楼主之前看的也是 订单明细比订单主表晚几秒, 这只是同一个订单的数据生成时间差异。 如果是这样的话,使用一般的 inner join + ttl 就可以满足需求了。 BTW: watermark 我觉得很难使用好,实际使用场景非常有限。 Zhiwen Sun On Wed, Jun 15, 2022 at 11:43 AM Shengkai Fang wrote: > > 我在sql inner

Re: flink-connector-jdbc是否支持多个values问题

2022-06-14 Thread Zhiwen Sun
'OK', 'order-name-1', 131) > ON DUPLICATE KEY UPDATE `order_id`=VALUES(`order_id`), > `proctime`=VALUES(`proctime`), `order_status`=VALUES(`order_status`), > `order_name`=VALUES(`order_name`), `total`=VALUES(`total`) Zhiwen Sun On Mon, Mar 7, 2022 at 5:07 PM 黑色 wrote: > 你看一下底层的源码实

Re: flink sql回撤流sink优化问题

2021-12-25 Thread Zhiwen Sun
不用那么复杂,正常的 insert select group by 即可, 一分钟写一次 mysql 就行。 参考 JDBC sink [1] 中的 sink.buffer-flush.interval 和 sink.buffer-flush.max-rows 参数 [1] : https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/ Zhiwen Sun On Thu, Dec 23, 2021 at 8:15 AM casel.chen wrote

Re: Re: Flink任务每运行20天均会发生内部异常

2021-10-27 Thread Zhiwen Sun
看看 task manager 的 jvm 内存, jstack 情况 ? Zhiwen Sun On Tue, Oct 26, 2021 at 7:22 PM mayifan wrote: > 非常感谢大佬的答复: > > 目前从任务来看的话总共存在三个任务,其中两个异常任务分别使用了1到2个MapState,过期时间均为1天或3天。 > > 正常运行的任务使用了MapState及ListState各4个,过期时间为60min-120min。 > > 异常任务在产生异

Re: 如何将canal json格式数据按操作类型过滤

2021-07-07 Thread Zhiwen Sun
先通过 json 或者 raw format 消费原始 canal kafka , 过滤掉 delete 的数据写入到一个新的 kafka ,然后你再基于新的 kafka 建一个 canal-json 的表来落地。 Zhiwen Sun On Wed, Jul 7, 2021 at 10:51 PM JasonLee <17610775...@163.com> wrote: > hi > > > 最后一个字段 type 就是操作的类型, 过滤掉 DELETE 就行了. > > > Best > JasonLee &g

Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 Thread Zhiwen Sun
parquet 相关依赖增加了吗? Zhiwen Sun On Sun, Jun 27, 2021 at 3:57 PM Wei JI10 季伟 wrote: > Hi: >在使用flink sql connector的filesytem时,指定format为parquet。抛出异常信息 > Caused by: org.apache.flink.table.api.ValidationException: Could not find > any format factory for identifier 'parquet' in t

Re: Flink 提交到yarn失败

2021-06-18 Thread Zhiwen Sun
HADOOP_CLASSPATH 设置了吗? Zhiwen Sun On Fri, Jun 18, 2021 at 9:47 AM yangpengyi <963087...@qq.com.invalid> wrote: > 环境: FLINK 1.12 & CDH6.1.1 > 问题: > > 利用yarn-per-job提交时,在初始化hdfs客户端时出错。看起来应该是hadoop版本的兼容问题,不过从堆栈看应该使用

Re: 邮件退订

2021-06-08 Thread Zhiwen Sun
退订是发邮件到 user-zh-unsubscr...@flink.apache.org Zhiwen Sun On Tue, Jun 8, 2021 at 10:21 PM happiless wrote: > 您好,麻烦邮件退订一下 > > > 发自我的iPhone

Re: flink1.11.2 yarn-session 部分类路径未加载

2021-06-01 Thread Zhiwen Sun
不需要 mapreduce 相关库吧。 我看我的 job 里加载到 classpath 的也没有 mapreduce。 Zhiwen Sun On Wed, Jun 2, 2021 at 11:56 AM datayangl wrote: > flink1.11.2 启动yarn-session之后发现,有部分类路径始终没有加载到class_path中去 > 环境变量配置如下: > < > http://apache-flink.147419.n8.nabble.com/file/t919/66604010-2A08-4A68-8478-7

Re: flink 1.12.2 编译报错

2021-05-28 Thread Zhiwen Sun
] [INFO] BUILD SUCCESS [INFO] Zhiwen Sun On Fri, May 28, 2021 at 11:12 AM Zhiwen Sun wrote: > 谢谢,看了下,junit 的确依赖 org.hamcrest ,而且相关版本都没问题。 > > 那这个报错的原因是什么呢? 什么地方导致 hamcrest 被 exclude 了?然后手动增加了dependency 就好了?

Re: flink 1.12.2 编译报错

2021-05-27 Thread Zhiwen Sun
谢谢,看了下,junit 的确依赖 org.hamcrest ,而且相关版本都没问题。 那这个报错的原因是什么呢? 什么地方导致 hamcrest 被 exclude 了?然后手动增加了dependency 就好了? 代码拉下来没修改过。 Zhiwen Sun On Fri, May 28, 2021 at 10:58 AM Shuo Cheng wrote: > Hi, org.hamcrest 是 junit 的依赖 > > On Fri, May 28, 2021 at 10:28 AM Zhiwen Sun wrote: > > &g

Re: flink 1.12.2 编译报错

2021-05-27 Thread Zhiwen Sun
: mvn clean install -DskipTests -Dfast -Dscala-2.12 目前能够正常编译了,我看 release 版本支持 scala-2.11 的。是我的环境有问题吗? Zhiwen Sun On Fri, May 28, 2021 at 10:28 AM Zhiwen Sun wrote: > 才编译到 Test utils : Junit 模块,就报错了 > > maven 版本: 3.2.5 > jdk 版本:1.8.0_251 > flink 版本: flink 1.12.2 > 执行的命令

flink 1.12.2 编译报错

2021-05-27 Thread Zhiwen Sun
utils : Junit . FAILURE [ 0.283 s] 而且我看 flink-test-utils-parent/pom.xml 和 flink-test-utils-junit/pom.xml 的确没加 org.hamcrest 相关依赖啊。 请问大家下,原因是什么呢? Zhiwen Sun

flink 1.12.2 编译报错

2021-05-27 Thread Zhiwen Sun
utils : Junit . FAILURE [ 0.283 s] 看起来是缺少 org.hamcrest 相关依赖 我看 flink-test-utils-parent/pom.xml 和 flink-test-utils-junit/pom.xml 的确没加 org.hamcrest 相关依赖, 不知道这个是怎么工作的。 请问大家下,原因是什么呢? Zhiwen Sun