好的 谢谢各位老师的指导

| |
小昌同学
|
|
ccc0606fight...@163.com
|
---- 回复的原邮件 ----
| 发件人 | ron<ld...@zju.edu.cn> |
| 发送日期 | 2023年8月10日 00:51 |
| 收件人 | <user-zh@flink.apache.org> |
| 主题 | Re: 回复: Flink消费MySQL |

Hi,

建议通过CDC实时读,然后用Flink的双流Join进行关联。

-----原始邮件-----
发件人: "小昌同学" <ccc0606fight...@163.com>
发送时间: 2023-08-08 11:10:19 (星期二)
收件人: user-zh <user-zh@flink.apache.org>
抄送: user-zh <user-zh@flink.apache.org>
主题: 回复: Flink消费MySQL

谢谢老师指导呀;
我目前的需求是想把两张MySQL的表数据读取出来,然后进行实时关联,我现在能想到的就是要么使用cdc实时读取,要么就是写一个循环去读MySQL中的数据
老师这一块有更好的建议嘛


| |
小昌同学
|
|
ccc0606fight...@163.com
|
---- 回复的原邮件 ----
| 发件人 | Shammon FY<zjur...@gmail.com> |
| 发送日期 | 2023年8月8日 10:37 |
| 收件人 | <user-zh@flink.apache.org> |
| 主题 | Re: Flink消费MySQL |
Hi,

你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏

至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded
source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况

Best,
Shammon FY

On Mon, Aug 7, 2023 at 5:04 PM 小昌同学 <ccc0606fight...@163.com> wrote:

各位老师好
,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊;
以下是我的代码:
|
public class MysqlSource2 extends RichSourceFunction<ActionType> {
PreparedStatement ps;
private Connection connection;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
String sql="select * from actiontype;";
ps = connection.prepareStatement(sql);
}

private static Connection getConnection(){
Connection con=null;
String driverClass= FlinkConfig.config.getProperty("driverClass");
String url=FlinkConfig.config.getProperty("jdbcUrl");
String user=FlinkConfig.config.getProperty("jdbcUser");
String passWord=FlinkConfig.config.getProperty("passWord");

try {
Class.forName(driverClass);
con= DriverManager.getConnection(url,user,passWord);
} catch (Exception e) {
throw new RuntimeException(e);
}
return con;
}

@Override
public void run(SourceContext<ActionType> ctx) throws Exception {
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()){
ActionType actionType = new ActionType(
resultSet.getString("action"),
resultSet.getString("action_name")
);
ctx.collect(actionType);
}
}

@Override
public void close() throws Exception {
super.close();
if (null!=connection){
connection.close();
}
if (null!=ps){
ps.close();
}
}

@Override
public void cancel() {
}
};


|


| |
小昌同学
|
|
ccc0606fight...@163.com
|


------------------------------
Best,
Ron

回复