This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4a8f60f [BAHIR-324] Closing KuduReader at JobManager
4a8f60f is described below
commit 4a8f60f03f7caeb5421c7c88fa34a3ba419b61ee
Author: Shimin Huang <[email protected]>
AuthorDate: Wed May 31 00:37:14 2023 +0800
[BAHIR-324] Closing KuduReader at JobManager
---
.../kudu/format/AbstractKuduInputFormat.java | 22 +++++++++++++++-------
.../function/lookup/KuduRowDataLookupFunction.java | 2 +-
2 files changed, 16 insertions(+), 8 deletions(-)
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java
index 0cdf570..4976241 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java
@@ -104,19 +104,23 @@ public abstract class AbstractKuduInputFormat<T> extends RichInputFormat<T, Kudu
}
}
+ private void closeKuduReader() throws IOException {
+ if (kuduReader != null) {
+ kuduReader.close();
+ kuduReader = null;
+ }
+ }
+
@Override
public void close() throws IOException {
if (resultIterator != null) {
try {
resultIterator.close();
} catch (KuduException e) {
- e.printStackTrace();
+ log.error("Error while closing reader iterator.", e);
}
}
- if (kuduReader != null) {
- kuduReader.close();
- kuduReader = null;
- }
+ closeKuduReader();
}
@Override
@@ -131,8 +135,12 @@ public abstract class AbstractKuduInputFormat<T> extends RichInputFormat<T, Kudu
@Override
public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
- startKuduReader();
- return kuduReader.createInputSplits(minNumSplits);
+ try {
+ startKuduReader();
+ return kuduReader.createInputSplits(minNumSplits);
+ } finally {
+ closeKuduReader();
+ }
}
@Override
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
index d54f23a..d29588f 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
@@ -107,7 +107,7 @@ public class KuduRowDataLookupFunction extends TableFunction<RowData> {
ArrayList<RowData> rows = new ArrayList<>();
for (KuduInputSplit inputSplit : inputSplits) {
KuduReaderIterator<RowData> scanner = kuduReader.scanner(inputSplit.getScanToken());
- // 没有启用cache
+ // not use cache
if (cache == null) {
while (scanner.hasNext()) {
collect(scanner.next());