回复: JdbcSink引发的IO过高
你好,感谢老师回复 `insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,我理解这两种情况的话,对于数据库来说,就是一次插入与两次插入的问题了吧,要是数据量大的话,感觉对性能还是有影响的 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Shammon FY | | 发送日期 | 2023年7月26日 10:52 | | 收件人 | | | 主题 | Re: JdbcSink引发的IO过高 | Hi, 目前JdbcSink会为每个Sink创建PreparedStatement,当进行batch数据处理时,会先调用PreparedStatement的addBatch()函数将数据放入缓存,到达flush条件后调用executeBatch()函数批量发送数据到jdbc server,这样会节省网络IO。 具体到数据库侧,我理解执行 `insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,对底层存储可能差别不大,因为插入的数据量不会减少,具体你可以观察一下 Best, Shammon FY On Tue, Jul 25, 2023 at 4:02 PM 小昌同学 wrote: 各位老师好,我这边在使用Flink 的JdbcSink的时候,有一个疑问想请教一下各位老师: 我的代码如下:我代码中设定的每一个批次插入1000条,或者是每隔200ms插入一次数据,但是由于我司musql资源不行,从监控页面看插入的IO过高,我想请教一下我使用这样的insert语句,当我积累了1000条,是怎么样的格式 是 insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10); 或者是 insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6) insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10) 如果是第二种情况,有什么样的方式可以转换为第一种情况,这样的话会大大减少IO | errorStream.addSink(JdbcSink.sink( "insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)", (statement, result) -> { statement.setString(1,result.getAction()); statement.setString(2,result.getServerIp()); statement.setString(3,result.getHandleSerialno()); statement.setString(4,result.getMd5Num()); statement.setString(5,result.getInsertTime()); statement.setString(6, result.getDateTime()); }, JdbcExecutionOptions.builder() .withBatchSize(1000) .withBatchIntervalMs(200) .withMaxRetries(5) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://111/data_ret_log?useSSL=false&useUnicode=false&failOverReadOnly=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnectForPools=true&serverTimezone=Asia/Shanghai&autoReconnect=true") .withDriverName("com.mysql.jdbc.Driver") .withUsername("111") .withPassword("222") .build() )).name("sink-error-mysql"); | | | 小昌同学 | | ccc0606fight...@163.com |
Re: JdbcSink引发的IO过高
Hi, 目前JdbcSink会为每个Sink创建PreparedStatement,当进行batch数据处理时,会先调用PreparedStatement的addBatch()函数将数据放入缓存,到达flush条件后调用executeBatch()函数批量发送数据到jdbc server,这样会节省网络IO。 具体到数据库侧,我理解执行 `insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,对底层存储可能差别不大,因为插入的数据量不会减少,具体你可以观察一下 Best, Shammon FY On Tue, Jul 25, 2023 at 4:02 PM 小昌同学 wrote: > 各位老师好,我这边在使用Flink 的JdbcSink的时候,有一个疑问想请教一下各位老师: > > 我的代码如下:我代码中设定的每一个批次插入1000条,或者是每隔200ms插入一次数据,但是由于我司musql资源不行,从监控页面看插入的IO过高,我想请教一下我使用这样的insert语句,当我积累了1000条,是怎么样的格式 > 是 > insert into error_md5_info (action, serverIp, > handleSerialno,md5Num,insertTime,dateTime) values > (1,2,3,4,5,6),(1,2,3,4,9,10); > 或者是 > insert into error_md5_info (action, serverIp, > handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6) > insert into error_md5_info (action, serverIp, > handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10) > 如果是第二种情况,有什么样的方式可以转换为第一种情况,这样的话会大大减少IO > > > > | > errorStream.addSink(JdbcSink.sink( > "insert into error_md5_info (action, serverIp, > handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)", > (statement, result) -> { > statement.setString(1,result.getAction()); > statement.setString(2,result.getServerIp()); > statement.setString(3,result.getHandleSerialno()); > statement.setString(4,result.getMd5Num()); > statement.setString(5,result.getInsertTime()); > statement.setString(6, result.getDateTime()); > }, > JdbcExecutionOptions.builder() > .withBatchSize(1000) > .withBatchIntervalMs(200) > .withMaxRetries(5) > .build(), > new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() > > .withUrl("jdbc:mysql://111/data_ret_log?useSSL=false&useUnicode=false&failOverReadOnly=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnectForPools=true&serverTimezone=Asia/Shanghai&autoReconnect=true") > .withDriverName("com.mysql.jdbc.Driver") > .withUsername("111") > .withPassword("222") > .build() > )).name("sink-error-mysql"); > | > | | > 小昌同学 > | > | > ccc0606fight...@163.com > |
Re: 关于DataStream API计算批数据的聚合值
你好: Batch 模式下的 reduce 操作默认应该就是只输出最后一条数据(per-key)的。Agg 的话可能有点麻烦,可以使用 GlobalWindow + 自定义 Trigger 来 Workaround. Best regards, Weijie Liu Join 于2023年7月26日周三 09:10写道: > 例如:我使用DataStream api计算批数据也就是有界流的平均值,如何实现只输出最后一条平均值的数据,不输出中间值 >
Re: 关于DataStream API计算批数据的聚合值
Hi, 跟使用普通流式作业的DataStream用法一样,只需要在RuntimeMode里使用Batch模式,Flink在Batch模式下会只输出最后的结果,而不会输出中间结果。具体可以参考Flink里的WordCount例子 [1] [1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java On Wed, Jul 26, 2023 at 9:10 AM Liu Join wrote: > 例如:我使用DataStream api计算批数据也就是有界流的平均值,如何实现只输出最后一条平均值的数据,不输出中间值 >
关于DataStream API计算批数据的聚合值
例如:我使用DataStream api计算批数据也就是有界流的平均值,如何实现只输出最后一条平均值的数据,不输出中间值
JdbcSink引发的IO过高
各位老师好,我这边在使用Flink 的JdbcSink的时候,有一个疑问想请教一下各位老师: 我的代码如下:我代码中设定的每一个批次插入1000条,或者是每隔200ms插入一次数据,但是由于我司musql资源不行,从监控页面看插入的IO过高,我想请教一下我使用这样的insert语句,当我积累了1000条,是怎么样的格式 是 insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10); 或者是 insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6) insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10) 如果是第二种情况,有什么样的方式可以转换为第一种情况,这样的话会大大减少IO | errorStream.addSink(JdbcSink.sink( "insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)", (statement, result) -> { statement.setString(1,result.getAction()); statement.setString(2,result.getServerIp()); statement.setString(3,result.getHandleSerialno()); statement.setString(4,result.getMd5Num()); statement.setString(5,result.getInsertTime()); statement.setString(6, result.getDateTime()); }, JdbcExecutionOptions.builder() .withBatchSize(1000) .withBatchIntervalMs(200) .withMaxRetries(5) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://111/data_ret_log?useSSL=false&useUnicode=false&failOverReadOnly=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnectForPools=true&serverTimezone=Asia/Shanghai&autoReconnect=true") .withDriverName("com.mysql.jdbc.Driver") .withUsername("111") .withPassword("222") .build() )).name("sink-error-mysql"); | | | 小昌同学 | | ccc0606fight...@163.com |