This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch release-3.5
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/release-3.5 by this push:
     new 0b6c2e891 [FLINK-38398][pipeline-connector][postgresql] Improve PostgresSQL temporal field type supported with debezium.time.precision.mode=connect config (#4132)
0b6c2e891 is described below

commit 0b6c2e891e3f10d466ace660dbcbd0821eca30d9
Author: ouyangwulin <[email protected]>
AuthorDate: Mon Sep 22 18:53:59 2025 +0800

    [FLINK-38398][pipeline-connector][postgresql] Improve PostgresSQL temporal field type supported with debezium.time.precision.mode=connect config (#4132)
---
 .../connectors/pipeline-connectors/postgres.md     |  72 +++++++++++-
 .../connectors/pipeline-connectors/postgres.md     |  72 +++++++++++-
 .../postgres/utils/PostgresTypeUtils.java          |   2 +-
 .../postgres/source/PostgresFullTypesITCase.java   | 124 ++++++++++++++++++++-
 .../event/DebeziumEventDeserializationSchema.java  |  14 +++
 .../event/DebeziumSchemaDataTypeInference.java     |   9 +-
 6 files changed, 278 insertions(+), 15 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
index 5460a9fff..aa8ce7793 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -417,11 +417,11 @@ pipeline:
 - debezium.time.precision.mode=adaptive_time_microseconds
 - debezium.time.precision.mode=connect
 
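-Note: Due to current CDC limitations in supporting time types, when <code>debezium.time.precision.mode</code> is adaptive, adaptive_time_microseconds, or connect, the Time type is converted to the Integer type with a precision of 3. This will be improved later.
+Note: Because CDC currently fixes the precision of the Time type at 3, the Time type is converted to TIME(3) whether <code>debezium.time.precision.mode</code> is adaptive, adaptive_time_microseconds, or connect.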
 
 <u>debezium.time.precision.mode=adaptive</u>
 
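-When the <code>debezium.time.precision.mode</code> property is set to adaptive, the default, the connector determines the literal type and semantic type based on the column's data type definition. This ensures that events exactly represent the values in the database.
+When the <code>debezium.time.precision.mode</code> property is set to adaptive, the default, the precision of TIME is 3 and the precision of TIMESTAMP is 6.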
 <div class="wy-table-responsive">
 <table class="colwidths-auto docutils">
     <thead>
@@ -440,7 +440,7 @@ pipeline:
         <td>
           TIME([P])
         </td>
-        <td>INTEGER</td>
+        <td>TIME(3)</td>
       </tr>
       <tr>
         <td>
@@ -452,6 +452,72 @@ pipeline:
 </table>
 </div>
 
+<u>debezium.time.precision.mode=adaptive_time_microseconds</u>
+
+When the <code>debezium.time.precision.mode</code> property is set to adaptive_time_microseconds, the precision of TIME is 3 and the precision of TIMESTAMP is 6.
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
+        <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
+      </tr>
+    </thead>
+    <tbody>
+       <tr>
+        <td>
+          DATE
+        <td>DATE</td>
+      </tr>
+      <tr>
+        <td>
+          TIME([P])
+        </td>
+        <td>TIME(3)</td>
+      </tr>
+      <tr>
+        <td>
+          TIMESTAMP([P])
+        </td>
+        <td>TIMESTAMP([P])</td>
+      </tr>
+    </tbody>
+</table>
+</div>
+
+<u>debezium.time.precision.mode=connect</u>
+
+When the <code>debezium.time.precision.mode</code> property is set to connect, both TIME and TIMESTAMP have a precision of 3.
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
+        <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
+      </tr>
+    </thead>
+    <tbody>
+       <tr>
+        <td>
+          DATE
+        <td>DATE</td>
+      </tr>
+      <tr>
+        <td>
+          TIME([P])
+        </td>
+        <td>TIME(3)</td>
+      </tr>
+      <tr>
+        <td>
+          TIMESTAMP([P])
+        </td>
+        <td>TIMESTAMP(3)</td>
+      </tr>
+    </tbody>
+</table>
+</div>
+
 ### Decimal types Mapping
 The setting of the PostgreSQL connector configuration property <code>debezium.decimal.handling.mode</code> determines how the connector maps decimal types.
 
diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md 
b/docs/content/docs/connectors/pipeline-connectors/postgres.md
index ff72598dc..1a51486f0 100644
--- a/docs/content/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -412,11 +412,11 @@ Other than PostgreSQL’s TIMESTAMPTZ data types, which 
contain time zone inform
 - debezium.time.precision.mode=adaptive_time_microseconds
 - debezium.time.precision.mode=connect 
 
-Note: Due to current CDC limitations in supporting time types, when <code>debezium.time.precision.mode</code> is set to "adaptive", "adaptive_time_microseconds", or when using Connect time types, all time values are converted to the Integer type with a precision of 3. This will be improved in future updates.
+Note: Due to the current CDC limitation, the precision for the TIME type is fixed at 3. Regardless of whether <code>debezium.time.precision.mode</code> is set to adaptive, adaptive_time_microseconds, or connect, the TIME type will be converted to TIME(3).
 
 <u>debezium.time.precision.mode=adaptive</u>
 
-When the <code>debezium.time.precision.mode</code> property is set to adaptive, the default, the connector determines the literal type and semantic type based on the column’s data type definition. This ensures that events exactly represent the values in the database.
+When the <code>debezium.time.precision.mode</code> property is set to the default value `adaptive`, the precision of `TIME` is 3, and the precision of `TIMESTAMP` is 6.
 <div class="wy-table-responsive">
 <table class="colwidths-auto docutils">
     <thead>
@@ -435,7 +435,7 @@ When the <code>debezium.time.precision.mode</code> property 
is set to adaptive,
         <td>
           TIME([P])
         </td>
-        <td>INTEGER</td>
+        <td>TIME(3)</td>
       </tr>
       <tr>
         <td>
@@ -447,6 +447,72 @@ When the <code>debezium.time.precision.mode</code> 
property is set to adaptive,
 </table>
 </div>
 
+<u>debezium.time.precision.mode=adaptive_time_microseconds</u>
+
+When the `debezium.time.precision.mode` property is set to the value `adaptive_time_microseconds`, the precision of `TIME` is 3, and the precision of `TIMESTAMP` is 6.
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
+        <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
+      </tr>
+    </thead>
+    <tbody>
+       <tr>
+        <td>
+          DATE
+        <td>DATE</td>
+      </tr>
+      <tr>
+        <td>
+          TIME([P])
+        </td>
+        <td>TIME(3)</td>
+      </tr>
+      <tr>
+        <td>
+          TIMESTAMP([P])
+        </td>
+        <td>TIMESTAMP([P])</td>
+      </tr>
+    </tbody>
+</table>
+</div>
+
+<u>debezium.time.precision.mode=connect</u>
+
+When the <code>debezium.time.precision.mode</code> property is set to connect, both TIME and TIMESTAMP have a precision of 3.
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
+        <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
+      </tr>
+    </thead>
+    <tbody>
+       <tr>
+        <td>
+          DATE
+        <td>DATE</td>
+      </tr>
+      <tr>
+        <td>
+          TIME([P])
+        </td>
+        <td>TIME(3)</td>
+      </tr>
+      <tr>
+        <td>
+          TIMESTAMP([P])
+        </td>
+        <td>TIMESTAMP(3)</td>
+      </tr>
+    </tbody>
+</table>
+</div>
+
 ### Decimal types Mapping
 The setting of the PostgreSQL connector configuration property <code>debezium.decimal.handling.mode</code> determines how the connector maps decimal types.
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
index 0465563fb..3a7a19be6 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
@@ -277,7 +277,7 @@ public class PostgresTypeUtils {
             case ADAPTIVE:
             case ADAPTIVE_TIME_MICROSECONDS:
             case CONNECT:
-                return DataTypes.INT();
+                return DataTypes.TIME(scale);
             default:
                 throw new IllegalArgumentException("Unknown temporal precision mode: " + mode);
         }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
index fdd4fd253..0dac3c153 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
@@ -67,6 +67,7 @@ import java.sql.Statement;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -289,9 +290,9 @@ public class PostgresFullTypesITCase extends 
PostgresTestBase {
                 new Object[] {
                     2,
                     DateData.fromEpochDay(18460),
-                    64822000,
-                    64822123,
-                    64822123,
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22")),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22.123")),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22.123456")),
                     TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
                     TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
                     TimestampData.fromLocalDateTime(
@@ -306,6 +307,117 @@ public class PostgresFullTypesITCase extends 
PostgresTestBase {
                 .isEqualTo(expectedSnapshot);
     }
 
+    @Test
+    public void testTimeTypesWithTemporalModeMicroSeconds() throws Exception {
+        initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+        Properties debeziumProps = new Properties();
+        debeziumProps.setProperty("time.precision.mode", "adaptive_time_microseconds");
+
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGIS_CONTAINER.getHost())
+                                .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+                                .tableList("inventory.time_types")
+                                .startupOptions(StartupOptions.initial())
+                                .debeziumProperties(debeziumProps)
+                                .serverTimeZone("UTC");
+        configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+        configFactory.slotName(slotName);
+        configFactory.decodingPluginName("pgoutput");
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory).getEventSourceProvider();
+
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                PostgresDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+
+        Object[] expectedSnapshot =
+                new Object[] {
+                    2,
+                    DateData.fromEpochDay(18460),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22")),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22.123")),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22.123456")),
+                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
+                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
+                    TimestampData.fromLocalDateTime(
+                            LocalDateTime.parse("2020-07-17T18:00:22.123456")),
+                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
+                    LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
+                };
+
+        List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+        RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+        Assertions.assertThat(recordFields(snapshotRecord, TIME_TYPES_WITH_ADAPTIVE))
+                .isEqualTo(expectedSnapshot);
+    }
+
+    @Test
+    public void testTimeTypesWithTemporalModeConnect() throws Exception {
+        initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+        Properties debeziumProps = new Properties();
+        debeziumProps.setProperty("time.precision.mode", "connect");
+
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGIS_CONTAINER.getHost())
+                                .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+                                .tableList("inventory.time_types")
+                                .startupOptions(StartupOptions.initial())
+                                .debeziumProperties(debeziumProps)
+                                .serverTimeZone("UTC");
+        configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+        configFactory.slotName(slotName);
+        configFactory.decodingPluginName("pgoutput");
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory).getEventSourceProvider();
+
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                PostgresDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+
+        Object[] expectedSnapshot =
+                new Object[] {
+                    2,
+                    DateData.fromEpochDay(18460),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22")),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22.123")),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22.123")),
+                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
+                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
+                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
+                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
+                    LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
+                };
+
+        List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+        RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+        Assertions.assertThat(recordFields(snapshotRecord, TIME_TYPES_WITH_ADAPTIVE))
+                .isEqualTo(expectedSnapshot);
+    }
+
     @Test
     public void testHandlingDecimalModePrecise() throws Exception {
         initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test");
@@ -923,9 +1035,9 @@ public class PostgresFullTypesITCase extends 
PostgresTestBase {
             RowType.of(
                     DataTypes.INT(),
                     DataTypes.DATE(),
-                    DataTypes.INT(),
-                    DataTypes.INT(),
-                    DataTypes.INT(),
+                    DataTypes.TIME(0),
+                    DataTypes.TIME(3),
+                    DataTypes.TIME(6),
                     DataTypes.TIMESTAMP(0),
                     DataTypes.TIMESTAMP(3),
                     DataTypes.TIMESTAMP(6),
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
index 8426fc337..c484709ff 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
@@ -66,6 +66,7 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -313,6 +314,10 @@ public abstract class DebeziumEventDeserializationSchema 
extends SourceRecordEve
     }
 
     protected Object convertToDate(Object dbzObj, Schema schema) {
+        if (dbzObj instanceof Date) {
+            Instant instant = ((Date) dbzObj).toInstant();
+            return DateData.fromLocalDate(instant.atZone(java.time.ZoneOffset.UTC).toLocalDate());
+        }
         return DateData.fromLocalDate(TemporalConversions.toLocalDate(dbzObj));
     }
 
@@ -326,6 +331,9 @@ public abstract class DebeziumEventDeserializationSchema 
extends SourceRecordEve
             }
         } else if (dbzObj instanceof Integer) {
             return TimeData.fromMillisOfDay((int) dbzObj);
+        } else if (dbzObj instanceof Date) {
+            long millisOfDay = ((Date) dbzObj).getTime() % (24 * 60 * 60 * 1000);
+            return TimeData.fromMillisOfDay((int) millisOfDay);
         }
         // get number of milliseconds of the day
         return TimeData.fromLocalTime(TemporalConversions.toLocalTime(dbzObj));
@@ -346,6 +354,12 @@ public abstract class DebeziumEventDeserializationSchema 
extends SourceRecordEve
                             Math.floorDiv(nano, 1000_000), (int) (Math.floorMod(nano, 1000_000)));
             }
         }
+        if (dbzObj instanceof Date) {
+            if (schema.name().equals(org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME)) {
+                Instant instant = ((Date) dbzObj).toInstant();
+                return TimestampData.fromMillis(instant.toEpochMilli());
+            }
+        }
         throw new IllegalArgumentException(
                 "Unable to convert to TIMESTAMP from unexpected value '"
                         + dbzObj
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
index 8b9ae71c2..0181f3fce 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
@@ -104,12 +104,16 @@ public class DebeziumSchemaDataTypeInference implements 
SchemaDataTypeInference,
     }
 
     protected DataType inferInt32(Object value, Schema schema) {
-        if (Date.SCHEMA_NAME.equals(schema.name())) {
+        if (Date.SCHEMA_NAME.equals(schema.name())
+                || org.apache.kafka.connect.data.Date.LOGICAL_NAME.equals(schema.name())) {
             return DataTypes.DATE();
         }
         if (Time.SCHEMA_NAME.equals(schema.name())) {
             return DataTypes.TIME(3);
         }
+        if (org.apache.kafka.connect.data.Time.LOGICAL_NAME.equals(schema.name())) {
+            return DataTypes.TIME(3);
+        }
         return DataTypes.INT();
     }
 
@@ -120,7 +124,8 @@ public class DebeziumSchemaDataTypeInference implements 
SchemaDataTypeInference,
         if (NanoTime.SCHEMA_NAME.equals(schema.name())) {
             return DataTypes.TIME(9);
         }
-        if (Timestamp.SCHEMA_NAME.equals(schema.name())) {
+        if (Timestamp.SCHEMA_NAME.equals(schema.name())
+                || org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME.equals(schema.name())) {
             return DataTypes.TIMESTAMP(3);
         }
         if (MicroTimestamp.SCHEMA_NAME.equals(schema.name())) {
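
The `java.util.Date` branch that this commit adds to `convertToTime` can be tried in isolation. What follows is a minimal sketch under stated assumptions, not the connector's code: the class `ConnectTimeSketch` and its method names are hypothetical, plain `java.time.LocalTime` stands in for Flink CDC's `TimeData`, and `Math.floorMod` is swapped in for the `%` used in the patch so that a pre-epoch `Date` would still yield a non-negative result.

```java
import java.time.LocalTime;
import java.util.Date;

// Hypothetical standalone sketch; not part of the Flink CDC code base.
public class ConnectTimeSketch {
    private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

    // Reduce a Date to milliseconds since midnight (UTC), mirroring the
    // patch's "getTime() % millis-per-day" step but using floorMod so the
    // result is never negative.
    public static int millisOfDay(Date value) {
        return (int) Math.floorMod(value.getTime(), MILLIS_PER_DAY);
    }

    // Stand-in for TimeData.fromMillisOfDay: build a LocalTime with
    // millisecond precision, which is what TIME(3) can carry.
    public static LocalTime toLocalTime(Date value) {
        return LocalTime.ofNanoOfDay(millisOfDay(value) * 1_000_000L);
    }

    public static void main(String[] args) {
        // 64822123 ms after midnight is the value the old test expected as an INT.
        Date connectTime = new Date(64_822_123L);
        System.out.println(millisOfDay(connectTime)); // prints 64822123
        System.out.println(toLocalTime(connectTime)); // prints 18:00:22.123
    }
}
```

The sketch also shows why `connect` mode caps TIME at precision 3: a `Date` only carries milliseconds, so a source value such as `18:00:22.123456` arrives as `18:00:22.123`, which matches the expectation in `testTimeTypesWithTemporalModeConnect`.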
