你好,感谢老师回复 `insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,我理解这两种情况的话,对于数据库来说,就是一次插入与两次插入的问题了吧,要是数据量大的话,感觉对性能还是有影响的 | | 小昌同学 | | ccc0606fight...@163.com | ---- 回复的原邮件 ---- | 发件人 | Shammon FY<zjur...@gmail.com> | | 发送日期 | 2023年7月26日 10:52 | | 收件人 | <user-zh@flink.apache.org> | | 主题 | Re: JdbcSink引发的IO过高 | Hi,
目前JdbcSink会为每个Sink创建PreparedStatement,当进行batch数据处理时,会先调用PreparedStatement的addBatch()函数将数据放入缓存,到达flush条件后调用executeBatch()函数批量发送数据到jdbc server,这样会节省网络IO。 具体到数据库侧,我理解执行 `insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,对底层存储可能差别不大,因为插入的数据量不会减少,具体你可以观察一下 Best, Shammon FY On Tue, Jul 25, 2023 at 4:02 PM 小昌同学 <ccc0606fight...@163.com> wrote: 各位老师好,我这边在使用Flink 的JdbcSink的时候,有一个疑问想请教一下各位老师: 我的代码如下:我代码中设定的每一个批次插入1000条,或者是每隔200ms插入一次数据,但是由于我司musql资源不行,从监控页面看插入的IO过高,我想请教一下我使用这样的insert语句,当我积累了1000条,是怎么样的格式 是 insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10); 或者是 insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6) insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10) 如果是第二种情况,有什么样的方式可以转换为第一种情况,这样的话会大大减少IO | errorStream.addSink(JdbcSink.sink( "insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)", (statement, result) -> { statement.setString(1,result.getAction()); statement.setString(2,result.getServerIp()); statement.setString(3,result.getHandleSerialno()); statement.setString(4,result.getMd5Num()); statement.setString(5,result.getInsertTime()); statement.setString(6, result.getDateTime()); }, JdbcExecutionOptions.builder() .withBatchSize(1000) .withBatchIntervalMs(200) .withMaxRetries(5) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://111/data_ret_log?useSSL=false&useUnicode=false&failOverReadOnly=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnectForPools=true&serverTimezone=Asia/Shanghai&autoReconnect=true") .withDriverName("com.mysql.jdbc.Driver") .withUsername("111") .withPassword("222") .build() )).name("sink-error-mysql"); | | | 小昌同学 | | ccc0606fight...@163.com |