哦哦, 好吧,我一直以为你说的“新旧”是是否指定了update-mode。理解错了。 good,那应该没问题了,我去改改。
Jark Wu <imj...@gmail.com> 于2020年11月23日周一 下午5:18写道: > 你用的还是老的 connector 吧?也就是 'connector.type' = 'jdbc'。这个是根据 query > 是否有更新来决定工作模式的。 > 如果你用新的 connector,也就是 'connector' = 'jdbc',这个的行为就是你说的第 (2) 种行为。 > > Best, > Jark > > On Mon, 23 Nov 2020 at 17:14, 赵一旦 <hinobl...@gmail.com> wrote: > > > duplicate情况可能update pv =values(pv), 也可能 update pv = pv + > > values(pv),这个根据具体任务统计逻辑决定,所以直接交给用户去设置挺好的。 > > > > 此外,当前jdbc connector中,对于source情况也支持自定义sql,我指参数connector.read.query' > > 。所以其实可以类似来个参数connector.write.sql',sql中通过?占位符指定字段啥的。 > > > > > > 赵一旦 <hinobl...@gmail.com> 于2020年11月23日周一 下午5:09写道: > > > > > 总结下: > > > (1)group > > > > > > by查询,输出为update流,配合mysqlSink可实现更新。(DDL中不需要定义update-mode,1.11中本身也不支持update-mode参数了)。 > > > (2)group by+tumble window查询,输出为append流,配合mysqlSink不可实现更新。 > > > > > > > > > > > > 如上,如果是case(2)想要实现更新方式输出到mysql怎么办呢?感觉Flink不应该基于流的特征决定怎么输出,毕竟涉及到外部存储,实际情况和具体业务逻辑有关,仅根据流的特征决定怎么输出不合适。 > > > 还不如基于MySQL表的DDL(update-mode参数),或者隐式根据是否定义了primary key决定是否update方式输出。 > > > > > > > > > 说个假想的示例,如果我指定kafka从过去某个实际重新跑某个任务,这种情况部分输出肯定重复,用户一般都会希望duplicate > > update方式输出。 > > > 甚至DDL中推荐可以搞个自定义on > > > duplicate的处理逻辑,因为有些大窗口(天级别)的统计,有时候还可能基于小窗口叠加产生,也是需要依靠mysql提供的on > duplicate > > > update功能。 > > > > > > > > > > > > > > > 赵一旦 <hinobl...@gmail.com> 于2020年11月23日周一 下午4:48写道: > > > > > >> 嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。 > > >> 发现这种方式也不行,但是加了group by之后是可以的。 > > >> > > >> (1) > > >> 所以说是否还需要query带有key的语义才行呢? > > >> 比如group by的结果是可能update的,并且基于group by key也指出了key。 > > >> > > >> 那么group by + tumble > window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢? > > >> > > >> (2)如JarkWu所说,是mysql表的DDL部分决定。 > > >> > > >> 如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢? > > >> > > >> Jark Wu <imj...@gmail.com> 于2020年11月23日周一 下午4:28写道: > > >> > > >>> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。 > > >>> > > >>> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。 > > >>> > > >>> Best, > > >>> Jark > > >>> > > >>> On Mon, 23 Nov 2020 at 15:39, 赵一旦 <hinobl...@gmail.com> wrote: > > >>> > > >>> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到 > > >>> > > > >>> > > > >>> > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes > > >>> > 页面。 > > >>> > > > >>> > > > >>> > > > >>> > > > 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。 > > >>> > > > >>> > Jark Wu <imj...@gmail.com> 于2020年11月23日周一 下午3:32写道: > > >>> > > > >>> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。 > > >>> > > > > >>> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦 <hinobl...@gmail.com> wrote: > > >>> > > > > >>> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling > > >>> > > > < > > >>> > > > > > >>> > > > > >>> > > > >>> > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling > > >>> > > > > > > >>> > > > > > >>> > > > Flink uses the primary key that defined in DDL when writing > data > > to > > >>> > > > external databases. The connector operate in upsert mode if the > > >>> primary > > >>> > > key > > >>> > > > was defined, otherwise, the connector operate in append mode. > > >>> > > > > > >>> > > > In upsert mode, Flink will insert a new row or update the > > existing > > >>> row > > >>> > > > according to the primary key, Flink can ensure the idempotence > in > > >>> this > > >>> > > way. > > >>> > > > To guarantee the output result is as expected, it’s recommended > > to > > >>> > define > > >>> > > > primary key for the table and make sure the primary key is one > of > > >>> the > > >>> > > > unique key sets or primary key of the underlying database > table. > > In > > >>> > > append > > >>> > > > mode, Flink will interpret all records as INSERT messages, the > > >>> INSERT > > >>> > > > operation may fail if a primary key or unique constraint > > violation > > >>> > > happens > > >>> > > > in the underlying database. > > >>> > > > > > >>> > > > See CREATE TABLE DDL > > >>> > > > < > > >>> > > > > > >>> > > > > >>> > > > >>> > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table > > >>> > > > > > > >>> > > > for > > >>> > > > more details about PRIMARY KEY syntax. > > >>> > > > > > >>> > > > > > >>> > > > 这里也有一点,In append mode, Flink will interpret all records as > INSERT > > >>> > > messages, > > >>> > > > the INSERT operation may fail if a primary key or unique > > constraint > > >>> > > > violation happens in the underlying database. 什么叫append > > >>> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥? > > >>> > > > > > >>> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。 > > >>> > > > > > >>> > > > > > >>> > > > > > >>> > > > 赵一旦 <hinobl...@gmail.com> 于2020年11月23日周一 下午3:02写道: > > >>> > > > > > >>> > > > > 补充sql: > > >>> > > > > > > >>> > > > > DDL: > > >>> > > > > > > >>> > > > > CREATE TABLE flink_recent_pv_subid > > >>> > > > > ( > > >>> > > > > `supply_id` STRING, > > >>> > > > > `subid` STRING, > > >>> > > > > `mark` STRING, > > >>> > > > > `time` STRING, > > >>> > > > > `pv` BIGINT, > > >>> > > > > PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT > > >>> ENFORCED > > >>> > > > > ) WITH ( > > >>> > > > > 'connector.type' = 'jdbc', > > >>> > > > > > > >>> > > > > ...... > > >>> > > > > > > >>> > > > > ); > > >>> > > > > > > >>> > > > > > > >>> > > > > 查询SQL: > > >>> > > > > > > >>> > > > > INSERT INTO > > >>> > > > > flink_recent_pv_subid > > >>> > > > > SELECT > > >>> > > > > `sid`, > > >>> > > > > `subid`, > > >>> > > > > `mark`, > > >>> > > > > DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' > MINUTE), > > >>> > > > 'yyyyMMddHHmm') as `time`, > > >>> > > > > count(1) AS `pv` > > >>> > > > > FROM baidu_log_view > > >>> > > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL > > '5' > > >>> > > MINUTE); > > >>> > > > > > > >>> > > > > > > >>> > > > > 赵一旦 <hinobl...@gmail.com> 于2020年11月23日周一 下午3:00写道: > > >>> > > > > > > >>> > > > >> @hailongwang 一样的。 > > >>> > > > >> > > >>> > > > >> 有个情况说明下,我是tumble > > >>> > window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。 > > >>> > > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况, > > >>> > > > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。 > > >>> > > > >> > > >>> > > > >> > > >>> > > > >> > > >>> > > > >> > > >>> > > > >> hailongwang <18868816...@163.com> 于2020年11月23日周一 下午2:39写道: > > >>> > > > >> > > >>> > > > >>> 数据库中主键的设置跟 primary key 定义的一样不? > > >>> > > > >>> > > >>> > > > >>> > > >>> > > > >>> Best, > > >>> > > > >>> Hailong > > >>> > > > >>> 在 2020-11-23 13:15:01,"赵一旦" <hinobl...@gmail.com> 写道: > > >>> > > > >>> >如题,按照官方文档,当mysql表定义了primary > > >>> key的时候,会使用UpsertTableSink,并且会使用insert > > >>> > on > > >>> > > > >>> >duplicate方式写入。 > > >>> > > > >>> > > > >>> > > > >>> >但我在使用中,发现报了 duplicate entry的错误。例如: > > >>> > > > >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4. > > >>> > > > >>> >MySQLIntegrityConstraintViolationException: Duplicate > entry > > >>> > > > >>> >'2036-feed_landing_box_news-2000-202011231405' for key > > >>> 'uniq_ssmt' > > >>> > > > >>> > at > > >>> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > > >>> > > > >>> Method) > > >>> > > > >>> > at > > sun.reflect.NativeConstructorAccessorImpl.newInstance( > > >>> > > > >>> >NativeConstructorAccessorImpl.java:62) > > >>> > > > >>> > at > > >>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance( > > >>> > > > >>> >DelegatingConstructorAccessorImpl.java:45) > > >>> > > > >>> > at > > >>> > > java.lang.reflect.Constructor.newInstance(Constructor.java:423) > > >>> > > > >>> > at > com.mysql.jdbc.Util.handleNewInstance(Util.java:411) > > >>> > > > >>> > at com.mysql.jdbc.Util.getInstance(Util.java:386) > > >>> > > > >>> > at > > >>> > > com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041) > > >>> > > > >>> > at > > >>> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190) > > >>> > > > >>> > at > > >>> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122) > > >>> > > > >>> > at > com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570) > > >>> > > > >>> > at > > >>> com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731) > > >>> > > > >>> > at > > >>> > > com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818) > > >>> > > > >>> > at > > >>> > > > >>> > > >>> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement > > >>> > > > >>> >.java:2157) > > >>> > > > >>> > at > > >>> > > > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement > > >>> > > > >>> >.java:2460) > > >>> > > > >>> > at > > >>> > > > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement > > >>> > > > >>> >.java:2377) > > >>> > > > >>> > at > > >>> > > > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement > > >>> > > > >>> >.java:2361) > > >>> > > > >>> > at > > com.mysql.jdbc.PreparedStatement.executeBatchedInserts( > > >>> > > > >>> >PreparedStatement.java:1793) > > >>> > > > >>> > > > >>> > > > >>> >(2) > > >>> > > > >>> > > >>> > > >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。 > > >>> > > > >>> >但这个冲突的entry是在14.11分那一波才报错的。 > > >>> > > > >>> > > >>> > > > >> > > >>> > > > > > >>> > > > > >>> > > > >>> > > >> > > >