This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
The following commit(s) were added to refs/heads/main by this push: new 7fc202b [FLINK-29750][Connector/JDBC] Improve PostgresCatalog#listTables() by reusing resources. This closes #4 7fc202b is described below commit 7fc202be3dfcdb6510f9855a6943dd97fa2bd3af Author: Mingliang Liu <lium...@apache.org> AuthorDate: Fri May 26 02:37:33 2023 -0700 [FLINK-29750][Connector/JDBC] Improve PostgresCatalog#listTables() by reusing resources. This closes #4 * [FLINK-29750][Connector/JDBC] Improve PostgresCatalog#listTables() by reusing resources --- .../jdbc/catalog/AbstractJdbcCatalog.java | 33 +++++++++------ .../postgres/catalog/PostgresCatalog.java | 49 ++++++++++++---------- 2 files changed, 47 insertions(+), 35 deletions(-) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java index 482ce79..7ba0c06 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java @@ -497,29 +497,36 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog { Predicate<String> filterFunc, Object... 
params) { - List<String> columnValues = Lists.newArrayList(); - try (Connection conn = DriverManager.getConnection(connUrl, username, pwd); PreparedStatement ps = conn.prepareStatement(sql)) { - if (Objects.nonNull(params) && params.length > 0) { - for (int i = 0; i < params.length; i++) { - ps.setObject(i + 1, params[i]); - } + return extractColumnValuesByStatement(ps, columnIndex, filterFunc, params); + + } catch (Exception e) { + throw new CatalogException( + String.format( + "The following SQL query could not be executed (%s): %s", connUrl, sql), + e); + } + } + + protected static List<String> extractColumnValuesByStatement( + PreparedStatement ps, int columnIndex, Predicate<String> filterFunc, Object... params) + throws SQLException { + List<String> columnValues = Lists.newArrayList(); + if (Objects.nonNull(params) && params.length > 0) { + for (int i = 0; i < params.length; i++) { + ps.setObject(i + 1, params[i]); } - ResultSet rs = ps.executeQuery(); + } + try (ResultSet rs = ps.executeQuery()) { while (rs.next()) { String columnValue = rs.getString(columnIndex); if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) { columnValues.add(columnValue); } } - return columnValues; - } catch (Exception e) { - throw new CatalogException( - String.format( - "The following SQL query could not be executed (%s): %s", connUrl, sql), - e); } + return columnValues; } protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java index 0b5cbeb..61f43ba 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java +++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java @@ -32,12 +32,14 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; /** Catalog for PostgreSQL. */ @Internal @@ -107,31 +109,34 @@ public class PostgresCatalog extends AbstractJdbcCatalog { List<String> tables = Lists.newArrayList(); - // get all schemas - List<String> schemas = - extractColumnValuesBySQL( - baseUrl + databaseName, - "SELECT schema_name FROM information_schema.schemata;", - 1, - pgSchema -> !builtinSchemas.contains(pgSchema)); - - // get all tables - for (String schema : schemas) { - // position 1 is database name, position 2 is schema name, position 3 is table name - List<String> pureTables = - extractColumnValuesBySQL( - baseUrl + databaseName, + final String url = baseUrl + databaseName; + try (Connection conn = DriverManager.getConnection(url, username, pwd)) { + // get all schemas + List<String> schemas; + try (PreparedStatement ps = + conn.prepareStatement("SELECT schema_name FROM information_schema.schemata;")) { + schemas = + extractColumnValuesByStatement( + ps, 1, pgSchema -> !builtinSchemas.contains(pgSchema)); + } + + // get all tables + try (PreparedStatement ps = + conn.prepareStatement( "SELECT * FROM information_schema.tables " + "WHERE table_type = 'BASE TABLE' " + "AND table_schema = ? 
" - + "ORDER BY table_type, table_name;", - 3, - null, - schema); - tables.addAll( - pureTables.stream() + + "ORDER BY table_type, table_name;")) { + for (String schema : schemas) { + // Column index 1 is database name, 2 is schema name, 3 is table name + extractColumnValuesByStatement(ps, 3, null, schema).stream() .map(pureTable -> schema + "." + pureTable) - .collect(Collectors.toList())); + .forEach(tables::add); + } + } + } catch (Exception e) { + throw new CatalogException( + String.format("Failed to list tables for database %s", databaseName), e); } return tables; }