This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git
commit d8c803af8921d4c26865143f5ea3976f43a163f2 Author: Shimin Huang <40719512+coll...@users.noreply.github.com> AuthorDate: Wed May 31 00:37:14 2023 +0800 [BAHIR-324] Closing KuduReader at JobManager --- .../kudu/format/AbstractKuduInputFormat.java | 22 +++++++++++++++------- .../function/lookup/KuduRowDataLookupFunction.java | 2 +- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java index 0cdf570..4976241 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java @@ -104,19 +104,23 @@ public abstract class AbstractKuduInputFormat<T> extends RichInputFormat<T, Kudu } } + private void closeKuduReader() throws IOException { + if (kuduReader != null) { + kuduReader.close(); + kuduReader = null; + } + } + @Override public void close() throws IOException { if (resultIterator != null) { try { resultIterator.close(); } catch (KuduException e) { - e.printStackTrace(); + log.error("Error while closing reader iterator.", e); } } - if (kuduReader != null) { - kuduReader.close(); - kuduReader = null; - } + closeKuduReader(); } @Override @@ -131,8 +135,12 @@ public abstract class AbstractKuduInputFormat<T> extends RichInputFormat<T, Kudu @Override public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException { - startKuduReader(); - return kuduReader.createInputSplits(minNumSplits); + try { + startKuduReader(); + return kuduReader.createInputSplits(minNumSplits); + } finally { + closeKuduReader(); + } } @Override diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java index d54f23a..d29588f 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java @@ -107,7 +107,7 @@ public class KuduRowDataLookupFunction extends TableFunction<RowData> { ArrayList<RowData> rows = new ArrayList<>(); for (KuduInputSplit inputSplit : inputSplits) { KuduReaderIterator<RowData> scanner = kuduReader.scanner(inputSplit.getScanToken()); - // 没有启用cache + // not use cache if (cache == null) { while (scanner.hasNext()) { collect(scanner.next());