This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 74b8bdee9fb0bf7cfc27ca8d992dac2a07473a0c Author: bowen.li <bowenl...@gmail.com> AuthorDate: Fri Mar 6 14:05:27 2020 -0800 [FLINK-16471][jdbc] develop JDBCCatalog and PostgresCatalog closes #11336 --- .../src/test/resources/log4j2-test.properties | 28 -- flink-connectors/flink-jdbc/pom.xml | 36 ++- .../java/io/jdbc/catalog/AbstractJDBCCatalog.java | 277 ++++++++++++++++++ .../api/java/io/jdbc/catalog/JDBCCatalog.java | 84 ++++++ .../api/java/io/jdbc/catalog/JDBCCatalogUtils.java | 54 ++++ .../api/java/io/jdbc/catalog/PostgresCatalog.java | 323 ++++++++++++++++++++ .../java/io/jdbc/catalog/PostgresTablePath.java | 95 ++++++ .../api/java/io/jdbc/dialect/JDBCDialects.java | 10 +- .../java/io/jdbc/catalog/JDBCCatalogUtilsTest.java | 44 +++ .../io/jdbc/catalog/PostgresCatalogITCase.java | 325 +++++++++++++++++++++ .../io/jdbc/catalog/PostgresTablePathTest.java | 33 +++ 11 files changed, 1275 insertions(+), 34 deletions(-) diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties deleted file mode 100644 index 835c2ec..0000000 --- a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties +++ /dev/null @@ -1,28 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# Set root logger level to OFF to not flood build logs -# set manually to INFO for debugging purposes -rootLogger.level = OFF -rootLogger.appenderRef.test.ref = TestLogger - -appender.testlogger.name = TestLogger -appender.testlogger.type = CONSOLE -appender.testlogger.target = SYSTEM_ERR -appender.testlogger.layout.type = PatternLayout -appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml index 3e83311..cb7afab 100644 --- a/flink-connectors/flink-jdbc/pom.xml +++ b/flink-connectors/flink-jdbc/pom.xml @@ -35,6 +35,11 @@ under the License. <packaging>jar</packaging> + <properties> + <postgres.version>42.2.10</postgres.version> + <otj-pg-embedded.version>0.13.3</otj-pg-embedded.version> + </properties> + <dependencies> <!-- Table ecosystem --> <!-- Projects depending on this project won't depend on flink-table-*. --> @@ -53,13 +58,17 @@ under the License. <scope>provided</scope> </dependency> + <!-- Postgres dependencies --> + <dependency> - <groupId>org.apache.derby</groupId> - <artifactId>derby</artifactId> - <version>10.14.2.0</version> - <scope>test</scope> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <version>${postgres.version}</version> + <scope>provided</scope> </dependency> + <!-- test dependencies --> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils_${scala.binary.version}</artifactId> @@ -89,5 +98,24 @@ under the License. <version>${project.version}</version> <scope>test</scope> </dependency> + + <!-- Postgres test dependencies --> + + <dependency> + <groupId>com.opentable.components</groupId> + <artifactId>otj-pg-embedded</artifactId> + <version>${otj-pg-embedded.version}</version> + <scope>test</scope> + </dependency> + + <!-- Derby test dependencies --> + + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <version>10.14.2.0</version> + <scope>test</scope> + </dependency> + </dependencies> </project> diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java new file mode 100644 index 0000000..523de83 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc.catalog; + +import org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.TableFactory; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Abstract catalog for any JDBC catalogs. + */ +public abstract class AbstractJDBCCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCCatalog.class); + + protected final String username; + protected final String pwd; + protected final String baseUrl; + protected final String defaultUrl; + + public AbstractJDBCCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) { + super(catalogName, defaultDatabase); + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(username)); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd)); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl)); + + JDBCCatalogUtils.validateJDBCUrl(baseUrl); + + this.username = username; + this.pwd = pwd; + this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; + this.defaultUrl = baseUrl + defaultDatabase; + } + + @Override + public void open() throws CatalogException { + // test connection, fail early if we cannot connect to database + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { + } catch (SQLException e) { + throw new ValidationException( + String.format("Failed connecting to %s via JDBC.", defaultUrl), e); + } + + LOG.info("Catalog {} established connection to {}", getName(), defaultUrl); + } + + @Override + public void close() throws CatalogException { + LOG.info("Catalog {} closing", getName()); + } + + // ------ table factory ------ + + public Optional<TableFactory> getTableFactory() { + return Optional.of(new JDBCTableSourceSinkFactory()); + } + + // ------ databases ------ + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); + + return listDatabases().contains(databaseName); + } + + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + // ------ tables and views ------ + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException { + return Collections.emptyList(); + } + + // ------ partitions ------ + + @Override + public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws TableNotExistException, TableNotPartitionedException, CatalogException { + return Collections.emptyList(); + } + + @Override + public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, CatalogException { + return Collections.emptyList(); + } + + @Override + public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws TableNotExistException, TableNotPartitionedException, CatalogException { + return Collections.emptyList(); + } + + @Override + public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + // ------ functions ------ + + @Override + public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException { + return Collections.emptyList(); + } + + @Override + public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean functionExists(ObjectPath functionPath) throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + // ------ stats ------ + + @Override + public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException { + return CatalogTableStatistics.UNKNOWN; + } + + @Override + public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException { + return CatalogColumnStatistics.UNKNOWN; + } + + @Override + public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException { + return CatalogTableStatistics.UNKNOWN; + } + + @Override + public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException { + return CatalogColumnStatistics.UNKNOWN; + } + + @Override + public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException, TablePartitionedException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java new file mode 100644 index 0000000..629412c --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Catalogs for relational databases via JDBC. + */ +@PublicEvolving +public class JDBCCatalog extends AbstractJDBCCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(JDBCCatalog.class); + + private final Catalog internal; + + public JDBCCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) { + super(catalogName, defaultDatabase, username, pwd, baseUrl); + + internal = JDBCCatalogUtils.createCatalog(catalogName, defaultDatabase, username, pwd, baseUrl); + } + + // ------ databases ----- + + @Override + public List<String> listDatabases() throws CatalogException { + return internal.listDatabases(); + } + + @Override + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { + return internal.getDatabase(databaseName); + } + + // ------ tables and views ------ + + @Override + public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException { + return internal.listTables(databaseName); + } + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { + return internal.getTable(tablePath); + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + try { + return databaseExists(tablePath.getDatabaseName()) && + listTables(tablePath.getDatabaseName()).contains(tablePath.getObjectName()); + } catch (DatabaseNotExistException e) { + return false; + } + } +} diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java new file mode 100644 index 0000000..b9e3a19 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc.catalog; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Utils for {@link JDBCCatalog}. + */ +public class JDBCCatalogUtils { + /** + * URL has to be without database, like "jdbc:postgresql://localhost:5432/" or "jdbc:postgresql://localhost:5432" + * rather than "jdbc:postgresql://localhost:5432/db". + */ + public static void validateJDBCUrl(String url) { + String[] parts = url.trim().split("\\/+"); + + checkArgument(parts.length == 2); + } + + /** + * Create catalog instance from given information. + */ + public static AbstractJDBCCatalog createCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) { + JDBCDialect dialect = JDBCDialects.get(baseUrl).get(); + + if (dialect instanceof JDBCDialects.PostgresDialect) { + return new PostgresCatalog(catalogName, defaultDatabase, username, pwd, baseUrl); + } else { + throw new UnsupportedOperationException( + String.format("Catalog for '%s' is not supported yet.", dialect) + ); + } + } +} diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java new file mode 100644 index 0000000..d12f254 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.types.DataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Catalog for PostgreSQL. + */ +@Internal +public class PostgresCatalog extends AbstractJDBCCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(PostgresCatalog.class); + + public static final String POSTGRES_TABLE_TYPE = "postgres"; + + public static final String DEFAULT_DATABASE = "postgres"; + + // ------ Postgres default objects that shouldn't be exposed to users ------ + + private static final Set<String> builtinDatabases = new HashSet<String>() {{ + add("template0"); + add("template1"); + }}; + + private static final Set<String> builtinSchemas = new HashSet<String>() {{ + add("pg_toast"); + add("pg_temp_1"); + add("pg_toast_temp_1"); + add("pg_catalog"); + add("information_schema"); + }}; + + protected PostgresCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) { + super(catalogName, defaultDatabase, username, pwd, baseUrl); + } + + // ------ databases ------ + + @Override + public List<String> listDatabases() throws CatalogException { + List<String> pgDatabases = new ArrayList<>(); + + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { + + PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;"); + + ResultSet rs = ps.executeQuery(); + + while (rs.next()) { + String dbName = rs.getString(1); + if (!builtinDatabases.contains(dbName)) { + pgDatabases.add(rs.getString(1)); + } + } + + return pgDatabases; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", getName()), e); + } + } + + @Override + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { + if (listDatabases().contains(databaseName)) { + return new CatalogDatabaseImpl(Collections.emptyMap(), null); + } else { + throw new DatabaseNotExistException(getName(), databaseName); + } + } + + // ------ tables ------ + + @Override + public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + // get all schemas + try (Connection conn = DriverManager.getConnection(baseUrl + databaseName, username, pwd)) { + PreparedStatement ps = conn.prepareStatement("SELECT schema_name FROM information_schema.schemata;"); + + ResultSet rs = ps.executeQuery(); + + List<String> schemas = new ArrayList<>(); + + while (rs.next()) { + String pgSchema = rs.getString(1); + if (!builtinSchemas.contains(pgSchema)) { + schemas.add(pgSchema); + } + } + + List<String> tables = new ArrayList<>(); + + for (String schema : schemas) { + PreparedStatement stmt = conn.prepareStatement( + "SELECT * \n" + + "FROM information_schema.tables \n" + + "WHERE table_type = 'BASE TABLE' \n" + + " AND table_schema = ? \n" + + "ORDER BY table_type, table_name;"); + + stmt.setString(1, schema); + + ResultSet rstables = stmt.executeQuery(); + + while (rstables.next()) { + // position 1 is database name, position 2 is schema name, position 3 is table name + tables.add(schema + "." + rstables.getString(3)); + } + } + + return tables; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", getName()), e); + } + } + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + + PostgresTablePath pgPath = PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()); + + try (Connection conn = DriverManager.getConnection(baseUrl + tablePath.getDatabaseName(), username, pwd)) { + + PreparedStatement ps = conn.prepareStatement( + String.format("SELECT * FROM %s;", pgPath.getFullPath())); + + ResultSetMetaData rsmd = ps.getMetaData(); + + String[] names = new String[rsmd.getColumnCount()]; + DataType[] types = new DataType[rsmd.getColumnCount()]; + + for (int i = 1; i <= rsmd.getColumnCount(); i++) { + names[i - 1] = rsmd.getColumnName(i); + types[i - 1] = fromJDBCType(rsmd, i); + } + + TableSchema tableSchema = new TableSchema.Builder().fields(names, types).build(); + + return new CatalogTableImpl( + tableSchema, + new HashMap<>(), + "" + ); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed getting table %s", tablePath.getFullName()), e); + } + } + + public static final String PG_BYTEA = "bytea"; + public static final String PG_BYTEA_ARRAY = "_bytea"; + public static final String PG_SMALLINT = "int2"; + public static final String PG_SMALLINT_ARRAY = "_int2"; + public static final String PG_INTEGER = "int4"; + public static final String PG_INTEGER_ARRAY = "_int4"; + public static final String PG_BIGINT = "int8"; + public static final String PG_BIGINT_ARRAY = "_int8"; + public static final String PG_REAL = "float4"; + public static final String PG_REAL_ARRAY = "_float4"; + public static final String PG_DOUBLE_PRECISION = "float8"; + public static final String PG_DOUBLE_PRECISION_ARRAY = "_float8"; + public static final String PG_NUMERIC = "numeric"; + public static final String PG_NUMERIC_ARRAY = "_numeric"; + public static final String PG_BOOLEAN = "bool"; + public static final String PG_BOOLEAN_ARRAY = "_bool"; + public static final String PG_TIMESTAMP = "timestamp"; + public static final String PG_TIMESTAMP_ARRAY = "_timestamp"; + public static final String PG_TIMESTAMPTZ = "timestamptz"; + public static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz"; + public static final String PG_DATE = "date"; + public static final String PG_DATE_ARRAY = "_date"; + public static final String PG_TIME = "time"; + public static final String PG_TIME_ARRAY = "_time"; + public static final String PG_TEXT = "text"; + public static final String PG_TEXT_ARRAY = "_text"; + public static final String PG_CHAR = "bpchar"; + public static final String PG_CHAR_ARRAY = "_bpchar"; + public static final String PG_CHARACTER = "character"; + public static final String PG_CHARACTER_ARRAY = "_character"; + public static final String PG_CHARACTER_VARYING = "varchar"; + public static final String PG_CHARACTER_VARYING_ARRAY = "_varchar"; + + private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException { + String pgType = metadata.getColumnTypeName(colIndex); + + int precision = metadata.getPrecision(colIndex); + + switch (pgType) { + case PG_BOOLEAN: + return DataTypes.BOOLEAN(); + case PG_BOOLEAN_ARRAY: + return DataTypes.ARRAY(DataTypes.BOOLEAN()); + case PG_BYTEA: + return DataTypes.BYTES(); + case PG_BYTEA_ARRAY: + return DataTypes.ARRAY(DataTypes.BYTES()); + case PG_SMALLINT: + return DataTypes.SMALLINT(); + case PG_SMALLINT_ARRAY: + return DataTypes.ARRAY(DataTypes.SMALLINT()); + case PG_INTEGER: + return DataTypes.INT(); + case PG_INTEGER_ARRAY: + return DataTypes.ARRAY(DataTypes.INT()); + case PG_BIGINT: + return DataTypes.BIGINT(); + case PG_BIGINT_ARRAY: + return DataTypes.ARRAY(DataTypes.BIGINT()); + case PG_REAL: + return DataTypes.FLOAT(); + case PG_REAL_ARRAY: + return DataTypes.ARRAY(DataTypes.FLOAT()); + case PG_DOUBLE_PRECISION: + return DataTypes.DOUBLE(); + case PG_DOUBLE_PRECISION_ARRAY: + return DataTypes.ARRAY(DataTypes.DOUBLE()); + case PG_NUMERIC: + return DataTypes.DECIMAL(precision, metadata.getScale(colIndex)); + case PG_NUMERIC_ARRAY: + return DataTypes.ARRAY( + DataTypes.DECIMAL(precision, metadata.getScale(colIndex))); + case PG_CHAR: + case PG_CHARACTER: + return DataTypes.CHAR(precision); + case PG_CHAR_ARRAY: + case PG_CHARACTER_ARRAY: + return DataTypes.ARRAY(DataTypes.CHAR(precision)); + case PG_CHARACTER_VARYING: + return DataTypes.VARCHAR(precision); + case PG_CHARACTER_VARYING_ARRAY: + return DataTypes.ARRAY(DataTypes.VARCHAR(precision)); + case PG_TEXT: + return DataTypes.STRING(); + case PG_TEXT_ARRAY: + return DataTypes.ARRAY(DataTypes.STRING()); + case PG_TIMESTAMP: + return DataTypes.TIMESTAMP(); + case PG_TIMESTAMP_ARRAY: + return DataTypes.ARRAY(DataTypes.TIMESTAMP()); + case PG_TIMESTAMPTZ: + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(); + case PG_TIMESTAMPTZ_ARRAY: + return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()); + case PG_TIME: + return DataTypes.TIME(); + case PG_TIME_ARRAY: + return DataTypes.ARRAY(DataTypes.TIME()); + case PG_DATE: + return DataTypes.DATE(); + case PG_DATE_ARRAY: + return DataTypes.ARRAY(DataTypes.DATE()); + default: + throw new UnsupportedOperationException( + String.format("Doesn't support Postgres type '%s' yet", pgType)); + } + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + + List<String> tables = null; + try { + tables = listTables(tablePath.getDatabaseName()); + } catch (DatabaseNotExistException e) { + return false; + } + + return tables.contains(PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getFullPath()); + } + +} diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java new file mode 100644 index 0000000..99cc2b4 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc.catalog; + +import org.apache.flink.util.StringUtils; + +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Table path of PostgreSQL in Flink. Can be of formats "table_name" or "schema_name.table_name". + * When it's "table_name", the schema name defaults to "public". + */ +public class PostgresTablePath { + + private static final String DEFAULT_POSTGRES_SCHEMA_NAME = "public"; + + private final String pgSchemaName; + private final String pgTableName; + + public PostgresTablePath(String pgSchemaName, String pgTableName) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgSchemaName)); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgTableName)); + + this.pgSchemaName = pgSchemaName; + this.pgTableName = pgTableName; + } + + public static PostgresTablePath fromFlinkTableName(String flinkTableName) { + if (flinkTableName.contains(".")) { + String[] path = flinkTableName.split("\\."); + + checkArgument(path != null && path.length == 2, + String.format("Table name '%s' is not valid. The parsed length is %d", flinkTableName, path.length)); + + return new PostgresTablePath(path[0], path[1]); + } else { + return new PostgresTablePath(DEFAULT_POSTGRES_SCHEMA_NAME, flinkTableName); + } + } + + public static String toFlinkTableName(String schema, String table) { + return new PostgresTablePath(schema, table).getFullPath(); + } + + public String getFullPath() { + return String.format("%s.%s", pgSchemaName, pgTableName); + } + + public String getFullPathWithQuotes() { + return String.format("`%s.%s`", pgSchemaName, pgTableName); + } + + @Override + public String toString() { + return getFullPathWithQuotes(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + PostgresTablePath that = (PostgresTablePath) o; + return Objects.equals(pgSchemaName, that.pgSchemaName) && + Objects.equals(pgTableName, that.pgTableName); + } + + @Override + public int hashCode() { + return Objects.hash(pgSchemaName, pgTableName); + } +} diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java index fa7192e..743d16a 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java @@ -203,7 +203,10 @@ public final class JDBCDialects { } } - private static class MySQLDialect extends AbstractDialect { + /** + * MySQL dialect. + */ + public static class MySQLDialect extends AbstractDialect { private static final long serialVersionUID = 1L; @@ -301,7 +304,10 @@ public final class JDBCDialects { } } - private static class PostgresDialect extends AbstractDialect { + /** + * Postgres dialect. + */ + public static class PostgresDialect extends AbstractDialect { private static final long serialVersionUID = 1L; diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java new file mode 100644 index 0000000..7a4132b --- /dev/null +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc.catalog; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * Test for {@link JDBCCatalogUtils}. + */ +public class JDBCCatalogUtilsTest { + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Test + public void testJDBCUrl() { + JDBCCatalogUtils.validateJDBCUrl("jdbc:postgresql://localhost:5432/"); + + JDBCCatalogUtils.validateJDBCUrl("jdbc:postgresql://localhost:5432"); + } + + @Test + public void testInvalidJDBCUrl() { + exception.expect(IllegalArgumentException.class); + JDBCCatalogUtils.validateJDBCUrl("jdbc:postgresql://localhost:5432/db"); + } +} diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java new file mode 100644 index 0000000..e103780 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc.catalog; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; + +import com.opentable.db.postgres.junit.EmbeddedPostgresRules; +import com.opentable.db.postgres.junit.SingleInstancePostgresRule; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Test for {@link PostgresCatalog}. + */ +public class PostgresCatalogITCase { + @Rule + public ExpectedException exception = ExpectedException.none(); + + @ClassRule + public static SingleInstancePostgresRule pg = EmbeddedPostgresRules.singleInstance(); + + protected static final String TEST_USERNAME = "postgres"; + protected static final String TEST_PWD = "postgres"; + protected static final String TEST_DB = "test"; + protected static final String TEST_SCHEMA = "test_schema"; + protected static final String TABLE1 = "t1"; + protected static final String TABLE2 = "t2"; + protected static final String TABLE3 = "t3"; + + protected static String baseUrl; + protected static Catalog catalog; + + public static Catalog createCatalog(String name, String defaultDb, String username, String pwd, String jdbcUrl) { + return new PostgresCatalog("mypg", PostgresCatalog.DEFAULT_DATABASE, username, pwd, jdbcUrl); + } + + @BeforeClass + public static void setup() throws SQLException { + // jdbc:postgresql://localhost:50807/postgres?user=postgres + String embeddedJdbcUrl = pg.getEmbeddedPostgres().getJdbcUrl(TEST_USERNAME, TEST_PWD); + // jdbc:postgresql://localhost:50807/ + baseUrl = embeddedJdbcUrl.substring(0, embeddedJdbcUrl.lastIndexOf("/") + 1); + + catalog = createCatalog("mypg", PostgresCatalog.DEFAULT_DATABASE, TEST_USERNAME, TEST_PWD, baseUrl); + + // create test database and schema + createDatabase(TEST_DB); + createSchema(TEST_DB, TEST_SCHEMA); + + // create test tables + // table: postgres.public.user1 + createTable(PostgresTablePath.fromFlinkTableName(TABLE1), getSimpleTable().pgSchemaSql); + + // table: testdb.public.user2 + // table: testdb.testschema.user3 + // table: testdb.public.datatypes + createTable(TEST_DB, PostgresTablePath.fromFlinkTableName(TABLE2), getSimpleTable().pgSchemaSql); + createTable(TEST_DB, new PostgresTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().pgSchemaSql); + createTable(TEST_DB, PostgresTablePath.fromFlinkTableName("datatypes"), getDataTypesTable().pgSchemaSql); + } + + // ------ databases ------ + + @Test + public void testGetDb_DatabaseNotExistException() throws Exception { + exception.expect(DatabaseNotExistException.class); + exception.expectMessage("Database nonexistent does not exist in Catalog"); + catalog.getDatabase("nonexistent"); + } + + @Test + public void testListDatabases() { + List<String> actual = catalog.listDatabases(); + + assertEquals( + Arrays.asList("postgres", "test"), + actual + ); + } + + @Test + public void testDbExists() throws Exception { + assertFalse(catalog.databaseExists("nonexistent")); + + assertTrue(catalog.databaseExists(PostgresCatalog.DEFAULT_DATABASE)); + } + + // ------ tables ------ + + @Test + public void testListTables() throws DatabaseNotExistException { + List<String> actual = catalog.listTables(PostgresCatalog.DEFAULT_DATABASE); + + assertEquals(Arrays.asList("public.t1"), actual); + + actual = catalog.listTables(TEST_DB); + + assertEquals(Arrays.asList("public.datatypes", "public.t2", "test_schema.t3"), actual); + } + + @Test + public void testListTables_DatabaseNotExistException() throws DatabaseNotExistException { + exception.expect(DatabaseNotExistException.class); + catalog.listTables("postgres/nonexistschema"); + } + + @Test + public void testTableExists() { + assertFalse(catalog.tableExists(new ObjectPath(TEST_DB, "nonexist"))); + + assertTrue(catalog.tableExists(new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE1))); + assertTrue(catalog.tableExists(new ObjectPath(TEST_DB, TABLE2))); + assertTrue(catalog.tableExists(new ObjectPath(TEST_DB, "test_schema.t3"))); + } + + @Test + public void testGetTables_TableNotExistException() throws TableNotExistException { + exception.expect(TableNotExistException.class); + catalog.getTable(new ObjectPath(TEST_DB, PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable"))); + } + + @Test + public void testGetTables_TableNotExistException_NoSchema() throws TableNotExistException { + exception.expect(TableNotExistException.class); + catalog.getTable(new ObjectPath(TEST_DB, PostgresTablePath.toFlinkTableName("nonexistschema", "anytable"))); + } + + @Test + public void testGetTables_TableNotExistException_NoDb() throws TableNotExistException { + exception.expect(TableNotExistException.class); + catalog.getTable(new ObjectPath("nonexistdb", PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable"))); + } + + @Test + public void testGetTable() throws org.apache.flink.table.catalog.exceptions.TableNotExistException { + // test postgres.public.user1 + TableSchema schema = getSimpleTable().schema; + + CatalogBaseTable table = catalog.getTable(new ObjectPath("postgres", TABLE1)); + + assertEquals(schema, table.getSchema()); + + table = catalog.getTable(new ObjectPath("postgres", "public.t1")); + + assertEquals(schema, table.getSchema()); + + // test testdb.public.user2 + table = catalog.getTable(new ObjectPath(TEST_DB, TABLE2)); + + assertEquals(schema, table.getSchema()); + + table = catalog.getTable(new ObjectPath(TEST_DB, "public.t2")); + + assertEquals(schema, table.getSchema()); + + // test testdb.testschema.user2 + table = catalog.getTable(new ObjectPath(TEST_DB, TEST_SCHEMA + ".t3")); + + assertEquals(schema, table.getSchema()); + + } + + @Test + public void testDataTypes() throws TableNotExistException { + CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB, "datatypes")); + + assertEquals(getDataTypesTable().schema, table.getSchema()); + } + + private static class TestTable { + TableSchema schema; + String pgSchemaSql; + + public TestTable(TableSchema schema, String pgSchemaSql) { + this.schema = schema; + this.pgSchemaSql = pgSchemaSql; + } + } + + private static TestTable getSimpleTable() { + return new TestTable( + TableSchema.builder() + .field("name", DataTypes.INT()) + .build(), + "name integer" + ); + } + + private static TestTable getDataTypesTable() { + return new TestTable( + TableSchema.builder() + .field("int", DataTypes.INT()) + .field("int_arr", DataTypes.ARRAY(DataTypes.INT())) + .field("bytea", DataTypes.BYTES()) + .field("bytea_arr", DataTypes.ARRAY(DataTypes.BYTES())) + .field("short", DataTypes.SMALLINT()) + .field("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT())) + .field("long", DataTypes.BIGINT()) + .field("long_arr", DataTypes.ARRAY(DataTypes.BIGINT())) + .field("real", DataTypes.FLOAT()) + .field("real_arr", DataTypes.ARRAY(DataTypes.FLOAT())) + .field("double_precision", DataTypes.DOUBLE()) + .field("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE())) + .field("numeric", DataTypes.DECIMAL(10, 5)) + .field("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5))) + .field("boolean", DataTypes.BOOLEAN()) + .field("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN())) + .field("text", DataTypes.STRING()) + .field("text_arr", DataTypes.ARRAY(DataTypes.STRING())) + .field("char", DataTypes.CHAR(1)) + .field("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1))) + .field("character", DataTypes.CHAR(3)) + .field("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3))) + .field("character_varying", DataTypes.VARCHAR(20)) + .field("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20))) + .field("timestamp", DataTypes.TIMESTAMP()) + .field("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP())) + .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()) + .field("timestamptz_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())) + .field("date", DataTypes.DATE()) + .field("date_arr", DataTypes.ARRAY(DataTypes.DATE())) + .field("time", DataTypes.TIME()) + .field("time_arr", DataTypes.ARRAY(DataTypes.TIME())) + .build(), + "int integer, " + + "int_arr integer[], " + + "bytea bytea, " + + "bytea_arr bytea[], " + + "short smallint, " + + "short_arr smallint[], " + + "long bigint, " + + "long_arr bigint[], " + + "real real, " + + "real_arr real[], " + + "double_precision double precision, " + + "double_precision_arr double precision[], " + + "numeric numeric(10, 5), " + + "numeric_arr numeric(10, 5)[], " + + "boolean boolean, " + + "boolean_arr boolean[], " + + "text text, " + + "text_arr text[], " + + "char char, " + + "char_arr char[], " + + "character character(3), " + + "character_arr character(3)[], " + + "character_varying character varying(20), " + + "character_varying_arr character varying(20)[], " + + "timestamp timestamp(6), " + + "timestamp_arr timestamp(6)[], " + + "timestamptz timestamptz, " + + "timestamptz_arr timestamptz[], " + + "date date, " + + "date_arr date[], " + + "time time(6), " + + "time_arr time(6)[]" + ); + } + + private static void createTable(PostgresTablePath tablePath, String tableSchemaSql) throws SQLException { + executeSQL(PostgresCatalog.DEFAULT_DATABASE, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql)); + } + + private static void createTable(String db, PostgresTablePath tablePath, String tableSchemaSql) throws SQLException { + executeSQL(db, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql)); + } + + private static void createSchema(String db, String schema) throws SQLException { + executeSQL(db, String.format("CREATE SCHEMA %s", schema)); + } + + private static void createDatabase(String database) throws SQLException { + executeSQL(String.format("CREATE DATABASE %s;", database)); + } + + private static void executeSQL(String sql) throws SQLException { + executeSQL("", sql); + } + + private static void executeSQL(String db, String sql) throws SQLException { + try (Connection conn = DriverManager.getConnection(baseUrl + db, TEST_USERNAME, TEST_PWD); + Statement statement = conn.createStatement()) { + statement.executeUpdate(sql); + } catch (SQLException e) { + throw e; + } + } + +} diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java new file mode 100644 index 0000000..46f32bc --- /dev/null +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc.catalog; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Test for {@link PostgresTablePath}. + */ +public class PostgresTablePathTest { + @Test + public void testFromFlinkTableName() { + assertEquals(new PostgresTablePath("public", "topic"), PostgresTablePath.fromFlinkTableName("public.topic")); + } +}