This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 75ad29cb9f4f377df27b71e67dbd33f36bb08bee Author: bowen.li <bowenl...@gmail.com> AuthorDate: Sun Mar 8 20:58:27 2020 -0700 [FLINK-16472] support precision of timestamp and time data types closes #11336 --- .../api/java/io/jdbc/catalog/PostgresCatalog.java | 15 +++++++------- .../io/jdbc/catalog/PostgresCatalogITCase.java | 24 +++++++++++----------- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java index d12f254..c598073 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java @@ -55,8 +55,6 @@ public class PostgresCatalog extends AbstractJDBCCatalog { private static final Logger LOG = LoggerFactory.getLogger(PostgresCatalog.class); - public static final String POSTGRES_TABLE_TYPE = "postgres"; - public static final String DEFAULT_DATABASE = "postgres"; // ------ Postgres default objects that shouldn't be exposed to users ------ @@ -236,6 +234,7 @@ public class PostgresCatalog extends AbstractJDBCCatalog { String pgType = metadata.getColumnTypeName(colIndex); int precision = metadata.getPrecision(colIndex); + int scale = metadata.getScale(colIndex); switch (pgType) { case PG_BOOLEAN: @@ -286,17 +285,17 @@ public class PostgresCatalog extends AbstractJDBCCatalog { case PG_TEXT_ARRAY: return DataTypes.ARRAY(DataTypes.STRING()); case PG_TIMESTAMP: - return DataTypes.TIMESTAMP(); + return DataTypes.TIMESTAMP(scale); case PG_TIMESTAMP_ARRAY: - return DataTypes.ARRAY(DataTypes.TIMESTAMP()); + return DataTypes.ARRAY(DataTypes.TIMESTAMP(scale)); case PG_TIMESTAMPTZ: - return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(); + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale); case PG_TIMESTAMPTZ_ARRAY: - return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()); + return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale)); case PG_TIME: - return DataTypes.TIME(); + return DataTypes.TIME(scale); case PG_TIME_ARRAY: - return DataTypes.ARRAY(DataTypes.TIME()); + return DataTypes.ARRAY(DataTypes.TIME(scale)); case PG_DATE: return DataTypes.DATE(); case PG_DATE_ARRAY: diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java index e103780..b197bf0 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java @@ -249,14 +249,14 @@ public class PostgresCatalogITCase { .field("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3))) .field("character_varying", DataTypes.VARCHAR(20)) .field("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20))) - .field("timestamp", DataTypes.TIMESTAMP()) - .field("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP())) - .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()) - .field("timestamptz_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())) + .field("timestamp", DataTypes.TIMESTAMP(5)) + .field("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(5))) + .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)) + .field("timestamptz_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4))) .field("date", DataTypes.DATE()) .field("date_arr", DataTypes.ARRAY(DataTypes.DATE())) - .field("time", DataTypes.TIME()) - .field("time_arr", DataTypes.ARRAY(DataTypes.TIME())) + .field("time", DataTypes.TIME(3)) + .field("time_arr", DataTypes.ARRAY(DataTypes.TIME(3))) .build(), "int integer, " + "int_arr integer[], " + @@ -282,14 +282,14 @@ public class PostgresCatalogITCase { "character_arr character(3)[], " + "character_varying character varying(20), " + "character_varying_arr character varying(20)[], " + - "timestamp timestamp(6), " + - "timestamp_arr timestamp(6)[], " + - "timestamptz timestamptz, " + - "timestamptz_arr timestamptz[], " + + "timestamp timestamp(5), " + + "timestamp_arr timestamp(5)[], " + + "timestamptz timestamptz(4), " + + "timestamptz_arr timestamptz(4)[], " + "date date, " + "date_arr date[], " + - "time time(6), " + - "time_arr time(6)[]" + "time time(3), " + + "time_arr time(3)[]" ); }