This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch release-3.5
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/release-3.5 by this push:
new 0b6c2e891 [FLINK-38398][pipeline-connector][postgresql] Improve
PostgresSQL temporal field type supported with
debezium.time.precision.mode=connect config (#4132)
0b6c2e891 is described below
commit 0b6c2e891e3f10d466ace660dbcbd0821eca30d9
Author: ouyangwulin <[email protected]>
AuthorDate: Mon Sep 22 18:53:59 2025 +0800
[FLINK-38398][pipeline-connector][postgresql] Improve PostgresSQL temporal
field type supported with debezium.time.precision.mode=connect config (#4132)
---
.../connectors/pipeline-connectors/postgres.md | 72 +++++++++++-
.../connectors/pipeline-connectors/postgres.md | 72 +++++++++++-
.../postgres/utils/PostgresTypeUtils.java | 2 +-
.../postgres/source/PostgresFullTypesITCase.java | 124 ++++++++++++++++++++-
.../event/DebeziumEventDeserializationSchema.java | 14 +++
.../event/DebeziumSchemaDataTypeInference.java | 9 +-
6 files changed, 278 insertions(+), 15 deletions(-)
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
index 5460a9fff..aa8ce7793 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -417,11 +417,11 @@ pipeline:
- debezium.time.precision.mode=adaptive_time_microseconds
- debezium.time.precision.mode=connect
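-Note: Due to the current CDC limitations in supporting time types, when <code>debezium.time.precision.mode</code> is set to adaptive, adaptive_time_microseconds, or connect, TIME values are all converted to the Integer type with a precision of 3. This will be improved in future updates.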
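+Note: Due to the current CDC limitation, the precision of the TIME type is fixed at 3. Whether <code>debezium.time.precision.mode</code> is set to adaptive, adaptive_time_microseconds, or connect, the TIME type is converted to TIME(3).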
<u>debezium.time.precision.mode=adaptive</u>
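-When the <code>debezium.time.precision.mode</code> property is set to the default value adaptive, the connector determines the literal type and semantic type based on the column's data type definition. This ensures that events exactly represent the values in the database.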
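+When the <code>debezium.time.precision.mode</code> property is set to the default value adaptive, the precision of TIME is 3, and TIMESTAMP keeps the precision P declared on the column (at most 6).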
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
@@ -440,7 +440,7 @@ pipeline:
<td>
TIME([P])
</td>
- <td>INTEGER</td>
+ <td>TIME(3)</td>
</tr>
<tr>
<td>
@@ -452,6 +452,72 @@ pipeline:
</table>
</div>
+<u>debezium.time.precision.mode=adaptive_time_microseconds</u>
+
+When the <code>debezium.time.precision.mode</code> property is set to adaptive_time_microseconds, the precision of TIME is 3, and TIMESTAMP keeps the precision P declared on the column (at most 6).
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+ <thead>
+ <tr>
+ <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
+ <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>
+ DATE
+ </td>
+ <td>DATE</td>
+ </tr>
+ <tr>
+ <td>
+ TIME([P])
+ </td>
+ <td>TIME(3)</td>
+ </tr>
+ <tr>
+ <td>
+ TIMESTAMP([P])
+ </td>
+ <td>TIMESTAMP([P])</td>
+ </tr>
+ </tbody>
+</table>
+</div>
+
+<u>debezium.time.precision.mode=connect</u>
+
+When the <code>debezium.time.precision.mode</code> property is set to connect, both TIME and TIMESTAMP have a precision of 3.
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+ <thead>
+ <tr>
+ <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
+ <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>
+ DATE
+ </td>
+ <td>DATE</td>
+ </tr>
+ <tr>
+ <td>
+ TIME([P])
+ </td>
+ <td>TIME(3)</td>
+ </tr>
+ <tr>
+ <td>
+ TIMESTAMP([P])
+ </td>
+ <td>TIMESTAMP(3)</td>
+ </tr>
+ </tbody>
+</table>
+</div>
+
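The pipeline documentation spells the option `debezium.time.precision.mode`, while the integration tests in this commit hand the source a `Properties` object keyed `time.precision.mode`. A minimal sketch of that hand-off, assuming the pipeline connector forwards every `debezium.*` option with the prefix stripped; `toDebeziumProperties` is a hypothetical helper for illustration, not the connector's actual code:

```java
import java.util.Map;
import java.util.Properties;

public class DebeziumPropsSketch {
    // Assumed prefix for pass-through Debezium options, taken from the option names in the docs.
    static final String DEBEZIUM_PREFIX = "debezium.";

    /** Hypothetical helper: copies "debezium.*" pipeline options into Properties, prefix removed. */
    static Properties toDebeziumProperties(Map<String, String> pipelineOptions) {
        Properties props = new Properties();
        for (Map.Entry<String, String> e : pipelineOptions.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(DEBEZIUM_PREFIX)) {
                // "debezium.time.precision.mode" becomes "time.precision.mode".
                props.setProperty(key.substring(DEBEZIUM_PREFIX.length()), e.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props =
                toDebeziumProperties(
                        Map.of("debezium.time.precision.mode", "connect", "hostname", "localhost"));
        // Non-Debezium options such as "hostname" are not copied.
        System.out.println(props.getProperty("time.precision.mode"));
    }
}
```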
### Decimal types Mapping
The setting of the PostgreSQL connector configuration property <code>debezium.decimal.handling.mode</code> determines how the connector maps decimal types.
diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md b/docs/content/docs/connectors/pipeline-connectors/postgres.md
index ff72598dc..1a51486f0 100644
--- a/docs/content/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -412,11 +412,11 @@ Other than PostgreSQL’s TIMESTAMPTZ data types, which contain time zone inform
- debezium.time.precision.mode=adaptive_time_microseconds
- debezium.time.precision.mode=connect
-Note: Due to current CDC limitations in supporting time types, when <code>debezium.time.precision.mode</code> is set to "adaptive", "adaptive_time_microseconds", or when using Connect time types, all time values are converted to the Integer type with a precision of 3. This will be improved in future updates.
+Note: Due to the current CDC limitation, the precision of the TIME type is fixed at 3. Regardless of whether <code>debezium.time.precision.mode</code> is set to adaptive, adaptive_time_microseconds, or connect, the TIME type is converted to TIME(3).
<u>debezium.time.precision.mode=adaptive</u>
-When the <code>debezium.time.precision.mode</code> property is set to adaptive, the default, the connector determines the literal type and semantic type based on the column’s data type definition. This ensures that events exactly represent the values in the database.
+When the <code>debezium.time.precision.mode</code> property is set to the default value `adaptive`, the precision of `TIME` is 3, and `TIMESTAMP` keeps the precision `P` declared on the column (at most 6).
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
@@ -435,7 +435,7 @@ When the <code>debezium.time.precision.mode</code> property is set to adaptive,
<td>
TIME([P])
</td>
- <td>INTEGER</td>
+ <td>TIME(3)</td>
</tr>
<tr>
<td>
@@ -447,6 +447,72 @@ When the <code>debezium.time.precision.mode</code> property is set to adaptive,
</table>
</div>
+<u>debezium.time.precision.mode=adaptive_time_microseconds</u>
+
+When the `debezium.time.precision.mode` property is set to `adaptive_time_microseconds`, the precision of `TIME` is 3, and `TIMESTAMP` keeps the precision `P` declared on the column (at most 6).
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+ <thead>
+ <tr>
+ <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
+ <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>
+ DATE
+ </td>
+ <td>DATE</td>
+ </tr>
+ <tr>
+ <td>
+ TIME([P])
+ </td>
+ <td>TIME(3)</td>
+ </tr>
+ <tr>
+ <td>
+ TIMESTAMP([P])
+ </td>
+ <td>TIMESTAMP([P])</td>
+ </tr>
+ </tbody>
+</table>
+</div>
+
+<u>debezium.time.precision.mode=connect</u>
+
+When the <code>debezium.time.precision.mode</code> property is set to connect, both TIME and TIMESTAMP have a precision of 3.
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+ <thead>
+ <tr>
+ <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
+ <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>
+ DATE
+ </td>
+ <td>DATE</td>
+ </tr>
+ <tr>
+ <td>
+ TIME([P])
+ </td>
+ <td>TIME(3)</td>
+ </tr>
+ <tr>
+ <td>
+ TIMESTAMP([P])
+ </td>
+ <td>TIMESTAMP(3)</td>
+ </tr>
+ </tbody>
+</table>
+</div>
+
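The mapping tables in this diff can be encoded as a small lookup. This is only a sketch of the documented behaviour, not the connector's type-mapping code; `TemporalMappingSketch`, `parseMode`, and `cdcType` are hypothetical names. The TIMESTAMP row for `adaptive` is not shown in the diff, so the sketch assumes it matches `adaptive_time_microseconds` (declared precision kept), which is what the test schema with `TIMESTAMP(0)`, `TIMESTAMP(3)`, and `TIMESTAMP(6)` suggests:

```java
import java.util.Locale;

public class TemporalMappingSketch {
    /** The three values of debezium.time.precision.mode covered by the documentation. */
    enum TimePrecisionMode { ADAPTIVE, ADAPTIVE_TIME_MICROSECONDS, CONNECT }

    /** Parses an option value such as "adaptive_time_microseconds". */
    static TimePrecisionMode parseMode(String value) {
        return TimePrecisionMode.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    /** CDC type name for a DATE, TIME(p) or TIMESTAMP(p) column; p is the declared precision. */
    static String cdcType(TimePrecisionMode mode, String pgType, int p) {
        switch (pgType) {
            case "DATE":
                return "DATE";
            case "TIME":
                // Documented as TIME(3) in every mode.
                return "TIME(3)";
            case "TIMESTAMP":
                // connect caps TIMESTAMP at milliseconds; the adaptive modes keep p (assumed for adaptive).
                return mode == TimePrecisionMode.CONNECT ? "TIMESTAMP(3)" : "TIMESTAMP(" + p + ")";
            default:
                throw new IllegalArgumentException("Not covered by the tables: " + pgType);
        }
    }

    public static void main(String[] args) {
        for (String m : new String[] {"adaptive", "adaptive_time_microseconds", "connect"}) {
            TimePrecisionMode mode = parseMode(m);
            System.out.println(m + ": " + cdcType(mode, "TIME", 6) + ", " + cdcType(mode, "TIMESTAMP", 6));
        }
    }
}
```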
### Decimal types Mapping
The setting of the PostgreSQL connector configuration property <code>debezium.decimal.handling.mode</code> determines how the connector maps decimal types.
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
index 0465563fb..3a7a19be6 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
@@ -277,7 +277,7 @@ public class PostgresTypeUtils {
case ADAPTIVE:
case ADAPTIVE_TIME_MICROSECONDS:
case CONNECT:
- return DataTypes.INT();
+ return DataTypes.TIME(scale);
default:
throw new IllegalArgumentException("Unknown temporal precision mode: " + mode);
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
index fdd4fd253..0dac3c153 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
@@ -67,6 +67,7 @@ import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
@@ -289,9 +290,9 @@ public class PostgresFullTypesITCase extends PostgresTestBase {
new Object[] {
2,
DateData.fromEpochDay(18460),
- 64822000,
- 64822123,
- 64822123,
+ TimeData.fromLocalTime(LocalTime.parse("18:00:22")),
+ TimeData.fromLocalTime(LocalTime.parse("18:00:22.123")),
+ TimeData.fromLocalTime(LocalTime.parse("18:00:22.123456")),
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
TimestampData.fromLocalDateTime(
@@ -306,6 +307,117 @@ public class PostgresFullTypesITCase extends PostgresTestBase {
.isEqualTo(expectedSnapshot);
}
+ @Test
+ public void testTimeTypesWithTemporalModeMicroSeconds() throws Exception {
+ initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+ Properties debeziumProps = new Properties();
+ debeziumProps.setProperty("time.precision.mode", "adaptive_time_microseconds");
+
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGIS_CONTAINER.getHost())
+ .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+ .tableList("inventory.time_types")
+ .startupOptions(StartupOptions.initial())
+ .debeziumProperties(debeziumProps)
+ .serverTimeZone("UTC");
+ configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+ new PostgresDataSource(configFactory).getEventSourceProvider();
+
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+
+ Object[] expectedSnapshot =
+ new Object[] {
+ 2,
+ DateData.fromEpochDay(18460),
+ TimeData.fromLocalTime(LocalTime.parse("18:00:22")),
+ TimeData.fromLocalTime(LocalTime.parse("18:00:22.123")),
+ TimeData.fromLocalTime(LocalTime.parse("18:00:22.123456")),
+ TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
+ TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
+ TimestampData.fromLocalDateTime(
+ LocalDateTime.parse("2020-07-17T18:00:22.123456")),
+ TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
+ LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
+ };
+
+ List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+ RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+ Assertions.assertThat(recordFields(snapshotRecord, TIME_TYPES_WITH_ADAPTIVE))
+ .isEqualTo(expectedSnapshot);
+ }
+
+ @Test
+ public void testTimeTypesWithTemporalModeConnect() throws Exception {
+ initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+ Properties debeziumProps = new Properties();
+ debeziumProps.setProperty("time.precision.mode", "connect");
+
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGIS_CONTAINER.getHost())
+ .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+ .tableList("inventory.time_types")
+ .startupOptions(StartupOptions.initial())
+ .debeziumProperties(debeziumProps)
+ .serverTimeZone("UTC");
+ configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+ new PostgresDataSource(configFactory).getEventSourceProvider();
+
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+
+ Object[] expectedSnapshot =
+ new Object[] {
+ 2,
+ DateData.fromEpochDay(18460),
+ TimeData.fromLocalTime(LocalTime.parse("18:00:22")),
+ TimeData.fromLocalTime(LocalTime.parse("18:00:22.123")),
+ TimeData.fromLocalTime(LocalTime.parse("18:00:22.123")),
+ TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
+ TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
+ TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
+ TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
+ LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
+ };
+
+ List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+ RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+ Assertions.assertThat(recordFields(snapshotRecord, TIME_TYPES_WITH_ADAPTIVE))
+ .isEqualTo(expectedSnapshot);
+ }
+
@Test
public void testHandlingDecimalModePrecise() throws Exception {
initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test");
@@ -923,9 +1035,9 @@ public class PostgresFullTypesITCase extends PostgresTestBase {
RowType.of(
DataTypes.INT(),
DataTypes.DATE(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
+ DataTypes.TIME(0),
+ DataTypes.TIME(3),
+ DataTypes.TIME(6),
DataTypes.TIMESTAMP(0),
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(6),
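The removed expected values `64822000` and `64822123` were milliseconds since midnight, and the new connect-mode test expects `18:00:22.123` for the microsecond TIME column. The arithmetic below reproduces both with plain `java.time`; it assumes, as Kafka Connect's `Time` logical type defines, that connect mode carries time-of-day at millisecond resolution. `MillisOfDaySketch` and its methods are illustrative names only:

```java
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;

public class MillisOfDaySketch {
    /** Milliseconds since midnight: the unit behind the old INT-typed TIME values. */
    static int millisOfDay(LocalTime t) {
        return (int) (t.toNanoOfDay() / 1_000_000L);
    }

    /** Drops sub-millisecond digits, as a millisecond-based Connect Time value would. */
    static LocalTime toConnectPrecision(LocalTime t) {
        return t.truncatedTo(ChronoUnit.MILLIS);
    }

    public static void main(String[] args) {
        LocalTime micros = LocalTime.parse("18:00:22.123456");
        System.out.println(millisOfDay(LocalTime.parse("18:00:22"))); // 64822000
        System.out.println(millisOfDay(micros));                      // 64822123
        System.out.println(toConnectPrecision(micros));               // 18:00:22.123
    }
}
```

Here 18 h × 3,600,000 ms + 22 s × 1,000 ms = 64,822,000 ms, and the `.123456` fraction contributes 123 whole milliseconds.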
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
index 8426fc337..c484709ff 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
@@ -66,6 +66,7 @@ import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -313,6 +314,10 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
}
protected Object convertToDate(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof Date) {
+ Instant instant = ((Date) dbzObj).toInstant();
+ return DateData.fromLocalDate(instant.atZone(java.time.ZoneOffset.UTC).toLocalDate());
+ }
return DateData.fromLocalDate(TemporalConversions.toLocalDate(dbzObj));
}
@@ -326,6 +331,9 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
}
} else if (dbzObj instanceof Integer) {
return TimeData.fromMillisOfDay((int) dbzObj);
+ } else if (dbzObj instanceof Date) {
+ long millisOfDay = ((Date) dbzObj).getTime() % (24 * 60 * 60 * 1000);
+ return TimeData.fromMillisOfDay((int) millisOfDay);
}
// get number of milliseconds of the day
return TimeData.fromLocalTime(TemporalConversions.toLocalTime(dbzObj));
@@ -346,6 +354,12 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
Math.floorDiv(nano, 1000_000), (int) (Math.floorMod(nano, 1000_000)));
}
}
+ if (dbzObj instanceof Date) {
+ if (schema.name().equals(org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME)) {
+ Instant instant = ((Date) dbzObj).toInstant();
+ return TimestampData.fromMillis(instant.toEpochMilli());
+ }
+ }
throw new IllegalArgumentException(
"Unable to convert to TIMESTAMP from unexpected value '"
+ dbzObj
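With `time.precision.mode=connect`, Debezium emits Kafka Connect `Date`, `Time`, and `Timestamp` logical values, which arrive as `java.util.Date` objects; the new `instanceof Date` branches handle that case. A minimal `java.time`-only sketch of the same three conversions, without Flink CDC's `DateData`/`TimeData`/`TimestampData` wrappers, might look like this (class and method names are hypothetical). It uses `Math.floorMod` instead of `%` as a defensive choice; for well-formed Connect `Time` values, which are dated 1970-01-01, both give the same result:

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.Date;

public class ConnectDateConversionSketch {
    static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

    /** Connect Date: midnight UTC of the day, so read the calendar date in UTC. */
    static LocalDate toLocalDate(Date value) {
        return value.toInstant().atZone(ZoneOffset.UTC).toLocalDate();
    }

    /** Connect Time: a Date on 1970-01-01 UTC whose epoch millis are the time of day. */
    static LocalTime toLocalTime(Date value) {
        // floorMod keeps the result in [0, MILLIS_PER_DAY) even for a negative epoch value.
        long millisOfDay = Math.floorMod(value.getTime(), MILLIS_PER_DAY);
        return LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L);
    }

    /** Connect Timestamp: epoch millis, read here as a UTC wall-clock timestamp. */
    static LocalDateTime toLocalDateTime(Date value) {
        return LocalDateTime.ofInstant(value.toInstant(), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        long day = 18460L * MILLIS_PER_DAY; // epoch day 18460 is 2020-07-17
        System.out.println(toLocalDate(new Date(day)));                 // 2020-07-17
        System.out.println(toLocalTime(new Date(64822123L)));           // 18:00:22.123
        System.out.println(toLocalDateTime(new Date(day + 64822123L))); // 2020-07-17T18:00:22.123
    }
}
```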
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
index 8b9ae71c2..0181f3fce 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
@@ -104,12 +104,16 @@ public class DebeziumSchemaDataTypeInference implements SchemaDataTypeInference,
}
protected DataType inferInt32(Object value, Schema schema) {
- if (Date.SCHEMA_NAME.equals(schema.name())) {
+ if (Date.SCHEMA_NAME.equals(schema.name())
+ || org.apache.kafka.connect.data.Date.LOGICAL_NAME.equals(schema.name())) {
return DataTypes.DATE();
}
if (Time.SCHEMA_NAME.equals(schema.name())) {
return DataTypes.TIME(3);
}
+ if (org.apache.kafka.connect.data.Time.LOGICAL_NAME.equals(schema.name())) {
+ return DataTypes.TIME(3);
+ }
return DataTypes.INT();
}
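The inference change teaches `inferInt32`/`inferInt64` to recognise Kafka Connect logical names alongside Debezium's own `io.debezium.time.*` names. A simplified sketch of that schema-name dispatch, returning type names as strings; the class and method names are hypothetical, the logical-name strings are the values of the Debezium `SCHEMA_NAME` and Kafka Connect `LOGICAL_NAME` constants, and the `BIGINT` fallback for INT64 is an assumption. Comparing with `CONSTANT.equals(name)` also keeps a null schema name safe:

```java
public class LogicalNameInferenceSketch {
    // Debezium logical names, emitted in the adaptive precision modes.
    static final String DBZ_DATE = "io.debezium.time.Date";
    static final String DBZ_TIME = "io.debezium.time.Time";
    static final String DBZ_TIMESTAMP = "io.debezium.time.Timestamp";
    // Kafka Connect logical names, emitted when time.precision.mode=connect.
    static final String CONNECT_DATE = "org.apache.kafka.connect.data.Date";
    static final String CONNECT_TIME = "org.apache.kafka.connect.data.Time";
    static final String CONNECT_TIMESTAMP = "org.apache.kafka.connect.data.Timestamp";

    /** Type for an INT32 field given its schema name (null for a plain integer). */
    static String inferInt32(String schemaName) {
        if (DBZ_DATE.equals(schemaName) || CONNECT_DATE.equals(schemaName)) {
            return "DATE";
        }
        if (DBZ_TIME.equals(schemaName) || CONNECT_TIME.equals(schemaName)) {
            return "TIME(3)"; // both carry milliseconds since midnight
        }
        return "INT";
    }

    /** Type for an INT64 field carrying epoch milliseconds. */
    static String inferInt64(String schemaName) {
        if (DBZ_TIMESTAMP.equals(schemaName) || CONNECT_TIMESTAMP.equals(schemaName)) {
            return "TIMESTAMP(3)";
        }
        return "BIGINT"; // assumed fallback for this sketch
    }

    public static void main(String[] args) {
        System.out.println(inferInt32(CONNECT_TIME));
        System.out.println(inferInt64(CONNECT_TIMESTAMP));
    }
}
```

Folding the two TIME checks into one condition behaves the same as the separate `if` added in the diff.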
@@ -120,7 +124,8 @@ public class DebeziumSchemaDataTypeInference implements SchemaDataTypeInference,
if (NanoTime.SCHEMA_NAME.equals(schema.name())) {
return DataTypes.TIME(9);
}
- if (Timestamp.SCHEMA_NAME.equals(schema.name())) {
+ if (Timestamp.SCHEMA_NAME.equals(schema.name())
+ || org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME.equals(schema.name())) {
return DataTypes.TIMESTAMP(3);
}
if (MicroTimestamp.SCHEMA_NAME.equals(schema.name())) {