This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch FLINK-38835
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 2f32836a783f80f295c9dce339c11afec2a32dc2
Author: lvyanquan <[email protected]>
AuthorDate: Thu Jan 8 15:13:06 2026 +0800

    [FLINK-38835][postgres] Fix timestamp conversion to LocalDateTime for dates 
before 1970-01-01.
---
 .../postgresql/CustomPostgresValueConverter.java   | 99 ++++++++++++++++++++++
 .../connector/postgresql/PostgresObjectUtils.java  |  2 +-
 .../postgresql/connection/PostgresConnection.java  |  4 +-
 .../postgres/table/PostgreSQLConnectorITCase.java  | 11 ++-
 .../src/test/resources/ddl/column_type_test.sql    |  6 ++
 5 files changed, 116 insertions(+), 6 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/CustomPostgresValueConverter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/CustomPostgresValueConverter.java
new file mode 100644
index 000000000..b12bc48d7
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/CustomPostgresValueConverter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.postgresql;
+
+import io.debezium.config.CommonConnectorConfig;
+import io.debezium.jdbc.TemporalPrecisionMode;
+import io.debezium.relational.Column;
+import org.apache.kafka.connect.data.Field;
+
+import java.nio.charset.Charset;
+import java.sql.Timestamp;
+import java.time.ZoneOffset;
+
+/**
+ * A custom PostgresValueConverter that correctly handles timestamp conversion 
to LocalDateTime for
+ * dates before 1970-01-01.
+ */
+public class CustomPostgresValueConverter extends PostgresValueConverter {
+    protected CustomPostgresValueConverter(
+            Charset databaseCharset,
+            DecimalMode decimalMode,
+            TemporalPrecisionMode temporalPrecisionMode,
+            ZoneOffset defaultOffset,
+            BigIntUnsignedMode bigIntUnsignedMode,
+            boolean includeUnknownDatatypes,
+            TypeRegistry typeRegistry,
+            PostgresConnectorConfig.HStoreHandlingMode hStoreMode,
+            CommonConnectorConfig.BinaryHandlingMode binaryMode,
+            PostgresConnectorConfig.IntervalHandlingMode intervalMode,
+            byte[] toastPlaceholder,
+            int moneyFractionDigits) {
+        super(
+                databaseCharset,
+                decimalMode,
+                temporalPrecisionMode,
+                defaultOffset,
+                bigIntUnsignedMode,
+                includeUnknownDatatypes,
+                typeRegistry,
+                hStoreMode,
+                binaryMode,
+                intervalMode,
+                toastPlaceholder,
+                moneyFractionDigits);
+    }
+
+    public static CustomPostgresValueConverter of(
+            PostgresConnectorConfig connectorConfig,
+            Charset databaseCharset,
+            TypeRegistry typeRegistry) {
+        return new CustomPostgresValueConverter(
+                databaseCharset,
+                connectorConfig.getDecimalMode(),
+                connectorConfig.getTemporalPrecisionMode(),
+                ZoneOffset.UTC,
+                null,
+                connectorConfig.includeUnknownDatatypes(),
+                typeRegistry,
+                connectorConfig.hStoreHandlingMode(),
+                connectorConfig.binaryHandlingMode(),
+                connectorConfig.intervalHandlingMode(),
+                connectorConfig.getUnavailableValuePlaceholder(),
+                connectorConfig.moneyFractionDigits());
+    }
+
+    @Override
+    protected Object convertTimestampToLocalDateTime(Column column, Field 
fieldDefn, Object data) {
+        if (data == null) {
+            return null;
+        }
+        if (!(data instanceof Timestamp)) {
+            return data;
+        }
+        final Timestamp timestamp = (Timestamp) data;
+
+        if (POSITIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
+            return POSITIVE_INFINITY_LOCAL_DATE_TIME;
+        } else if (NEGATIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
+            return NEGATIVE_INFINITY_LOCAL_DATE_TIME;
+        }
+
+        return timestamp.toLocalDateTime();
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java
index c50db0435..e8730c144 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java
@@ -81,7 +81,7 @@ public class PostgresObjectUtils {
     public static PostgresConnection.PostgresValueConverterBuilder 
newPostgresValueConverterBuilder(
             PostgresConnectorConfig config) {
         return typeRegistry ->
-                PostgresValueConverter.of(config, StandardCharsets.UTF_8, 
typeRegistry);
+                CustomPostgresValueConverter.of(config, 
StandardCharsets.UTF_8, typeRegistry);
     }
 
     // modified from
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
index 687dcc9ae..eec9ac19f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
@@ -10,6 +10,7 @@ import com.zaxxer.hikari.pool.HikariProxyConnection;
 import io.debezium.DebeziumException;
 import io.debezium.annotation.VisibleForTesting;
 import io.debezium.config.Configuration;
+import io.debezium.connector.postgresql.CustomPostgresValueConverter;
 import io.debezium.connector.postgresql.PgOid;
 import io.debezium.connector.postgresql.PostgresConnectorConfig;
 import io.debezium.connector.postgresql.PostgresSchema;
@@ -188,7 +189,8 @@ public class PostgresConnection extends JdbcConnection {
         } else {
             this.typeRegistry = typeRegistry;
             final PostgresValueConverter valueConverter =
-                    PostgresValueConverter.of(config, 
this.getDatabaseCharset(), typeRegistry);
+                    CustomPostgresValueConverter.of(
+                            config, this.getDatabaseCharset(), typeRegistry);
             this.defaultValueConverter =
                     new PostgresDefaultValueConverter(valueConverter, 
this.getTimestampUtils());
         }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
index 7f4329be0..b6377e073 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
@@ -689,18 +689,21 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
         // generate WAL
         try (Connection connection = getJdbcConnection(POSTGIS_CONTAINER);
                 Statement statement = connection.createStatement()) {
-            statement.execute("UPDATE inventory.full_types SET small_c=0 WHERE 
id=1;");
+            statement.execute("UPDATE inventory.full_types SET small_c=0 WHERE 
id<=2;");
         }
 
-        waitForSinkSize("sink", 3);
+        waitForSinkSize("sink", 6);
 
         List<String> expected =
                 Arrays.asList(
                         
"+I(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello 
World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})",
+                        
"+I(2,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello 
World,a,abc,abcd..xyz,1900-01-01T00:00:00.123,1900-01-01T00:00:00.123456,1900-01-01,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})",
                         
"-D(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello 
World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})",
-                        
"+I(1,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello 
World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})");
+                        
"-D(2,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello 
World,a,abc,abcd..xyz,1900-01-01T00:00:00.123,1900-01-01T00:00:00.123456,1900-01-01,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})",
+                        
"+I(1,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello 
World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})",
+                        
"+I(2,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello 
World,a,abc,abcd..xyz,1900-01-01T00:00:00.123,1900-01-01T00:00:00.123456,1900-01-01,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})");
         List<String> actual = 
TestValuesTableFactory.getRawResultsAsStrings("sink");
-        Assertions.assertThat(actual).isEqualTo(expected);
+        
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
 
         result.getJobClient().get().cancel().get();
     }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
index 2d3005b8a..71b69a4a8 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
@@ -56,4 +56,10 @@ INSERT INTO inventory.full_types
 VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
         'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', 
'2020-07-17 18:00:22.123456',
         '2020-07-17', '18:00:22', 500, 'SRID=3187;POINT(174.9479 
-36.7208)'::geometry,
+        'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography);
+
+INSERT INTO inventory.full_types
+VALUES (2, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
+        'Hello World', 'a', 'abc', 'abcd..xyz', '1900-01-01 00:00:00.123', 
'1900-01-01 00:00:00.123456',
+        '1900-01-01', '18:00:22', 500, 'SRID=3187;POINT(174.9479 
-36.7208)'::geometry,
         'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography);
\ No newline at end of file

Reply via email to