This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit d8c803af8921d4c26865143f5ea3976f43a163f2
Author: Shimin Huang <40719512+coll...@users.noreply.github.com>
AuthorDate: Wed May 31 00:37:14 2023 +0800

    [BAHIR-324] Closing KuduReader at JobManager
---
 .../kudu/format/AbstractKuduInputFormat.java       | 22 +++++++++++++++-------
 .../function/lookup/KuduRowDataLookupFunction.java |  2 +-
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java
index 0cdf570..4976241 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java
@@ -104,19 +104,23 @@ public abstract class AbstractKuduInputFormat<T> extends RichInputFormat<T, Kudu
         }
     }
 
+    private void closeKuduReader() throws IOException {
+        if (kuduReader != null) {
+            kuduReader.close();
+            kuduReader = null;
+        }
+    }
+
     @Override
     public void close() throws IOException {
         if (resultIterator != null) {
             try {
                 resultIterator.close();
             } catch (KuduException e) {
-                e.printStackTrace();
+                log.error("Error while closing reader iterator.", e);
             }
         }
-        if (kuduReader != null) {
-            kuduReader.close();
-            kuduReader = null;
-        }
+        closeKuduReader();
     }
 
     @Override
@@ -131,8 +135,12 @@ public abstract class AbstractKuduInputFormat<T> extends RichInputFormat<T, Kudu
 
     @Override
     public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
-        startKuduReader();
-        return kuduReader.createInputSplits(minNumSplits);
+        try {
+            startKuduReader();
+            return kuduReader.createInputSplits(minNumSplits);
+        } finally {
+            closeKuduReader();
+        }
     }
 
     @Override
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
index d54f23a..d29588f 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
@@ -107,7 +107,7 @@ public class KuduRowDataLookupFunction extends TableFunction<RowData> {
                 ArrayList<RowData> rows = new ArrayList<>();
                 for (KuduInputSplit inputSplit : inputSplits) {
                     KuduReaderIterator<RowData> scanner = kuduReader.scanner(inputSplit.getScanToken());
-                    // 没有启用cache
+                    // not use cache
                     if (cache == null) {
                         while (scanner.hasNext()) {
                             collect(scanner.next());

Reply via email to