Sinking Flink data to MySQL with transactions

2020-04-09 Thread 1101300123


We currently use Flink to process Kafka data and sink it to MySQL. Each processed user ID corresponds to multiple records, all of which must be inserted into MySQL, so the sink loops over the list and assembles the rows into a batch commit; a MySQL transaction is opened so that each batch is written atomically. After the job runs for a few hours, MySQL deadlocks start to appear! I'd like to hear how everyone else handles multi-record writes on the sink side.
The sink's invoke code:
@Override
public void invoke(Tuple5<String, String, String, String, List<BroadBandReq>> value, Context context) throws Exception {
    connection.setAutoCommit(false);
    List<BroadBandReq> f4 = value.f4;
    for (BroadBandReq rs : f4) {
        statement.setString(1, rs.getUserId());
        statement.setString(2, rs.getPhoneNum());
        statement.setString(3, rs.getProvId());
        statement.addBatch();
    }
    try {
        statement.executeBatch();
        connection.commit();
    } catch (Exception e) {
        LOG.info(" add data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, value.f2, value.f3, f4);
        connection.rollback();
        e.printStackTrace();
        throw new Exception(e);
    }
}




java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction
	at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
	at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
	at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
	at com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
	at com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstruct
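Not suggested in the thread itself, but one standard mitigation for this exact error: InnoDB deadlocks between batch transactions often arise when two transactions lock the same rows in different orders. Sorting each batch by the unique-key columns before addBatch() gives every transaction the same lock acquisition order. A minimal sketch; the Row class and the (provId, phoneNum) key are stand-ins for the thread's BroadBandReq, whose real key columns are not shown:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class BatchOrdering {
    // Minimal stand-in for the thread's BroadBandReq; field names are assumptions.
    static class Row {
        final String phoneNum;
        final String provId;
        Row(String phoneNum, String provId) {
            this.phoneNum = phoneNum;
            this.provId = provId;
        }
    }

    // Sort rows by the columns of the table's unique/primary key before addBatch(),
    // so every concurrent transaction acquires row locks in the same global order.
    static List<Row> inLockOrder(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparing((Row r) -> r.provId)
                              .thenComparing(r -> r.phoneNum));
        return sorted;
    }
}
```

The sink would call inLockOrder(f4) before the addBatch() loop; this does not change what gets written, only the order in which row locks are taken.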

Re: Sinking Flink data to MySQL with transactions

2020-06-02 Thread Px New
Hi, I've recently been working on idempotent writes to MySQL, but the documentation is sparse and offers nothing concrete. Would you mind sharing your transactional-write code? Thank you very much.
You can also send the code to my email 15701181132mr@gmail.com 😀


Re: Sinking Flink data to MySQL with transactions

2020-06-03 Thread 1048262223
Hi,
Could you share some context: what is the scenario, and what does the MySQL SQL look like? The first step is to determine what is actually causing the MySQL deadlock. My understanding is that if you can keyBy the upstream on the primary key, then only one downstream subtask processes a given key at a time.


Best,
Yichao Yang




------ Original Message ------
From: "Px New" <15701181132mr@gmail.com>;
Sent: Wednesday, June 3, 2020, 11:35 AM
To: "user-zh"
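The keyBy suggestion above would look like `stream.keyBy(v -> v.f1)` before the sink in the DataStream API. The toy partitioner below is only an illustration of the resulting guarantee, that records with the same userId always land on the same subtask; Flink's real key-group hashing differs internally:

```java
public class KeyRouting {
    // Illustrative hash partitioner: deterministic, so every record carrying
    // the same userId is routed to the same writer subtask, and no two
    // concurrent transactions can touch the same user's rows.
    static int subtaskFor(String userId, int parallelism) {
        return Math.floorMod(userId.hashCode(), parallelism);
    }
}
```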

Re: Re: Sinking Flink data to MySQL with transactions

2020-06-04 Thread hdxg1101300...@163.com
Hello:

Here is my situation: after Flink finishes processing, I organize the data into a tuple. The first element indicates the operation on the record (insert or delete), the second is my user_ID, and the third is a list holding multiple entities.
The data is either inserted or deleted. Because the list size varies, I open a MySQL transaction when writing so that this batch of data stays idempotent. But after the job runs for a while, deadlocks appear; the data itself is not affected. So I mailed the list to learn how everyone handles this, and only now, after all these days, is there a discussion.
Pseudocode below:

public RdsOperaterSink2(String url, String name, String password) {
    this.url = url;
    this.name = name;
    this.password = password;
}

private transient volatile PreparedStatement statement;
private transient volatile Connection connection;

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    Class.forName("com.mysql.jdbc.Driver");
    connection = DriverManager.getConnection(url, name, password);
    connection.setAutoCommit(false);
    System.out.println("connection ---" + connection);
}

@Override
public void invoke(Tuple5<String, String, String, String, List<BroadBandReq>> value, Context context) throws Exception {
    List<BroadBandReq> f4 = value.f4;
    String operType = value.f0;
    if (operType.equals(BusiConst.D_OPER_TYPE)) {
        statement = connection.prepareStatement(BusiConst.DELETESQL);
        //LOG.info(" delete data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, value.f2, value.f3, f4);
        for (BroadBandReq rs : f4) {
            statement.setString(1, rs.getPhoneNum());
            statement.setString(2, rs.getProvId());
            statement.addBatch();
        }
    } else {
        statement = connection.prepareStatement(BusiConst.ADDSQL);
        //LOG.info(" add data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, value.f2, value.f3, f4);
        for (BroadBandReq rs : f4) {
            statement.setString(1, rs.getUserId());
            statement.setString(2, rs.getPhoneNum());
            statement.setString(3, rs.getProvId());
            statement.addBatch();
        }
    }
    try {
        statement.executeBatch();
        connection.commit();
    } catch (Exception e) {
        LOG.info("  data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, value.f2, value.f3, f4);
        System.out.println("  data for rds :  " + value);
        System.out.println("invoke  ---" + connection);
        connection.rollback();
        LOG.error(e.getMessage());
        e.printStackTrace();
        throw new Exception(e);
    }
}

@Override
public void close() throws Exception {
    super.close();
    statement.close();
    connection.close();
}



hdxg1101300...@163.com
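Since MySQL's own error text advises "try restarting transaction", another option (not shown in the thread) is to retry the whole batch a bounded number of times before failing the job. A sketch with a hypothetical helper; rollback is assumed to happen inside the caller's catch block, as in the sink above:

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;

public class DeadlockRetry {
    // Re-run the whole transaction up to maxAttempts times when MySQL reports
    // a deadlock; any other SQLException is rethrown immediately.
    static <T> T withRetry(Callable<T> txn, int maxAttempts) throws Exception {
        SQLException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return txn.call();
            } catch (SQLException e) {
                String msg = e.getMessage();
                if (msg == null || !msg.contains("Deadlock")) {
                    throw e; // not a deadlock: fail fast
                }
                last = e; // deadlock: roll back happened in txn, try again
            }
        }
        throw last; // still deadlocking after maxAttempts
    }
}
```

The invoke() body above would wrap its executeBatch()/commit() section in such a helper, so transient deadlocks no longer restart the whole Flink job.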
 

Re: Re: Sinking Flink data to MySQL with transactions

2020-06-03 Thread Michael Ran
We also use idempotency to handle this kind of thing.
1. Either process records one at a time,
2. or guarantee that transactions cannot conflict with each other, e.g. the keyBy approach mentioned above.

Re: Re: Sinking Flink data to MySQL with transactions

2020-06-03 Thread godfrey he
hi greemqq...@163.com, 15701181132mr@gmail.com,
Could you describe the idempotent-write scenario in more detail? For example, an upsert statement can achieve idempotency.

hi hdxg1101300...@163.com,
Does a single statement batch in your sink function mix insert and delete statements? Are you using Flink SQL or the DataStream API?

Bests,
Godfrey
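For reference, the upsert mentioned here can be expressed in MySQL as `INSERT ... ON DUPLICATE KEY UPDATE`, which makes replays of the same batch idempotent. The table and column names below are hypothetical:

```sql
-- Hypothetical table; assumes a unique key on (phone_num, prov_id).
-- Replaying the same batch leaves the rows unchanged, so a retried
-- transaction cannot produce duplicates.
INSERT INTO broadband (user_id, phone_num, prov_id)
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE user_id = VALUES(user_id);
```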

>