好的 谢谢各位老师的指导
| | 小昌同学 | | ccc0606fight...@163.com | ---- 回复的原邮件 ---- | 发件人 | ron<ld...@zju.edu.cn> | | 发送日期 | 2023年8月10日 00:51 | | 收件人 | <user-zh@flink.apache.org> | | 主题 | Re: 回复: Flink消费MySQL | Hi, 建议通过CDC实时读,然后用Flink的双流Join进行关联。 -----原始邮件----- 发件人: "小昌同学" <ccc0606fight...@163.com> 发送时间: 2023-08-08 11:10:19 (星期二) 收件人: user-zh <user-zh@flink.apache.org> 抄送: user-zh <user-zh@flink.apache.org> 主题: 回复: Flink消费MySQL 谢谢老师指导呀; 我目前的需求是想把两张MySQL的表数据读取出来,然后进行实时关联,我现在能想到的就是要么使用cdc实时读取,要么就是写一个循环去读MySQL中的数据 老师这一块有更好的建议嘛 | | 小昌同学 | | ccc0606fight...@163.com | ---- 回复的原邮件 ---- | 发件人 | Shammon FY<zjur...@gmail.com> | | 发送日期 | 2023年8月8日 10:37 | | 收件人 | <user-zh@flink.apache.org> | | 主题 | Re: Flink消费MySQL | Hi, 你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏 至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况 Best, Shammon FY On Mon, Aug 7, 2023 at 5:04 PM 小昌同学 <ccc0606fight...@163.com> wrote: 各位老师好 ,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊; 以下是我的代码: | public class MysqlSource2 extends RichSourceFunction<ActionType> { PreparedStatement ps; private Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql="select * from actiontype;"; ps = connection.prepareStatement(sql); } private static Connection getConnection(){ Connection con=null; String driverClass= FlinkConfig.config.getProperty("driverClass"); String url=FlinkConfig.config.getProperty("jdbcUrl"); String user=FlinkConfig.config.getProperty("jdbcUser"); String passWord=FlinkConfig.config.getProperty("passWord"); try { Class.forName(driverClass); con= DriverManager.getConnection(url,user,passWord); } catch (Exception e) { throw new RuntimeException(e); } return con; } @Override public void run(SourceContext<ActionType> ctx) throws Exception { ResultSet resultSet = ps.executeQuery(); while (resultSet.next()){ ActionType actionType = new ActionType( resultSet.getString("action"), resultSet.getString("action_name") ); ctx.collect(actionType); } } @Override public void close() throws Exception { super.close(); if (null!=connection){ connection.close(); } if (null!=ps){ ps.close(); } } @Override public void cancel() { } }; | | | 小昌同学 | | ccc0606fight...@163.com | ------------------------------ Best, Ron