各位老师好,
我在本地通过继承 RichSourceFunction 类,实现了从 MySQL 中读取数据。但是为什么程序把 MySQL 中的全部数据读取出来之后,就自动停止了呢?
以下是我的代码:
|
/**
 * Flink source that reads the {@code actiontype} table once and emits one
 * {@link ActionType} per row.
 *
 * <p>The source is bounded: {@link #run} executes the query a single time and
 * returns after the last row. Per the Flink {@code SourceFunction} contract a
 * source is considered finished as soon as {@code run} returns, so a job whose
 * only source is this class ends once the table has been read. To keep the job
 * alive, {@code run} would have to keep looping (for example re-querying
 * periodically) until {@link #cancel} is called.
 *
 * <p>Connection settings are read from {@code FlinkConfig.config} using the keys
 * {@code driverClass}, {@code jdbcUrl}, {@code jdbcUser} and {@code passWord}.
 */
public class MysqlSource2 extends RichSourceFunction<ActionType> {
    // JDBC handles are created in open() on the task side; they are marked
    // transient because the function object itself is serialized and shipped.
    transient PreparedStatement ps;
    private transient Connection connection;

    // Flipped by cancel(), which may be invoked from a different thread than run().
    private volatile boolean running = true;

    /**
     * Opens the JDBC connection and prepares the full-table query.
     *
     * @param parameters configuration passed through to the superclass
     * @throws Exception if the superclass fails to open or the statement cannot be prepared
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "select * from actiontype;";
        ps = connection.prepareStatement(sql);
    }

    /**
     * Loads the configured JDBC driver and opens a new connection.
     *
     * @return a newly opened connection
     * @throws RuntimeException wrapping any failure to load the driver or to connect
     */
    private static Connection getConnection() {
        String driverClass = FlinkConfig.config.getProperty("driverClass");
        String url = FlinkConfig.config.getProperty("jdbcUrl");
        String user = FlinkConfig.config.getProperty("jdbcUser");
        String passWord = FlinkConfig.config.getProperty("passWord");

        try {
            Class.forName(driverClass);
            return DriverManager.getConnection(url, user, passWord);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Executes the prepared query once and emits every row as an {@link ActionType}
     * built from the {@code action} and {@code action_name} columns.
     *
     * <p>Stops early if {@link #cancel} has been called. The result set is always
     * closed, whether the loop finishes, is cancelled, or throws.
     *
     * @param ctx context used to emit records
     * @throws Exception if executing the query or reading a row fails
     */
    @Override
    public void run(SourceContext<ActionType> ctx) throws Exception {
        try (ResultSet resultSet = ps.executeQuery()) {
            while (running && resultSet.next()) {
                ActionType actionType = new ActionType(
                        resultSet.getString("action"),
                        resultSet.getString("action_name")
                );
                ctx.collect(actionType);
            }
        }
    }

    /**
     * Releases the JDBC resources: the statement first, then the connection that
     * owns it. Each step runs even if the previous one throws.
     *
     * @throws Exception if closing a resource or the superclass fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (null != ps) {
                ps.close();
            }
        } finally {
            try {
                if (null != connection) {
                    connection.close();
                }
            } finally {
                super.close();
            }
        }
    }

    /** Asks {@link #run} to stop emitting; the read loop checks the flag before each row. */
    @Override
    public void cancel() {
        running = false;
    }
}


|


| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复