Hi, 你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏
至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况 Best, Shammon FY On Mon, Aug 7, 2023 at 5:04 PM 小昌同学 <ccc0606fight...@163.com> wrote: > 各位老师好 > ,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊; > 以下是我的代码: > | > public class MysqlSource2 extends RichSourceFunction<ActionType> { > PreparedStatement ps; > private Connection connection; > > @Override > public void open(Configuration parameters) throws Exception { > super.open(parameters); > connection = getConnection(); > String sql="select * from actiontype;"; > ps = connection.prepareStatement(sql); > } > > private static Connection getConnection(){ > Connection con=null; > String driverClass= FlinkConfig.config.getProperty("driverClass"); > String url=FlinkConfig.config.getProperty("jdbcUrl"); > String user=FlinkConfig.config.getProperty("jdbcUser"); > String passWord=FlinkConfig.config.getProperty("passWord"); > > try { > Class.forName(driverClass); > con= DriverManager.getConnection(url,user,passWord); > } catch (Exception e) { > throw new RuntimeException(e); > } > return con; > } > > @Override > public void run(SourceContext<ActionType> ctx) throws Exception { > ResultSet resultSet = ps.executeQuery(); > while (resultSet.next()){ > ActionType actionType = new ActionType( > resultSet.getString("action"), > resultSet.getString("action_name") > ); > ctx.collect(actionType); > } > } > > @Override > public void close() throws Exception { > super.close(); > if (null!=connection){ > connection.close(); > } > if (null!=ps){ > ps.close(); > } > } > > @Override > public void cancel() { > } > }; > > > | > > > | | > 小昌同学 > | > | > ccc0606fight...@163.com > |