This is an automated email from the ASF dual-hosted git repository. kunni pushed a commit to branch FLINK-38835 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 2f32836a783f80f295c9dce339c11afec2a32dc2 Author: lvyanquan <[email protected]> AuthorDate: Thu Jan 8 15:13:06 2026 +0800 [FLINK-38835][postgres] Fix timestamp conversion to LocalDateTime for dates before 1970-01-01. --- .../postgresql/CustomPostgresValueConverter.java | 99 ++++++++++++++++++++++ .../connector/postgresql/PostgresObjectUtils.java | 2 +- .../postgresql/connection/PostgresConnection.java | 4 +- .../postgres/table/PostgreSQLConnectorITCase.java | 11 ++- .../src/test/resources/ddl/column_type_test.sql | 6 ++ 5 files changed, 116 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/CustomPostgresValueConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/CustomPostgresValueConverter.java new file mode 100644 index 000000000..b12bc48d7 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/CustomPostgresValueConverter.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.debezium.connector.postgresql; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.Column; +import org.apache.kafka.connect.data.Field; + +import java.nio.charset.Charset; +import java.sql.Timestamp; +import java.time.ZoneOffset; + +/** + * A custom PostgresValueConverter that correctly handles timestamp conversion to LocalDateTime for + * dates before 1970-01-01. + */ +public class CustomPostgresValueConverter extends PostgresValueConverter { + protected CustomPostgresValueConverter( + Charset databaseCharset, + DecimalMode decimalMode, + TemporalPrecisionMode temporalPrecisionMode, + ZoneOffset defaultOffset, + BigIntUnsignedMode bigIntUnsignedMode, + boolean includeUnknownDatatypes, + TypeRegistry typeRegistry, + PostgresConnectorConfig.HStoreHandlingMode hStoreMode, + CommonConnectorConfig.BinaryHandlingMode binaryMode, + PostgresConnectorConfig.IntervalHandlingMode intervalMode, + byte[] toastPlaceholder, + int moneyFractionDigits) { + super( + databaseCharset, + decimalMode, + temporalPrecisionMode, + defaultOffset, + bigIntUnsignedMode, + includeUnknownDatatypes, + typeRegistry, + hStoreMode, + binaryMode, + intervalMode, + toastPlaceholder, + moneyFractionDigits); + } + + public static CustomPostgresValueConverter of( + PostgresConnectorConfig connectorConfig, + Charset databaseCharset, + TypeRegistry typeRegistry) { + return new CustomPostgresValueConverter( + databaseCharset, + connectorConfig.getDecimalMode(), + connectorConfig.getTemporalPrecisionMode(), + ZoneOffset.UTC, + null, + connectorConfig.includeUnknownDatatypes(), + typeRegistry, + connectorConfig.hStoreHandlingMode(), + connectorConfig.binaryHandlingMode(), + connectorConfig.intervalHandlingMode(), + connectorConfig.getUnavailableValuePlaceholder(), + connectorConfig.moneyFractionDigits()); + } + + @Override + protected Object convertTimestampToLocalDateTime(Column column, Field fieldDefn, Object data) { + if (data == null) { + return null; + } + if (!(data instanceof Timestamp)) { + return data; + } + final Timestamp timestamp = (Timestamp) data; + + if (POSITIVE_INFINITY_TIMESTAMP.equals(timestamp)) { + return POSITIVE_INFINITY_LOCAL_DATE_TIME; + } else if (NEGATIVE_INFINITY_TIMESTAMP.equals(timestamp)) { + return NEGATIVE_INFINITY_LOCAL_DATE_TIME; + } + + return timestamp.toLocalDateTime(); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java index c50db0435..e8730c144 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java @@ -81,7 +81,7 @@ public class PostgresObjectUtils { public static PostgresConnection.PostgresValueConverterBuilder newPostgresValueConverterBuilder( PostgresConnectorConfig config) { return typeRegistry -> - PostgresValueConverter.of(config, StandardCharsets.UTF_8, typeRegistry); + CustomPostgresValueConverter.of(config, StandardCharsets.UTF_8, typeRegistry); } // modified from diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java index 687dcc9ae..eec9ac19f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java @@ -10,6 +10,7 @@ import com.zaxxer.hikari.pool.HikariProxyConnection; import io.debezium.DebeziumException; import io.debezium.annotation.VisibleForTesting; import io.debezium.config.Configuration; +import io.debezium.connector.postgresql.CustomPostgresValueConverter; import io.debezium.connector.postgresql.PgOid; import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.PostgresSchema; @@ -188,7 +189,8 @@ public class PostgresConnection extends JdbcConnection { } else { this.typeRegistry = typeRegistry; final PostgresValueConverter valueConverter = - PostgresValueConverter.of(config, this.getDatabaseCharset(), typeRegistry); + CustomPostgresValueConverter.of( + config, this.getDatabaseCharset(), typeRegistry); this.defaultValueConverter = new PostgresDefaultValueConverter(valueConverter, this.getTimestampUtils()); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index 7f4329be0..b6377e073 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -689,18 +689,21 @@ class PostgreSQLConnectorITCase extends PostgresTestBase { // generate WAL try (Connection connection = getJdbcConnection(POSTGIS_CONTAINER); Statement statement = connection.createStatement()) { - statement.execute("UPDATE inventory.full_types SET small_c=0 WHERE id=1;"); + statement.execute("UPDATE inventory.full_types SET small_c=0 WHERE id<=2;"); } - waitForSinkSize("sink", 3); + waitForSinkSize("sink", 6); List<String> expected = Arrays.asList( "+I(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})", + "+I(2,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,1900-01-01T00:00:00.123,1900-01-01T00:00:00.123456,1900-01-01,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})", "-D(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})", - "+I(1,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})"); + "-D(2,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,1900-01-01T00:00:00.123,1900-01-01T00:00:00.123456,1900-01-01,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})", + "+I(1,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})", + "+I(2,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,1900-01-01T00:00:00.123,1900-01-01T00:00:00.123456,1900-01-01,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})"); List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); - Assertions.assertThat(actual).isEqualTo(expected); + Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); result.getJobClient().get().cancel().get(); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql index 2d3005b8a..71b69a4a8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql @@ -56,4 +56,10 @@ INSERT INTO inventory.full_types VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true, 'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17', '18:00:22', 500, 'SRID=3187;POINT(174.9479 -36.7208)'::geometry, + 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography); + +INSERT INTO inventory.full_types +VALUES (2, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true, + 'Hello World', 'a', 'abc', 'abcd..xyz', '1900-01-01 00:00:00.123', '1900-01-01 00:00:00.123456', + '1900-01-01', '18:00:22', 500, 'SRID=3187;POINT(174.9479 -36.7208)'::geometry, 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography); \ No newline at end of file
