我自己写了个 Sink 到数据库的 SinkFunction,SinkFunction 中指定只有数据到了一定条数(100) 才执行入库操作。我通过定义了一个 List 缓存需要入库的数据的方式实现。
public class SinkToJDBCWithJDBCStatementBatch extends RichSinkFunction<JDBCStatement> { private List<JDBCStatement> statementList = new ArrayList<JDBCStatement>(); @Override public void close() throws Exception { writeToDatabase(); this.statementList.clear(); super.close(); if (dataSource != null) { dataSource.close(); } } @Override public void invoke(JDBCStatement statement, Context context) throws Exception { if (statementList.size() < 100) { statementList.add(statement); return; } writeToDatabase(); this.statementList.clear(); } public void writeToDatabase(){ ......... } } 我想确认一下 这个 close() 方法在程序停止的时候一定会被调用到吗?是通过怎样的机制实现的呢? 谢谢, 王磊