This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 75ad29cb9f4f377df27b71e67dbd33f36bb08bee
Author: bowen.li <bowenl...@gmail.com>
AuthorDate: Sun Mar 8 20:58:27 2020 -0700

    [FLINK-16472] support precision of timestamp and time data types
    
    closes #11336
---
 .../api/java/io/jdbc/catalog/PostgresCatalog.java  | 15 +++++++-------
 .../io/jdbc/catalog/PostgresCatalogITCase.java     | 24 +++++++++++-----------
 2 files changed, 19 insertions(+), 20 deletions(-)

diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
index d12f254..c598073 100644
--- 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java
@@ -55,8 +55,6 @@ public class PostgresCatalog extends AbstractJDBCCatalog {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(PostgresCatalog.class);
 
-       public static final String POSTGRES_TABLE_TYPE = "postgres";
-
        public static final String DEFAULT_DATABASE = "postgres";
 
        // ------ Postgres default objects that shouldn't be exposed to users 
------
@@ -236,6 +234,7 @@ public class PostgresCatalog extends AbstractJDBCCatalog {
                String pgType = metadata.getColumnTypeName(colIndex);
 
                int precision = metadata.getPrecision(colIndex);
+               int scale = metadata.getScale(colIndex);
 
                switch (pgType) {
                        case PG_BOOLEAN:
@@ -286,17 +285,17 @@ public class PostgresCatalog extends AbstractJDBCCatalog {
                        case PG_TEXT_ARRAY:
                                return DataTypes.ARRAY(DataTypes.STRING());
                        case PG_TIMESTAMP:
-                               return DataTypes.TIMESTAMP();
+                               return DataTypes.TIMESTAMP(scale);
                        case PG_TIMESTAMP_ARRAY:
-                               return DataTypes.ARRAY(DataTypes.TIMESTAMP());
+                               return 
DataTypes.ARRAY(DataTypes.TIMESTAMP(scale));
                        case PG_TIMESTAMPTZ:
-                               return 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
+                               return 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale);
                        case PG_TIMESTAMPTZ_ARRAY:
-                               return 
DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+                               return 
DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale));
                        case PG_TIME:
-                               return DataTypes.TIME();
+                               return DataTypes.TIME(scale);
                        case PG_TIME_ARRAY:
-                               return DataTypes.ARRAY(DataTypes.TIME());
+                               return DataTypes.ARRAY(DataTypes.TIME(scale));
                        case PG_DATE:
                                return DataTypes.DATE();
                        case PG_DATE_ARRAY:
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
index e103780..b197bf0 100644
--- 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java
@@ -249,14 +249,14 @@ public class PostgresCatalogITCase {
                                .field("character_arr", 
DataTypes.ARRAY(DataTypes.CHAR(3)))
                                .field("character_varying", 
DataTypes.VARCHAR(20))
                                .field("character_varying_arr", 
DataTypes.ARRAY(DataTypes.VARCHAR(20)))
-                               .field("timestamp", DataTypes.TIMESTAMP())
-                               .field("timestamp_arr", 
DataTypes.ARRAY(DataTypes.TIMESTAMP()))
-                               .field("timestamptz", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
-                               .field("timestamptz_arr", 
DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()))
+                               .field("timestamp", DataTypes.TIMESTAMP(5))
+                               .field("timestamp_arr", 
DataTypes.ARRAY(DataTypes.TIMESTAMP(5)))
+                               .field("timestamptz", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4))
+                               .field("timestamptz_arr", 
DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)))
                                .field("date", DataTypes.DATE())
                                .field("date_arr", 
DataTypes.ARRAY(DataTypes.DATE()))
-                               .field("time", DataTypes.TIME())
-                               .field("time_arr", 
DataTypes.ARRAY(DataTypes.TIME()))
+                               .field("time", DataTypes.TIME(3))
+                               .field("time_arr", 
DataTypes.ARRAY(DataTypes.TIME(3)))
                                .build(),
                        "int integer, " +
                                "int_arr integer[], " +
@@ -282,14 +282,14 @@ public class PostgresCatalogITCase {
                                "character_arr character(3)[], " +
                                "character_varying character varying(20), " +
                                "character_varying_arr character varying(20)[], 
" +
-                               "timestamp timestamp(6), " +
-                               "timestamp_arr timestamp(6)[], " +
-                               "timestamptz timestamptz, " +
-                               "timestamptz_arr timestamptz[], " +
+                               "timestamp timestamp(5), " +
+                               "timestamp_arr timestamp(5)[], " +
+                               "timestamptz timestamptz(4), " +
+                               "timestamptz_arr timestamptz(4)[], " +
                                "date date, " +
                                "date_arr date[], " +
-                               "time time(6), " +
-                               "time_arr time(6)[]"
+                               "time time(3), " +
+                               "time_arr time(3)[]"
                );
        }
 

Reply via email to