各位老师好 ,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊; 以下是我的代码: | public class MysqlSource2 extends RichSourceFunction<ActionType> { PreparedStatement ps; private Connection connection;
@Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql="select * from actiontype;"; ps = connection.prepareStatement(sql); } private static Connection getConnection(){ Connection con=null; String driverClass= FlinkConfig.config.getProperty("driverClass"); String url=FlinkConfig.config.getProperty("jdbcUrl"); String user=FlinkConfig.config.getProperty("jdbcUser"); String passWord=FlinkConfig.config.getProperty("passWord"); try { Class.forName(driverClass); con= DriverManager.getConnection(url,user,passWord); } catch (Exception e) { throw new RuntimeException(e); } return con; } @Override public void run(SourceContext<ActionType> ctx) throws Exception { ResultSet resultSet = ps.executeQuery(); while (resultSet.next()){ ActionType actionType = new ActionType( resultSet.getString("action"), resultSet.getString("action_name") ); ctx.collect(actionType); } } @Override public void close() throws Exception { super.close(); if (null!=connection){ connection.close(); } if (null!=ps){ ps.close(); } } @Override public void cancel() { } }; | | | 小昌同学 | | ccc0606fight...@163.com |