Hi,

你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏

至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded
source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况

Best,
Shammon FY

On Mon, Aug 7, 2023 at 5:04 PM 小昌同学 <ccc0606fight...@163.com> wrote:

> 各位老师好
> ,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊;
> 以下是我的代码:
> |
> public class MysqlSource2 extends RichSourceFunction<ActionType> {
>     PreparedStatement ps;
> private Connection connection;
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> connection = getConnection();
>         String sql="select * from actiontype;";
> ps = connection.prepareStatement(sql);
>     }
>
> private static Connection getConnection(){
>         Connection con=null;
>         String driverClass= FlinkConfig.config.getProperty("driverClass");
>         String url=FlinkConfig.config.getProperty("jdbcUrl");
>         String user=FlinkConfig.config.getProperty("jdbcUser");
>         String passWord=FlinkConfig.config.getProperty("passWord");
>
> try {
>             Class.forName(driverClass);
>             con= DriverManager.getConnection(url,user,passWord);
>         } catch (Exception e) {
> throw new RuntimeException(e);
>         }
> return con;
>     }
>
> @Override
> public void run(SourceContext<ActionType> ctx) throws Exception {
>         ResultSet resultSet = ps.executeQuery();
> while (resultSet.next()){
>     ActionType actionType = new ActionType(
>             resultSet.getString("action"),
>             resultSet.getString("action_name")
>     );
>     ctx.collect(actionType);
> }
>     }
>
> @Override
> public void close() throws Exception {
> super.close();
> if (null!=connection){
> connection.close();
>         }
> if (null!=ps){
> ps.close();
>         }
>     }
>
> @Override
> public void cancel() {
>     }
> };
>
>
> |
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |

回复