Re: FlinkSQL CDC 窗口分组聚合求助

2020-11-22 文章 jy l
好的,我试一下。谢谢 Best Jark Wu 于2020年11月23日周一 下午2:06写道: > 那是不是用非窗口聚合,开5s mini batch,是不是可以达到你的需求? > > Best, > Jark > > On Mon, 23 Nov 2020 at 13:16, jy l wrote: > > > 使用场景就是我们想用Flink 根据订单系统中的订单表,每5s钟计算一次总的交易金额。 > > 目前我们的系统大致架构是mysql(debezium)>kafka--->flink>es > > > > Jark Wu 于2020年11月23日周一

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-22 文章 赵一旦
看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes 页面。 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。 Jark Wu 于2020年11月23日周一 下午3:32写道: > 请用新的 jdbc connector 。老的

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-22 文章 Jark Wu
请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。 On Mon, 23 Nov 2020 at 15:21, 赵一旦 wrote: > 如下是Flink官方文档JBDC connector的部分内容。Key handling > < > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling > > > > Flink uses the primary key that defined

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-22 文章 赵一旦
如下是Flink官方文档JBDC connector的部分内容。Key handling Flink uses the primary key that defined in DDL when writing data to external databases. The connector operate in upsert mode if the primary key

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-22 文章 赵一旦
补充sql: DDL: CREATE TABLE flink_recent_pv_subid ( `supply_id` STRING, `subid` STRING, `mark` STRING, `time` STRING, `pv`BIGINT, PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED ) WITH ( 'connector.type' = 'jdbc', .. ); 查询SQL:

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-22 文章 赵一旦
@hailongwang 一样的。 有个情况说明下,我是tumble window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况, flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。 hailongwang <18868816...@163.com> 于2020年11月23日周一 下午2:39写道: > 数据库中主键的设置跟 primary key

Re: TUMBLE函数不支持 回撤流

2020-11-22 文章 赵一旦
@LakeShen。 怎么看是append/retract数据流呢?是通过逻辑自己判定还是说有什么flink层面的信息直接反映。 LakeShen 于2020年11月4日周三 上午10:12写道: > Hi 夜思流年梦, > > 看下你的 dwd_XXX 这张表的类型,是 append 数据流,还是 retract 数据流。 > 如果是 retract ,应该就不能再上面进行窗口计算了。 > > Best, > LakeShen > > 史 正超 于2020年11月3日周二 下午6:34写道: > > > canal-json 的format也是会有delete

答复: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

2020-11-22 文章 sherlock zw
多谢指点,试了下,返回 Tuple 类型作为 key 是可以按多个字段进行分组的,拼接成 String 的话应该也是可以的 final SingleOutputStreamOperator> sum = flatMap .keyBy(new KeySelector, Tuple2>() { @Override public Tuple2 getKey(Tuple3 tuple3) throws Exception { return

Re:JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-22 文章 hailongwang
数据库中主键的设置跟 primary key 定义的一样不? Best, Hailong 在 2020-11-23 13:15:01,"赵一旦" 写道: >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on >duplicate方式写入。 > >但我在使用中,发现报了 duplicate entry的错误。例如: >Caused by: com.mysql.jdbc.exceptions.jdbc4. >MySQLIntegrityConstraintViolationException:

Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

2020-11-22 文章 赵一旦
connect前生成watermark也是可以的应该,但是你需要把ruleConfigSource流也赋watermark。我猜是这个地方出问题了。 huang botao 于2020年11月19日周四 下午12:58写道: > hi, zhisheng, hailongwang: > > 感谢对这个问题的解答,这个问题确实出在了window无法触发的地方,原因是 在connect() > 后面没有定义watermar导致,在connect后指定watermark就可以触发window了。 > > > > On Wed, Nov 18, 2020 at 10:46 PM

Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

2020-11-22 文章 赵一旦
(1)返回字符串,自己拼接就可以。 (2)返回Tuple类型作为Key。 1.10到1.11相当于是去除了多key的辅助keyBy方法,本身内部就是组成tuple。原因不清楚。 sherlock zw 于2020年11月20日周五 上午11:18写道: > 我看了现在的 flink 1.11 的 keyBy 的代码,是使用的KeySelector > key,但每次只能返回一个字段,不支持返回多个字段,也就说明了一次只能按一个字段去分组(PS: test.keyBy(t -> > t.f0)),如果我想按多个字段进行分组的话该怎么操作呢? > > -邮件原件- >

JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-22 文章 赵一旦
如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on duplicate方式写入。 但我在使用中,发现报了 duplicate entry的错误。例如: Caused by: com.mysql.jdbc.exceptions.jdbc4. MySQLIntegrityConstraintViolationException: Duplicate entry '2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt' at

Re: FlinkSQL CDC 窗口分组聚合求助

2020-11-22 文章 Jark Wu
那是不是用非窗口聚合,开5s mini batch,是不是可以达到你的需求? Best, Jark On Mon, 23 Nov 2020 at 13:16, jy l wrote: > 使用场景就是我们想用Flink 根据订单系统中的订单表,每5s钟计算一次总的交易金额。 > 目前我们的系统大致架构是mysql(debezium)>kafka--->flink>es > > Jark Wu 于2020年11月23日周一 上午10:35写道: > > > Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。 > > 你可以使用非

Re: Flink Standalone HA问题

2020-11-22 文章 赵一旦
没看懂逻辑,flink的HA和hadoop的HA啥关系。 如果hadoop出现问题,对于flink来说hadoop是依赖,这会导致ckpt失败感觉,但和flink集群自身没关系,flink集群应该不会失败。 Fei Han 于2020年11月21日周六 上午11:51写道: > @all! > Flink版本是1.10.2。集群模式是Flink Standalone HA。 > 问题: > 如果在hadoop HA 的情况下,两个namenode都宕机了。重启机器后,启动hadoop。 > Flink Standalone HA

Re: flink zeppelin的type参数(append/update/single)和flink的动态表有关系嘛

2020-11-22 文章 Jeff Zhang
和 flink 没关系,是zeppelin自己定义的参数,只影响select 语句,对于zeppelin的数据可视化有影响,不影响flink job 赵一旦 于2020年11月23日周一 下午1:11写道: > 如题,这个参数是仅仅zeppelin自身的参数,用于决定如何展示数据之类的逻辑呢? > 还是和flink任务也有关系。按照 > https://www.yuque.com/jeffzhangjianfeng/gldg8w/te2l1c#AzSOu > 说法,append模式第一个结果列必须是时间,所以看起来更像是zeppelin自身的要求。 > >

Re: FlinkSQL CDC 窗口分组聚合求助

2020-11-22 文章 jy l
使用场景就是我们想用Flink 根据订单系统中的订单表,每5s钟计算一次总的交易金额。 目前我们的系统大致架构是mysql(debezium)>kafka--->flink>es Jark Wu 于2020年11月23日周一 上午10:35写道: > Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。 > 你可以使用非 window 聚合来代替。 > > Btw,你可能说一下你的需求场景么? 为什么需要在 CDC 上做 window 操作呢? > > Best, > Jark > > On Mon, 23 Nov 2020 at

flink zeppelin的type参数(append/update/single)和flink的动态表有关系嘛

2020-11-22 文章 赵一旦
如题,这个参数是仅仅zeppelin自身的参数,用于决定如何展示数据之类的逻辑呢? 还是和flink任务也有关系。按照https://www.yuque.com/jeffzhangjianfeng/gldg8w/te2l1c#AzSOu 说法,append模式第一个结果列必须是时间,所以看起来更像是zeppelin自身的要求。 我看了下append方式执行,jdbc仍然使用的upsertSink。 所以谁确认下这个参数是不是和具体任务没啥关系。

Re: Flink任务启动偶尔报错PartitionNotFoundException,会自动恢复。

2020-11-22 文章 赵一旦
这个报错和kafka没有关系的哈,我大概理解是提交任务的瞬间,jobManager/taskManager机器压力较大,存在机器之间心跳超时什么的? 这个partition应该是指flink运行图中的数据partition,我感觉。没有具体细看,每次提交的瞬间可能遇到这个问题,然后会自动重试成功。 zhisheng 于2020年11月18日周三 下午10:51写道: > 是不是有 kafka 机器挂了? > > Best > zhisheng > > hailongwang <18868816...@163.com> 于2020年11月18日周三 下午5:56写道: > > >

Re: 请教这种数据格式怎么配置event time呢

2020-11-22 文章 赵一旦
已解决。COALESCE函数调用那的类型问题。d['server_time']是String类型,后边0是数字。 Jark Wu 于2020年11月17日周二 下午11:48写道: > 报什么错? > > > On Tue, 17 Nov 2020 at 23:43, 赵一旦 wrote: > > > CREATE TABLE user_log > > ( > > d MAP, > > process_time AS PROCTIME(), > > event_time AS > >

Re: FlinkSQL CDC 窗口分组聚合求助

2020-11-22 文章 Jark Wu
我建了个 issue 跟进这个功能:https://issues.apache.org/jira/browse/FLINK-20281 On Mon, 23 Nov 2020 at 10:35, Jark Wu wrote: > Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。 > 你可以使用非 window 聚合来代替。 > > Btw,你可能说一下你的需求场景么? 为什么需要在 CDC 上做 window 操作呢? > > Best, > Jark > > On Mon, 23 Nov 2020 at 10:28, jy l wrote: >

Re: FlinkSQL CDC 窗口分组聚合求助

2020-11-22 文章 Jark Wu
Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。 你可以使用非 window 聚合来代替。 Btw,你可能说一下你的需求场景么? 为什么需要在 CDC 上做 window 操作呢? Best, Jark On Mon, 23 Nov 2020 at 10:28, jy l wrote: > Hi: > 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下: > [image: image.png] > [image: image.png] >

maven overlapp 情况下shade选择

2020-11-22 文章 赵一旦
如题,如果出现overlap,shade的选择策略有人清楚嘛。 如果我依赖某个包,重写覆盖部分类(我知道的是我的类优先级更高),但shade的时候是否也是我的类优先级更高。

FlinkSQL CDC 窗口分组聚合求助

2020-11-22 文章 jy l
Hi: 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下: [image: image.png] [image: image.png] 分组计算的SQL如下: [image: image.png] 在执行计算时,报了如下异常: Exception in thread "main" org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete

加载外部配置文件

2020-11-22 文章 范未太
哪位大佬用过typeSafe config,flink 如何把外部.conf 配置文件,在main函数中,通过全局配置的方式,传递给function 在open函数里调用 | | 范未太 | | 邮箱:fwt_...@163.com | 签名由 网易邮箱大师 定制

flink 1.11.2 SQL Client self inner join 结果不正确

2020-11-22 文章 段晓雄
我用相同的一个表自己 inner join 自己,取不同时间点,得到相同值 sql是这样,p5m 和 p0m 都是 snmpprobe.p_snmp_ifxtable 表,时间不同 select p0m.coltime, p0m.ifhcinoctets a, p0m.ifhcoutoctets c, p5m.coltime, p5m.ifhcinoctets b, p5m.ifhcoutoctets d from snmpprobe.p_snmp_ifxtable as p0m inner join snmpprobe.p_snmp_ifxtable as p5m on

flink 1.11.2 SQL Client self inner join 结果不正确

2020-11-22 文章 macdoor
我用相同的一个表自己 inner join 自己,取不同时间点,得到相同值 sql是这样,p5m 和 p0m 都是 snmpprobe.p_snmp_ifxtable 表,时间不同 select p0m.coltime, p0m.ifhcinoctets a, p0m.ifhcoutoctets c, p5m.coltime, p5m.ifhcinoctets b, p5m.ifhcoutoctets d from snmpprobe.p_snmp_ifxtable as p0m inner join snmpprobe.p_snmp_ifxtable as p5m on

用flink 1.11.2 查询hive表自关联(self inner join) 结果不正确

2020-11-22 文章 macdoor
我用相同的一个表自己 inner join 自己,取不同时间点,得到相同值 sql是这样,p5m 和 p0m 都是 snmpprobe.p_snmp_ifxtable 表,时间不同 select p0m.coltime, p0m.ifhcinoctets a, p0m.ifhcoutoctets c, p5m.coltime, p5m.ifhcinoctets b, p5m.ifhcoutoctets d from snmpprobe.p_snmp_ifxtable as p0m inner join snmpprobe.p_snmp_ifxtable as p5m on