This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 7fc202b  [FLINK-29750][Connector/JDBC] Improve 
PostgresCatalog#listTables() by reusing resources. This closes #4
7fc202b is described below

commit 7fc202be3dfcdb6510f9855a6943dd97fa2bd3af
Author: Mingliang Liu <lium...@apache.org>
AuthorDate: Fri May 26 02:37:33 2023 -0700

    [FLINK-29750][Connector/JDBC] Improve PostgresCatalog#listTables() by 
reusing resources. This closes #4
    
    * [FLINK-29750][Connector/JDBC] Improve PostgresCatalog#listTables() by 
reusing resources
---
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 33 +++++++++------
 .../postgres/catalog/PostgresCatalog.java          | 49 ++++++++++++----------
 2 files changed, 47 insertions(+), 35 deletions(-)

diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
index 482ce79..7ba0c06 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
@@ -497,29 +497,36 @@ public abstract class AbstractJdbcCatalog extends 
AbstractCatalog {
             Predicate<String> filterFunc,
             Object... params) {
 
-        List<String> columnValues = Lists.newArrayList();
-
         try (Connection conn = DriverManager.getConnection(connUrl, username, 
pwd);
                 PreparedStatement ps = conn.prepareStatement(sql)) {
-            if (Objects.nonNull(params) && params.length > 0) {
-                for (int i = 0; i < params.length; i++) {
-                    ps.setObject(i + 1, params[i]);
-                }
+            return extractColumnValuesByStatement(ps, columnIndex, filterFunc, 
params);
+
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "The following SQL query could not be executed 
(%s): %s", connUrl, sql),
+                    e);
+        }
+    }
+
+    protected static List<String> extractColumnValuesByStatement(
+            PreparedStatement ps, int columnIndex, Predicate<String> 
filterFunc, Object... params)
+            throws SQLException {
+        List<String> columnValues = Lists.newArrayList();
+        if (Objects.nonNull(params) && params.length > 0) {
+            for (int i = 0; i < params.length; i++) {
+                ps.setObject(i + 1, params[i]);
             }
-            ResultSet rs = ps.executeQuery();
+        }
+        try (ResultSet rs = ps.executeQuery()) {
             while (rs.next()) {
                 String columnValue = rs.getString(columnIndex);
                 if (Objects.isNull(filterFunc) || 
filterFunc.test(columnValue)) {
                     columnValues.add(columnValue);
                 }
             }
-            return columnValues;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "The following SQL query could not be executed 
(%s): %s", connUrl, sql),
-                    e);
         }
+        return columnValues;
     }
 
     protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData 
metadata, int colIndex)
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java
index 0b5cbeb..61f43ba 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java
@@ -32,12 +32,14 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /** Catalog for PostgreSQL. */
 @Internal
@@ -107,31 +109,34 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
 
         List<String> tables = Lists.newArrayList();
 
-        // get all schemas
-        List<String> schemas =
-                extractColumnValuesBySQL(
-                        baseUrl + databaseName,
-                        "SELECT schema_name FROM information_schema.schemata;",
-                        1,
-                        pgSchema -> !builtinSchemas.contains(pgSchema));
-
-        // get all tables
-        for (String schema : schemas) {
-            // position 1 is database name, position 2 is schema name, 
position 3 is table name
-            List<String> pureTables =
-                    extractColumnValuesBySQL(
-                            baseUrl + databaseName,
+        final String url = baseUrl + databaseName;
+        try (Connection conn = DriverManager.getConnection(url, username, 
pwd)) {
+            // get all schemas
+            List<String> schemas;
+            try (PreparedStatement ps =
+                    conn.prepareStatement("SELECT schema_name FROM 
information_schema.schemata;")) {
+                schemas =
+                        extractColumnValuesByStatement(
+                                ps, 1, pgSchema -> 
!builtinSchemas.contains(pgSchema));
+            }
+
+            // get all tables
+            try (PreparedStatement ps =
+                    conn.prepareStatement(
                             "SELECT * FROM information_schema.tables "
                                     + "WHERE table_type = 'BASE TABLE' "
                                     + "AND table_schema = ? "
-                                    + "ORDER BY table_type, table_name;",
-                            3,
-                            null,
-                            schema);
-            tables.addAll(
-                    pureTables.stream()
+                                    + "ORDER BY table_type, table_name;")) {
+                for (String schema : schemas) {
+                    // Column index 1 is database name, 2 is schema name, 3 is 
table name
+                    extractColumnValuesByStatement(ps, 3, null, 
schema).stream()
                             .map(pureTable -> schema + "." + pureTable)
-                            .collect(Collectors.toList()));
+                            .forEach(tables::add);
+                }
+            }
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed to list tables for database %s", 
databaseName), e);
         }
         return tables;
     }

Reply via email to