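Hi greemqq...@163.com, 15701181132mr....@gmail.com,

Could you describe your idempotent-processing scenario in more detail? For example, an upsert statement can make the writes idempotent.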
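As a rough illustration of the upsert idea, here is a minimal sketch that builds a MySQL `INSERT ... ON DUPLICATE KEY UPDATE` statement. The class name `UpsertSql`, the table name `broadband_req`, and the column names are hypothetical and not taken from this thread. It also assumes the target table has a PRIMARY KEY or UNIQUE index on the key columns; without one, replaying a record simply inserts a duplicate row.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: the class, table, and column names are illustrative only.
final class UpsertSql {
    private UpsertSql() {}

    /**
     * Builds a parameterized MySQL upsert statement.
     * Assumes a PRIMARY KEY or UNIQUE index exists on the key columns,
     * so that writing the same record twice updates the row instead of duplicating it.
     */
    static String build(String table, List<String> columns) {
        String cols = String.join(", ", columns);
        // One "?" placeholder per column.
        String params = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
        // On a key conflict, overwrite every column with the incoming value.
        String updates = columns.stream()
                .map(c -> c + " = VALUES(" + c + ")")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + params + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }
}
```

The resulting string could be passed to `connection.prepareStatement(...)` in place of a plain `INSERT`. Because replaying a record leaves the table in the same state, retries after a failure should not produce duplicates, under the unique-key assumption above.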
Hi hdxg1101300...@163.com,

Does a single statement batch in your sink function mix insert, delete, and other statements? Are you using Flink SQL or the DataStream API?

Bests,
Godfrey

Michael Ran <greemqq...@163.com> wrote on Wed, Jun 3, 2020 at 8:07 PM:

> We also use idempotent writes for this kind of thing.
> 1. Either process one record at a time,
> 2. or make sure transactions cannot conflict with each other, for example with the keyBy mentioned in the reply below.
>
> On 2020-06-03 15:41:44, "1048262223" <1048262...@qq.com> wrote:
> >Hi
> >
> >Could you share more context: what is the scenario, and what SQL runs against MySQL? It is worth first determining what causes the MySQL deadlock. My understanding is that if you keyBy the primary key upstream, only one downstream operator instance handles a given primary key at any time.
> >
> >Best,
> >Yichao Yang
> >
> >------------------ Original message ------------------
> >From: "Px New"<15701181132mr....@gmail.com>;
> >Sent: Wednesday, June 3, 2020, 11:35 AM
> >To: "user-zh"<user-zh@flink.apache.org>;
> >Subject: Re: Transactional handling when sinking Flink data to MySQL
> >
> >Hi, I have recently been working on idempotent writes to MySQL, but there is very little documentation and nothing with concrete steps. Would you mind sharing your code for transactional writes? Thank you very much.
> >You can also send the code to my email 15701181132mr....@gmail.com
> >
> >1101300123 <hdxg1101300...@163.com> wrote on Fri, Apr 10, 2020 at 11:42 AM:
> >
> >> We currently use Flink to process Kafka data and sink it to MySQL. After processing, one user ID maps to multiple records, all of which must be inserted into MySQL, so the sink loops over the list, assembles the records into a batch, and submits it. A MySQL transaction is opened so that each batch is committed atomically. We now find that MySQL deadlocks appear after the job has run for a few hours! I would like to know how others handle multi-record writes on the sink side.
> >>
> >> The sink's invoke code:
> >>
> >> @Override
> >> public void invoke(Tuple5<String, String, String, String, List<BroadBandReq>> value, Context context) throws Exception {
> >>     connection.setAutoCommit(false);
> >>     List<BroadBandReq> f4 = value.f4;
> >>     for (BroadBandReq rs : f4) {
> >>         statement.setString(1, rs.getUserId());
> >>         statement.setString(2, rs.getPhoneNum());
> >>         statement.setString(3, rs.getProvId());
> >>         statement.addBatch();
> >>     }
> >>     try {
> >>         statement.executeBatch();
> >>         connection.commit();
> >>     } catch (Exception e) {
> >>         LOG.info(" add data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, value.f2, value.f3, f4);
> >>         connection.rollback();
> >>         e.printStackTrace();
> >>         throw new Exception(e);
> >>     }
> >> }
> >>
> >> java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction
> >>     at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
> >>     at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
> >>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> >>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
> >>     at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
> >>     at com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
> >>     at com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
> >>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> >>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> >>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
> >>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
> >>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
> >>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
> >>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> >>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >>     at java.lang.Thread.run(Thread.java:748)
> >> Caused by: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction
> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >>     at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
> >>     at com.mysql.jdbc.Util.getInstance(Util.java:408)
> >>     at com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163)
> >>     at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772)
> >>     at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262)
> >>     at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970)
> >>     at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67)
> >>     ... 33 more
> >> Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >>     at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
> >>     at com.mysql.jdbc.Util.getInstance(Util.java:408)
> >>     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952)
> >>     at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
> >>     at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
> >>     at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
> >>     at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
> >>     at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
> >>     at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
> >>     at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079)
> >>     at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756)
> >>     ... 36 more
> >> [flink-akka.actor.default-dispatcher-783]
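The MySQL error in the trace says "try restarting transaction", so one option (in addition to the keyBy and idempotent-write suggestions in this thread) is to retry the whole batch when a deadlock is reported. The sketch below is only an assumption about how that might look: the class `DeadlockRetry`, its method names, and the backoff values are hypothetical. It relies on MySQL reporting deadlocks with SQLSTATE `40001`, and it walks the cause chain because the sink above wraps the original exception.

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;

// Hypothetical helper, not part of Flink or the MySQL driver.
final class DeadlockRetry {
    private DeadlockRetry() {}

    /** Returns true if the throwable or any of its causes is a SQLException with SQLSTATE 40001. */
    static boolean isDeadlock(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof SQLException && "40001".equals(((SQLException) c).getSQLState())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Runs the transaction body, retrying only when a deadlock is detected.
     * The body is expected to roll back before it throws, so that each attempt starts clean.
     */
    static <T> T run(Callable<T> txn, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return txn.call();
            } catch (Exception e) {
                // Give up on non-deadlock errors or when attempts are exhausted.
                if (!isDeadlock(e) || attempt >= maxAttempts) {
                    throw e;
                }
                // Simple linear backoff before the next attempt (illustrative values).
                Thread.sleep(50L * attempt);
            }
        }
    }
}
```

In a sink like the one quoted above, the batch assembly, `executeBatch()`, and `commit()` would go inside the `Callable`, with `rollback()` called before rethrowing. Retrying is only safe if re-executing the batch is harmless, which is one more reason to make the writes idempotent.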