This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 74b8bdee9fb0bf7cfc27ca8d992dac2a07473a0c
Author: bowen.li <bowenl...@gmail.com>
AuthorDate: Fri Mar 6 14:05:27 2020 -0800

    [FLINK-16471][jdbc] develop JDBCCatalog and PostgresCatalog
    
    closes #11336
---
 .../src/test/resources/log4j2-test.properties      |  28 --
 flink-connectors/flink-jdbc/pom.xml                |  36 ++-
 .../java/io/jdbc/catalog/AbstractJDBCCatalog.java  | 277 ++++++++++++++++++
 .../api/java/io/jdbc/catalog/JDBCCatalog.java      |  84 ++++++
 .../api/java/io/jdbc/catalog/JDBCCatalogUtils.java |  54 ++++
 .../api/java/io/jdbc/catalog/PostgresCatalog.java  | 323 ++++++++++++++++++++
 .../java/io/jdbc/catalog/PostgresTablePath.java    |  95 ++++++
 .../api/java/io/jdbc/dialect/JDBCDialects.java     |  10 +-
 .../java/io/jdbc/catalog/JDBCCatalogUtilsTest.java |  44 +++
 .../io/jdbc/catalog/PostgresCatalogITCase.java     | 325 +++++++++++++++++++++
 .../io/jdbc/catalog/PostgresTablePathTest.java     |  33 +++
 11 files changed, 1275 insertions(+), 34 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties
deleted file mode 100644
index 835c2ec..0000000
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-rootLogger.level = OFF
-rootLogger.appenderRef.test.ref = TestLogger
-
-appender.testlogger.name = TestLogger
-appender.testlogger.type = CONSOLE
-appender.testlogger.target = SYSTEM_ERR
-appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml
index 3e83311..cb7afab 100644
--- a/flink-connectors/flink-jdbc/pom.xml
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -35,6 +35,11 @@ under the License.
 
        <packaging>jar</packaging>
 
+       <properties>
+               <postgres.version>42.2.10</postgres.version>
+               <otj-pg-embedded.version>0.13.3</otj-pg-embedded.version>
+       </properties>
+
        <dependencies>
                <!-- Table ecosystem -->
                <!-- Projects depending on this project won't depend on flink-table-*. -->
@@ -53,13 +58,17 @@ under the License.
                        <scope>provided</scope>
                </dependency>
 
+               <!-- Postgres dependencies -->
+
                <dependency>
-                       <groupId>org.apache.derby</groupId>
-                       <artifactId>derby</artifactId>
-                       <version>10.14.2.0</version>
-                       <scope>test</scope>
+                       <groupId>org.postgresql</groupId>
+                       <artifactId>postgresql</artifactId>
+                       <version>${postgres.version}</version>
+                       <scope>provided</scope>
                </dependency>
 
+               <!-- test dependencies -->
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
@@ -89,5 +98,24 @@ under the License.
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
+
+               <!-- Postgres test dependencies -->
+
+               <dependency>
+                       <groupId>com.opentable.components</groupId>
+                       <artifactId>otj-pg-embedded</artifactId>
+                       <version>${otj-pg-embedded.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- Derby test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.derby</groupId>
+                       <artifactId>derby</artifactId>
+                       <version>10.14.2.0</version>
+                       <scope>test</scope>
+               </dependency>
+
        </dependencies>
 </project>
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
new file mode 100644
index 0000000..523de83
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Abstract base class for JDBC catalogs.
+ */
+public abstract class AbstractJDBCCatalog extends AbstractCatalog {
+
+       private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCCatalog.class);
+
+       protected final String username;
+       protected final String pwd;
+       protected final String baseUrl;
+       protected final String defaultUrl;
+
+       public AbstractJDBCCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+               super(catalogName, defaultDatabase);
+
+               checkArgument(!StringUtils.isNullOrWhitespaceOnly(username));
+               checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd));
+               checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
+
+               JDBCCatalogUtils.validateJDBCUrl(baseUrl);
+
+               this.username = username;
+               this.pwd = pwd;
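+               // normalize the base URL to end with '/' so database names can be appended directly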
+               this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
+               this.defaultUrl = this.baseUrl + defaultDatabase;
+       }
+
+       @Override
+       public void open() throws CatalogException {
+               // test connection, fail early if we cannot connect to database
+               try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+               } catch (SQLException e) {
+                       throw new ValidationException(
+                               String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
+               }
+
+               LOG.info("Catalog {} established connection to {}", getName(), defaultUrl);
+       }
+
+       @Override
+       public void close() throws CatalogException {
+               LOG.info("Catalog {} closing", getName());
+       }
+
+       // ------ table factory ------
+
+       public Optional<TableFactory> getTableFactory() {
+               return Optional.of(new JDBCTableSourceSinkFactory());
+       }
+
+       // ------ databases ------
+
+       @Override
+       public boolean databaseExists(String databaseName) throws CatalogException {
+               checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+               return listDatabases().contains(databaseName);
+       }
+
+       @Override
+       public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       // ------ tables and views ------
+
+       @Override
+       public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
+               return Collections.emptyList();
+       }
+
+       // ------ partitions ------
+
+       @Override
+       public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws TableNotExistException, TableNotPartitionedException, CatalogException {
+               return Collections.emptyList();
+       }
+
+       @Override
+       public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, CatalogException {
+               return Collections.emptyList();
+       }
+
+       @Override
+       public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws TableNotExistException, TableNotPartitionedException, CatalogException {
+               return Collections.emptyList();
+       }
+
+       @Override
+       public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       // ------ functions ------
+
+       @Override
+       public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
+               return Collections.emptyList();
+       }
+
+       @Override
+       public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       // ------ stats ------
+
+       @Override
+       public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+               return CatalogTableStatistics.UNKNOWN;
+       }
+
+       @Override
+       public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+               return CatalogColumnStatistics.UNKNOWN;
+       }
+
+       @Override
+       public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+               return CatalogTableStatistics.UNKNOWN;
+       }
+
+       @Override
+       public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+               return CatalogColumnStatistics.UNKNOWN;
+       }
+
+       @Override
+       public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException, TablePartitionedException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java
new file mode 100644
index 0000000..629412c
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Catalog for relational databases via JDBC.
+ */
+@PublicEvolving
+public class JDBCCatalog extends AbstractJDBCCatalog {
+
+       private static final Logger LOG = LoggerFactory.getLogger(JDBCCatalog.class);
+
+       private final Catalog internal;
+
+       public JDBCCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+               super(catalogName, defaultDatabase, username, pwd, baseUrl);
+
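+               // delegate metadata access to a dialect-specific catalog derived from the base URL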
+               internal = JDBCCatalogUtils.createCatalog(catalogName, defaultDatabase, username, pwd, baseUrl);
+       }
+
+       // ------ databases -----
+
+       @Override
+       public List<String> listDatabases() throws CatalogException {
+               return internal.listDatabases();
+       }
+
+       @Override
+       public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+               return internal.getDatabase(databaseName);
+       }
+
+       // ------ tables and views ------
+
+       @Override
+       public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+               return internal.listTables(databaseName);
+       }
+
+       @Override
+       public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+               return internal.getTable(tablePath);
+       }
+
+       @Override
+       public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+               try {
+                       return databaseExists(tablePath.getDatabaseName()) &&
+                               listTables(tablePath.getDatabaseName()).contains(tablePath.getObjectName());
+               } catch (DatabaseNotExistException e) {
+                       return false;
+               }
+       }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java
new file mode 100644
index 0000000..b9e3a19
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utils for {@link JDBCCatalog}.
+ */
+public class JDBCCatalogUtils {
+       /**
+        * The URL must not contain a database name, e.g. "jdbc:postgresql://localhost:5432/" or "jdbc:postgresql://localhost:5432",
+        * rather than "jdbc:postgresql://localhost:5432/db".
+        */
+       public static void validateJDBCUrl(String url) {
+               String[] parts = url.trim().split("\\/+");
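+               // e.g. "jdbc:postgresql://localhost:5432/db" splits on runs of '/' into ["jdbc:postgresql:", "localhost:5432", "db"] and is rejected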
+
+               checkArgument(parts.length == 2);
+       }
+
+       /**
+        * Create catalog instance from given information.
+        */
+       public static AbstractJDBCCatalog createCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+               JDBCDialect dialect = JDBCDialects.get(baseUrl).get();
+
+               if (dialect instanceof JDBCDialects.PostgresDialect) {
+                       return new PostgresCatalog(catalogName, defaultDatabase, username, pwd, baseUrl);
+               } else {
+                       throw new UnsupportedOperationException(
+                               String.format("Catalog for '%s' is not supported yet.", dialect)
+                       );
+               }
+       }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
new file mode 100644
index 0000000..d12f254
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Catalog for PostgreSQL.
+ */
+@Internal
+public class PostgresCatalog extends AbstractJDBCCatalog {
+
+       private static final Logger LOG = LoggerFactory.getLogger(PostgresCatalog.class);
+
+       public static final String POSTGRES_TABLE_TYPE = "postgres";
+
+       public static final String DEFAULT_DATABASE = "postgres";
+
+       // ------ Postgres default objects that shouldn't be exposed to users ------
+
+       private static final Set<String> builtinDatabases = new HashSet<String>() {{
+               add("template0");
+               add("template1");
+       }};
+
+       private static final Set<String> builtinSchemas = new HashSet<String>() {{
+               add("pg_toast");
+               add("pg_temp_1");
+               add("pg_toast_temp_1");
+               add("pg_catalog");
+               add("information_schema");
+       }};
+
+       protected PostgresCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+               super(catalogName, defaultDatabase, username, pwd, baseUrl);
+       }
+
+       // ------ databases ------
+
+       @Override
+       public List<String> listDatabases() throws CatalogException {
+               List<String> pgDatabases = new ArrayList<>();
+
+               try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+
+                       PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");
+
+                       ResultSet rs = ps.executeQuery();
+
+                       while (rs.next()) {
+                               String dbName = rs.getString(1);
+                               if (!builtinDatabases.contains(dbName)) {
+                                       pgDatabases.add(rs.getString(1));
+                               }
+                       }
+
+                       return pgDatabases;
+               } catch (Exception e) {
+                       throw new CatalogException(
+                               String.format("Failed listing databases in catalog %s", getName()), e);
+               }
+       }
+
+       @Override
+       public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+               if (listDatabases().contains(databaseName)) {
+                       return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+               } else {
+                       throw new DatabaseNotExistException(getName(), databaseName);
+               }
+       }
+
+       // ------ tables ------
+
+       @Override
+       public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+               if (!databaseExists(databaseName)) {
+                       throw new DatabaseNotExistException(getName(), databaseName);
+               }
+
+               // get all schemas
+               try (Connection conn = DriverManager.getConnection(baseUrl + databaseName, username, pwd)) {
+                       PreparedStatement ps = conn.prepareStatement("SELECT schema_name FROM information_schema.schemata;");
+
+                       ResultSet rs = ps.executeQuery();
+
+                       List<String> schemas = new ArrayList<>();
+
+                       while (rs.next()) {
+                               String pgSchema = rs.getString(1);
+                               if (!builtinSchemas.contains(pgSchema)) {
+                                       schemas.add(pgSchema);
+                               }
+                       }
+
+                       List<String> tables = new ArrayList<>();
+
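+                       // query each remaining schema for its base tables; results are qualified as "schema.table"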
+                       for (String schema : schemas) {
+                               PreparedStatement stmt = conn.prepareStatement(
+                                       "SELECT * \n" +
+                                               "FROM information_schema.tables \n" +
+                                               "WHERE table_type = 'BASE TABLE' \n" +
+                                               "    AND table_schema = ? \n" +
+                                               "ORDER BY table_type, table_name;");
+
+                               stmt.setString(1, schema);
+
+                               ResultSet rstables = stmt.executeQuery();
+
+                               while (rstables.next()) {
+                                       // position 1 is database name, position 2 is schema name, position 3 is table name
+                                       tables.add(schema + "." + rstables.getString(3));
+                               }
+                       }
+
+                       return tables;
+               } catch (Exception e) {
+                       throw new CatalogException(
+                               String.format("Failed listing tables in catalog %s", getName()), e);
+               }
+       }
+
+       @Override
+       public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+               if (!tableExists(tablePath)) {
+                       throw new TableNotExistException(getName(), tablePath);
+               }
+
+               PostgresTablePath pgPath = PostgresTablePath.fromFlinkTableName(tablePath.getObjectName());
+
+               try (Connection conn = DriverManager.getConnection(baseUrl + tablePath.getDatabaseName(), username, pwd)) {
+
+                       PreparedStatement ps = conn.prepareStatement(
+                               String.format("SELECT * FROM %s;", pgPath.getFullPath()));
+
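+                       // obtain column metadata from the prepared statement without executing the query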
+                       ResultSetMetaData rsmd = ps.getMetaData();
+
+                       String[] names = new String[rsmd.getColumnCount()];
+                       DataType[] types = new DataType[rsmd.getColumnCount()];
+
+                       for (int i = 1; i <= rsmd.getColumnCount(); i++) {
+                               names[i - 1] = rsmd.getColumnName(i);
+                               types[i - 1] = fromJDBCType(rsmd, i);
+                       }
+
+                       TableSchema tableSchema = new TableSchema.Builder().fields(names, types).build();
+
+                       return new CatalogTableImpl(
+                               tableSchema,
+                               new HashMap<>(),
+                               ""
+                       );
+               } catch (Exception e) {
+                       throw new CatalogException(
+                               String.format("Failed getting table %s", tablePath.getFullName()), e);
+               }
+       }
+
+       public static final String PG_BYTEA = "bytea";
+       public static final String PG_BYTEA_ARRAY = "_bytea";
+       public static final String PG_SMALLINT = "int2";
+       public static final String PG_SMALLINT_ARRAY = "_int2";
+       public static final String PG_INTEGER = "int4";
+       public static final String PG_INTEGER_ARRAY = "_int4";
+       public static final String PG_BIGINT = "int8";
+       public static final String PG_BIGINT_ARRAY = "_int8";
+       public static final String PG_REAL = "float4";
+       public static final String PG_REAL_ARRAY = "_float4";
+       public static final String PG_DOUBLE_PRECISION = "float8";
+       public static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
+       public static final String PG_NUMERIC = "numeric";
+       public static final String PG_NUMERIC_ARRAY = "_numeric";
+       public static final String PG_BOOLEAN = "bool";
+       public static final String PG_BOOLEAN_ARRAY = "_bool";
+       public static final String PG_TIMESTAMP = "timestamp";
+       public static final String PG_TIMESTAMP_ARRAY = "_timestamp";
+       public static final String PG_TIMESTAMPTZ = "timestamptz";
+       public static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz";
+       public static final String PG_DATE = "date";
+       public static final String PG_DATE_ARRAY = "_date";
+       public static final String PG_TIME = "time";
+       public static final String PG_TIME_ARRAY = "_time";
+       public static final String PG_TEXT = "text";
+       public static final String PG_TEXT_ARRAY = "_text";
+       public static final String PG_CHAR = "bpchar";
+       public static final String PG_CHAR_ARRAY = "_bpchar";
+       public static final String PG_CHARACTER = "character";
+       public static final String PG_CHARACTER_ARRAY = "_character";
+       public static final String PG_CHARACTER_VARYING = "varchar";
+       public static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
+
+       private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
+               String pgType = metadata.getColumnTypeName(colIndex);
+
+               int precision = metadata.getPrecision(colIndex);
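+               // precision and scale from the metadata drive the CHAR/VARCHAR/DECIMAL mappings below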
+
+               switch (pgType) {
+                       case PG_BOOLEAN:
+                               return DataTypes.BOOLEAN();
+                       case PG_BOOLEAN_ARRAY:
+                               return DataTypes.ARRAY(DataTypes.BOOLEAN());
+                       case PG_BYTEA:
+                               return DataTypes.BYTES();
+                       case PG_BYTEA_ARRAY:
+                               return DataTypes.ARRAY(DataTypes.BYTES());
+                       case PG_SMALLINT:
+                               return DataTypes.SMALLINT();
+                       case PG_SMALLINT_ARRAY:
+                               return DataTypes.ARRAY(DataTypes.SMALLINT());
+                       case PG_INTEGER:
+                               return DataTypes.INT();
+                       case PG_INTEGER_ARRAY:
+                               return DataTypes.ARRAY(DataTypes.INT());
+                       case PG_BIGINT:
+                               return DataTypes.BIGINT();
+                       case PG_BIGINT_ARRAY:
+                               return DataTypes.ARRAY(DataTypes.BIGINT());
+                       case PG_REAL:
+                               return DataTypes.FLOAT();
+                       case PG_REAL_ARRAY:
+                               return DataTypes.ARRAY(DataTypes.FLOAT());
+                       case PG_DOUBLE_PRECISION:
+                               return DataTypes.DOUBLE();
+                       case PG_DOUBLE_PRECISION_ARRAY:
+                               return DataTypes.ARRAY(DataTypes.DOUBLE());
+                       case PG_NUMERIC:
+                               return DataTypes.DECIMAL(precision, metadata.getScale(colIndex));
+                       case PG_NUMERIC_ARRAY:
+                               return DataTypes.ARRAY(
+                                       DataTypes.DECIMAL(precision, metadata.getScale(colIndex)));
+                       case PG_CHAR:
+                       case PG_CHARACTER:
+                               return DataTypes.CHAR(precision);
+                       case PG_CHAR_ARRAY:
+                       case PG_CHARACTER_ARRAY:
+                               return DataTypes.ARRAY(DataTypes.CHAR(precision));
+                       case PG_CHARACTER_VARYING:
+                               return DataTypes.VARCHAR(precision);
+                       case PG_CHARACTER_VARYING_ARRAY:
+                               return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
+                       case PG_TEXT:
+                               return DataTypes.STRING();
+                       case PG_TEXT_ARRAY:
+                               return DataTypes.ARRAY(DataTypes.STRING());
+                       case PG_TIMESTAMP:
+                               return DataTypes.TIMESTAMP();
+                       case PG_TIMESTAMP_ARRAY:
+                               return DataTypes.ARRAY(DataTypes.TIMESTAMP());
+                       case PG_TIMESTAMPTZ:
+                               return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
+                       case PG_TIMESTAMPTZ_ARRAY:
+                               return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+                       case PG_TIME:
+                               return DataTypes.TIME();
+                       case PG_TIME_ARRAY:
+                               return DataTypes.ARRAY(DataTypes.TIME());
+                       case PG_DATE:
+                               return DataTypes.DATE();
+                       case PG_DATE_ARRAY:
+                               return DataTypes.ARRAY(DataTypes.DATE());
+                       default:
+                               throw new UnsupportedOperationException(
+                                       String.format("Doesn't support Postgres type '%s' yet", pgType));
+               }
+       }
+
+       @Override
+       public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+
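+               // existence is checked by listing the database's tables; a missing database means the table cannot exist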
+               List<String> tables = null;
+               try {
+                       tables = listTables(tablePath.getDatabaseName());
+               } catch (DatabaseNotExistException e) {
+                       return false;
+               }
+
+               return tables.contains(PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getFullPath());
+       }
+
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java
new file mode 100644
index 0000000..99cc2b4
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.util.StringUtils;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Table path of PostgreSQL in Flink. Can be of formats "table_name" or "schema_name.table_name".
+ * When it's "table_name", the schema name defaults to "public".
+ */
+public class PostgresTablePath {
+
+       private static final String DEFAULT_POSTGRES_SCHEMA_NAME = "public";
+
+       private final String pgSchemaName;
+       private final String pgTableName;
+
+       public PostgresTablePath(String pgSchemaName, String pgTableName) {
+               checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgSchemaName));
+               checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgTableName));
+
+               this.pgSchemaName = pgSchemaName;
+               this.pgTableName = pgTableName;
+       }
+
+       public static PostgresTablePath fromFlinkTableName(String flinkTableName) {
+               if (flinkTableName.contains(".")) {
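+                       // e.g. "my_schema.t1" -> schema "my_schema", table "t1"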
+                       String[] path = flinkTableName.split("\\.");
+
+                       checkArgument(path != null && path.length == 2,
+                               String.format("Table name '%s' is not valid. The parsed length is %d", flinkTableName, path.length));
+
+                       return new PostgresTablePath(path[0], path[1]);
+               } else {
+                       return new PostgresTablePath(DEFAULT_POSTGRES_SCHEMA_NAME, flinkTableName);
+               }
+       }
+
+       public static String toFlinkTableName(String schema, String table) {
+               return new PostgresTablePath(schema, table).getFullPath();
+       }
+
+       public String getFullPath() {
+               return String.format("%s.%s", pgSchemaName, pgTableName);
+       }
+
+       public String getFullPathWithQuotes() {
+               return String.format("`%s.%s`", pgSchemaName, pgTableName);
+       }
+
+       @Override
+       public String toString() {
+               return getFullPathWithQuotes();
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               PostgresTablePath that = (PostgresTablePath) o;
+               return Objects.equals(pgSchemaName, that.pgSchemaName) &&
+                       Objects.equals(pgTableName, that.pgTableName);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(pgSchemaName, pgTableName);
+       }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
index fa7192e..743d16a 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
@@ -203,7 +203,10 @@ public final class JDBCDialects {
                }
        }
 
-       private static class MySQLDialect extends AbstractDialect {
+       /**
+        * MySQL dialect.
+        */
+       public static class MySQLDialect extends AbstractDialect {
 
                private static final long serialVersionUID = 1L;
 
@@ -301,7 +304,10 @@ public final class JDBCDialects {
                }
        }
 
-       private static class PostgresDialect extends AbstractDialect {
+       /**
+        * Postgres dialect.
+        */
+       public static class PostgresDialect extends AbstractDialect {
 
                private static final long serialVersionUID = 1L;
 
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java
new file mode 100644
index 0000000..7a4132b
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Test for {@link JDBCCatalogUtils}.
+ */
+public class JDBCCatalogUtilsTest {
+       @Rule
+       public ExpectedException exception = ExpectedException.none();
+
+       @Test
+       public void testJDBCUrl() {
+               JDBCCatalogUtils.validateJDBCUrl("jdbc:postgresql://localhost:5432/");
+
+               JDBCCatalogUtils.validateJDBCUrl("jdbc:postgresql://localhost:5432");
+       }
+
+       @Test
+       public void testInvalidJDBCUrl() {
+               exception.expect(IllegalArgumentException.class);
+               JDBCCatalogUtils.validateJDBCUrl("jdbc:postgresql://localhost:5432/db");
+       }
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
new file mode 100644
index 0000000..e103780
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import com.opentable.db.postgres.junit.EmbeddedPostgresRules;
+import com.opentable.db.postgres.junit.SingleInstancePostgresRule;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link PostgresCatalog}.
+ */
+public class PostgresCatalogITCase {
+       @Rule
+       public ExpectedException exception = ExpectedException.none();
+
+       @ClassRule
+       public static SingleInstancePostgresRule pg = EmbeddedPostgresRules.singleInstance();
+
+       protected static final String TEST_USERNAME = "postgres";
+       protected static final String TEST_PWD = "postgres";
+       protected static final String TEST_DB = "test";
+       protected static final String TEST_SCHEMA = "test_schema";
+       protected static final String TABLE1 = "t1";
+       protected static final String TABLE2 = "t2";
+       protected static final String TABLE3 = "t3";
+
+       protected static String baseUrl;
+       protected static Catalog catalog;
+
+       public static Catalog createCatalog(String name, String defaultDb, String username, String pwd, String jdbcUrl) {
+               return new PostgresCatalog(name, defaultDb, username, pwd, jdbcUrl);
+       }
+
+       @BeforeClass
+       public static void setup() throws SQLException {
+               // jdbc:postgresql://localhost:50807/postgres?user=postgres
+               String embeddedJdbcUrl = pg.getEmbeddedPostgres().getJdbcUrl(TEST_USERNAME, TEST_PWD);
+               // jdbc:postgresql://localhost:50807/
+               baseUrl = embeddedJdbcUrl.substring(0, embeddedJdbcUrl.lastIndexOf("/") + 1);
+
+               catalog = createCatalog("mypg", PostgresCatalog.DEFAULT_DATABASE, TEST_USERNAME, TEST_PWD, baseUrl);
+
+               // create test database and schema
+               createDatabase(TEST_DB);
+               createSchema(TEST_DB, TEST_SCHEMA);
+
+               // create test tables
+               // table: postgres.public.t1
+               createTable(PostgresTablePath.fromFlinkTableName(TABLE1), getSimpleTable().pgSchemaSql);
+
+               // table: test.public.t2
+               // table: test.test_schema.t3
+               // table: test.public.datatypes
+               createTable(TEST_DB, PostgresTablePath.fromFlinkTableName(TABLE2), getSimpleTable().pgSchemaSql);
+               createTable(TEST_DB, new PostgresTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().pgSchemaSql);
+               createTable(TEST_DB, PostgresTablePath.fromFlinkTableName("datatypes"), getDataTypesTable().pgSchemaSql);
+       }
+
+       // ------ databases ------
+
+       @Test
+       public void testGetDb_DatabaseNotExistException() throws Exception {
+               exception.expect(DatabaseNotExistException.class);
+               exception.expectMessage("Database nonexistent does not exist in Catalog");
+               catalog.getDatabase("nonexistent");
+       }
+
+       @Test
+       public void testListDatabases() {
+               List<String> actual = catalog.listDatabases();
+
+               assertEquals(
+                       Arrays.asList("postgres", "test"),
+                       actual
+               );
+       }
+
+       @Test
+       public void testDbExists() throws Exception {
+               assertFalse(catalog.databaseExists("nonexistent"));
+
+               assertTrue(catalog.databaseExists(PostgresCatalog.DEFAULT_DATABASE));
+       }
+
+       // ------ tables ------
+
+       @Test
+       public void testListTables() throws DatabaseNotExistException {
+               List<String> actual = catalog.listTables(PostgresCatalog.DEFAULT_DATABASE);
+
+               assertEquals(Arrays.asList("public.t1"), actual);
+
+               actual = catalog.listTables(TEST_DB);
+
+               assertEquals(Arrays.asList("public.datatypes", "public.t2", "test_schema.t3"), actual);
+       }
+
+       @Test
+       public void testListTables_DatabaseNotExistException() throws DatabaseNotExistException {
+               exception.expect(DatabaseNotExistException.class);
+               catalog.listTables("postgres/nonexistschema");
+       }
+
+       @Test
+       public void testTableExists() {
+               assertFalse(catalog.tableExists(new ObjectPath(TEST_DB, "nonexist")));
+
+               assertTrue(catalog.tableExists(new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE1)));
+               assertTrue(catalog.tableExists(new ObjectPath(TEST_DB, TABLE2)));
+               assertTrue(catalog.tableExists(new ObjectPath(TEST_DB, "test_schema.t3")));
+       }
+
+       @Test
+       public void testGetTables_TableNotExistException() throws TableNotExistException {
+               exception.expect(TableNotExistException.class);
+               catalog.getTable(new ObjectPath(TEST_DB, PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable")));
+       }
+
+       @Test
+       public void testGetTables_TableNotExistException_NoSchema() throws TableNotExistException {
+               exception.expect(TableNotExistException.class);
+               catalog.getTable(new ObjectPath(TEST_DB, PostgresTablePath.toFlinkTableName("nonexistschema", "anytable")));
+       }
+
+       @Test
+       public void testGetTables_TableNotExistException_NoDb() throws TableNotExistException {
+               exception.expect(TableNotExistException.class);
+               catalog.getTable(new ObjectPath("nonexistdb", PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable")));
+       }
+
+       @Test
+       public void testGetTable() throws TableNotExistException {
+               // test postgres.public.t1
+               TableSchema schema = getSimpleTable().schema;
+
+               CatalogBaseTable table = catalog.getTable(new ObjectPath("postgres", TABLE1));
+
+               assertEquals(schema, table.getSchema());
+
+               table = catalog.getTable(new ObjectPath("postgres", "public.t1"));
+
+               assertEquals(schema, table.getSchema());
+
+               // test test.public.t2
+               table = catalog.getTable(new ObjectPath(TEST_DB, TABLE2));
+
+               assertEquals(schema, table.getSchema());
+
+               table = catalog.getTable(new ObjectPath(TEST_DB, "public.t2"));
+
+               assertEquals(schema, table.getSchema());
+
+               // test test.test_schema.t3
+               table = catalog.getTable(new ObjectPath(TEST_DB, TEST_SCHEMA + ".t3"));
+
+               assertEquals(schema, table.getSchema());
+
+       }
+
+       @Test
+       public void testDataTypes() throws TableNotExistException {
+               CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB, "datatypes"));
+
+               assertEquals(getDataTypesTable().schema, table.getSchema());
+       }
+
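+       /**
+        * Pairs the expected Flink {@link TableSchema} of a test table with the
+        * Postgres column definition SQL used to create it.
+        */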
+       private static class TestTable {
+               TableSchema schema;
+               String pgSchemaSql;
+
+               public TestTable(TableSchema schema, String pgSchemaSql) {
+                       this.schema = schema;
+                       this.pgSchemaSql = pgSchemaSql;
+               }
+       }
+
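+       /** Single-column table shared by the getTable() assertions above. */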
+       private static TestTable getSimpleTable() {
+               return new TestTable(
+                       TableSchema.builder()
+                               .field("name", DataTypes.INT())
+                               .build(),
+                       "name integer"
+               );
+       }
+
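+       /**
+        * Table whose columns cover the Postgres types exercised by
+        * testDataTypes(), each paired with the Flink {@link DataTypes} entry
+        * the catalog is expected to derive for it.
+        */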
+       private static TestTable getDataTypesTable() {
+               return new TestTable(
+                       TableSchema.builder()
+                               .field("int", DataTypes.INT())
+                               .field("int_arr", DataTypes.ARRAY(DataTypes.INT()))
+                               .field("bytea", DataTypes.BYTES())
+                               .field("bytea_arr", DataTypes.ARRAY(DataTypes.BYTES()))
+                               .field("short", DataTypes.SMALLINT())
+                               .field("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT()))
+                               .field("long", DataTypes.BIGINT())
+                               .field("long_arr", DataTypes.ARRAY(DataTypes.BIGINT()))
+                               .field("real", DataTypes.FLOAT())
+                               .field("real_arr", DataTypes.ARRAY(DataTypes.FLOAT()))
+                               .field("double_precision", DataTypes.DOUBLE())
+                               .field("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
+                               .field("numeric", DataTypes.DECIMAL(10, 5))
+                               .field("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5)))
+                               .field("boolean", DataTypes.BOOLEAN())
+                               .field("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN()))
+                               .field("text", DataTypes.STRING())
+                               .field("text_arr", DataTypes.ARRAY(DataTypes.STRING()))
+                               .field("char", DataTypes.CHAR(1))
+                               .field("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1)))
+                               .field("character", DataTypes.CHAR(3))
+                               .field("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3)))
+                               .field("character_varying", DataTypes.VARCHAR(20))
+                               .field("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20)))
+                               .field("timestamp", DataTypes.TIMESTAMP())
+                               .field("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP()))
+                               .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+                               .field("timestamptz_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()))
+                               .field("date", DataTypes.DATE())
+                               .field("date_arr", DataTypes.ARRAY(DataTypes.DATE()))
+                               .field("time", DataTypes.TIME())
+                               .field("time_arr", DataTypes.ARRAY(DataTypes.TIME()))
+                               .build(),
+                       "int integer, " +
+                               "int_arr integer[], " +
+                               "bytea bytea, " +
+                               "bytea_arr bytea[], " +
+                               "short smallint, " +
+                               "short_arr smallint[], " +
+                               "long bigint, " +
+                               "long_arr bigint[], " +
+                               "real real, " +
+                               "real_arr real[], " +
+                               "double_precision double precision, " +
+                               "double_precision_arr double precision[], " +
+                               "numeric numeric(10, 5), " +
+                               "numeric_arr numeric(10, 5)[], " +
+                               "boolean boolean, " +
+                               "boolean_arr boolean[], " +
+                               "text text, " +
+                               "text_arr text[], " +
+                               "char char, " +
+                               "char_arr char[], " +
+                               "character character(3), " +
+                               "character_arr character(3)[], " +
+                               "character_varying character varying(20), " +
+                               "character_varying_arr character varying(20)[], " +
+                               "timestamp timestamp(6), " +
+                               "timestamp_arr timestamp(6)[], " +
+                               "timestamptz timestamptz, " +
+                               "timestamptz_arr timestamptz[], " +
+                               "date date, " +
+                               "date_arr date[], " +
+                               "time time(6), " +
+                               "time_arr time(6)[]"
+               );
+       }
+
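+       // ------ test utils ------
+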
+       private static void createTable(PostgresTablePath tablePath, String tableSchemaSql) throws SQLException {
+               executeSQL(PostgresCatalog.DEFAULT_DATABASE, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
+       }
+
+       private static void createTable(String db, PostgresTablePath tablePath, String tableSchemaSql) throws SQLException {
+               executeSQL(db, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
+       }
+
+       private static void createSchema(String db, String schema) throws SQLException {
+               executeSQL(db, String.format("CREATE SCHEMA %s", schema));
+       }
+
+       private static void createDatabase(String database) throws SQLException {
+               executeSQL(String.format("CREATE DATABASE %s;", database));
+       }
+
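+       /** Convenience overload that executes against the server's default database. */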
+       private static void executeSQL(String sql) throws SQLException {
+               executeSQL("", sql);
+       }
+
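+       /**
+        * Runs a single update statement over a short-lived JDBC connection to
+        * the given database.
+        */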
+       private static void executeSQL(String db, String sql) throws SQLException {
+               try (Connection conn = DriverManager.getConnection(baseUrl + db, TEST_USERNAME, TEST_PWD);
+                               Statement statement = conn.createStatement()) {
+                       statement.executeUpdate(sql);
+               }
+       }
+
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java
new file mode 100644
index 0000000..46f32bc
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link PostgresTablePath}.
+ */
+public class PostgresTablePathTest {
+       @Test
+       public void testFromFlinkTableName() {
+               assertEquals(new PostgresTablePath("public", "topic"), PostgresTablePath.fromFlinkTableName("public.topic"));
+       }
+}
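
Note: below is a minimal sketch (not part of this commit) of the
database/schema/table name mapping the tests above exercise. It assumes
only methods visible in this diff (PostgresTablePath.fromFlinkTableName,
toFlinkTableName, getFullPath, and Flink's ObjectPath); the literal
values are illustrative.

    import org.apache.flink.api.java.io.jdbc.catalog.PostgresTablePath;
    import org.apache.flink.table.catalog.ObjectPath;

    public class PathMappingSketch {

        public static void main(String[] args) {
            // Postgres side: a schema-qualified table name within one database.
            PostgresTablePath pgPath = PostgresTablePath.fromFlinkTableName("test_schema.t3");
            System.out.println(pgPath.getFullPath()); // test_schema.t3

            // Flink side: a two-level ObjectPath, with the Postgres schema
            // folded into the table name ("<schema>.<table>").
            ObjectPath flinkPath = new ObjectPath(
                "test", PostgresTablePath.toFlinkTableName("test_schema", "t3"));
            System.out.println(flinkPath.getFullName()); // test.test_schema.t3
        }
    }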
