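We also use idempotent writes for this kind of thing. You have two options:
1. Process the records one at a time, or
2. Make sure the transactions cannot conflict with each other, for example with the keyBy suggested in the earlier reply.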
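
As a rough illustration of the two points above, here is a minimal Java sketch. It is not the code from the original job. The table name broadband_req, its columns, and a unique key on (user_id, phone_num) are assumptions made for the example, and Row and inLockOrder are hypothetical helpers. The idea is to make each write an upsert, so replaying a batch is harmless, and to sort every batch by key so that concurrent transactions touch rows in the same order, which removes one common cause of deadlocks (it does not rule out all of them).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class IdempotentBatchSketch {
    // Hypothetical table and columns; assumes a UNIQUE key on (user_id, phone_num).
    // Re-running this statement for the same key updates the row instead of failing.
    static final String UPSERT_SQL =
        "INSERT INTO broadband_req (user_id, phone_num, prov_id) VALUES (?, ?, ?) "
      + "ON DUPLICATE KEY UPDATE prov_id = VALUES(prov_id)";

    // Hypothetical stand-in for the BroadBandReq fields used by the sink.
    static final class Row {
        final String userId;
        final String phoneNum;
        final String provId;

        Row(String userId, String phoneNum, String provId) {
            this.userId = userId;
            this.phoneNum = phoneNum;
            this.provId = provId;
        }
    }

    // Returns a copy of the batch sorted by (userId, phoneNum), so every
    // transaction writes its rows in the same key order. The input is not modified.
    static List<Row> inLockOrder(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparing((Row r) -> r.userId)
                              .thenComparing((Row r) -> r.phoneNum));
        return sorted;
    }
}
```

In a sink like the one quoted below, the loop would iterate over inLockOrder(...) of the list and bind each row to a statement prepared from UPSERT_SQL. Combining this with a keyBy on the user ID upstream means a given user's rows are only ever written by one sink subtask.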

On 2020-06-03 15:41:44, "1048262223" <1048262...@qq.com> wrote:
>Hi
>
>Could you share some more context: what is the scenario, and what SQL is being run against MySQL? It is worth first working out what is causing the MySQL deadlock. As I understand it, if you can keyBy the primary key upstream, then only one downstream operator instance handles a given primary key at any time.
>
>Best,
>Yichao Yang
>
>------------------ Original message ------------------
>From: "Px New"<15701181132mr....@gmail.com>;
>Sent: Wednesday, June 3, 2020, 11:35 AM
>To: "user-zh"<user-zh@flink.apache.org>;
>Subject: Re: Sinking Flink data to MySQL with transactions
>
>Hi, I have recently been working on idempotent writes to MySQL, but there is very little documentation and nothing with concrete steps. Would you mind sharing your code for the transactional write so I can take a look? Thank you very much.
>You can also send the code to my email, 15701181132mr....@gmail.com
>
>1101300123 <hdxg1101300...@163.com> wrote on Friday, April 10, 2020 at 11:42 AM:
>
>> I am using Flink to process Kafka data and sink it to MySQL. The processed data has multiple records per user ID, and all of them must be inserted into MySQL, so the sink loops over the list, assembles the records into a batch, and submits it. A MySQL transaction is used so that each batch is committed atomically. After the job has been running for a few hours, MySQL deadlocks start to appear. I would like to know how others handle multi-record writes on the sink side.
>>
>> The sink's invoke code:
>>
>> @Override
>> public void invoke(Tuple5<String, String, String, String, List<BroadBandReq>> value, Context context) throws Exception {
>>     connection.setAutoCommit(false);
>>     List<BroadBandReq> f4 = value.f4;
>>     for (BroadBandReq rs: f4){
>>         statement.setString(1,rs.getUserId());
>>         statement.setString(2,rs.getPhoneNum());
>>         statement.setString(3,rs.getProvId());
>>         statement.addBatch();
>>     }
>>     try {
>>         statement.executeBatch();
>>         connection.commit();
>>     }catch (Exception e){
>>         LOG.info(" add data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, value.f2, value.f3,f4);
>>         connection.rollback();
>>         e.printStackTrace();
>>         throw new Exception(e);
>>     }
>> }
>>
>> java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction
>>     at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
>>     at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
>>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
>>     at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
>>     at com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
>>     at com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>     at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
>>     at com.mysql.jdbc.Util.getInstance(Util.java:408)
>>     at com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163)
>>     at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772)
>>     at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262)
>>     at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970)
>>     at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67)
>>     ... 33 more
>> Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>     at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
>>     at com.mysql.jdbc.Util.getInstance(Util.java:408)
>>     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952)
>>     at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
>>     at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
>>     at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
>>     at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
>>     at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
>>     at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
>>     at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079)
>>     at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756)
>>     ... 36 more
>> [flink-akka.actor.default-dispatcher-783]
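
The error text in the trace above says "try restarting transaction", so one option is to retry the batch when a deadlock is detected. The following is only a sketch of that idea under stated assumptions, not a confirmed fix for this job. It relies on MySQL reporting deadlocks as error code 1213 with SQLSTATE 40001; SqlAction, isDeadlock, and runWithRetry are hypothetical names, and the linear backoff is an arbitrary choice.

```java
import java.sql.SQLException;

class DeadlockRetrySketch {
    // Hypothetical callback: one full attempt at the transaction.
    interface SqlAction {
        void run() throws SQLException;
    }

    // Walks the cause chain looking for MySQL's deadlock signature
    // (vendor error code 1213, SQLSTATE 40001).
    static boolean isDeadlock(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof SQLException) {
                SQLException s = (SQLException) c;
                if (s.getErrorCode() == 1213 || "40001".equals(s.getSQLState())) {
                    return true;
                }
            }
        }
        return false;
    }

    // Runs the action, retrying only on deadlocks, up to maxAttempts in total.
    // Returns the number of attempts that were needed.
    static int runWithRetry(SqlAction action, int maxAttempts, long backoffMillis)
            throws SQLException, InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (SQLException e) {
                if (!isDeadlock(e) || attempt >= maxAttempts) {
                    throw e; // not a deadlock, or retries exhausted
                }
                Thread.sleep(backoffMillis * attempt); // simple linear backoff
            }
        }
    }
}
```

In a sink, the action would have to roll back, rebuild the whole batch, execute it, and commit, so that each attempt is a complete transaction. Retrying is only safe to repeat if the writes themselves are idempotent.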