This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch FLINK-38185
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit b164f7c9e3f42a412cf373ee38148526d9ddb999
Author: lvyanquan <lvyanquan....@alibaba-inc.com>
AuthorDate: Mon Aug 4 19:37:40 2025 +0800

    [FLINK-38185][pipeline-connector][iceberg] Correctly handle the type 
conversion of TIMESTAMP_WITH_TIME_ZONE.
---
 .../flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java | 4 ++--
 .../flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java   | 8 ++++----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
index 470e7b15a..f24ead0df 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
@@ -179,9 +179,9 @@ public class IcebergTypeUtils {
             case TIMESTAMP_WITH_TIME_ZONE:
                 fieldGetter =
                         (row) ->
-                                TimestampData.fromTimestamp(
+                                TimestampData.fromInstant(
                                         row.getZonedTimestamp(fieldPos, 
getPrecision(fieldType))
-                                                .toTimestamp());
+                                                .toInstant());
                 break;
             case ROW:
                 final int rowFieldCount = getFieldCount(fieldType);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
index 6b3305ff1..3512186d4 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
@@ -273,8 +273,8 @@ public class IcebergWriterTest {
         Catalog catalog =
                 CatalogUtil.buildIcebergCatalog(
                         "cdc-iceberg-catalog", catalogOptions, new 
Configuration());
-        IcebergWriter icebergWriter =
-                new IcebergWriter(catalogOptions, 1, 1, 
ZoneId.systemDefault());
+        ZoneId pipelineZoneId = ZoneId.systemDefault();
+        IcebergWriter icebergWriter = new IcebergWriter(catalogOptions, 1, 1, 
pipelineZoneId);
         IcebergMetadataApplier icebergMetadataApplier = new 
IcebergMetadataApplier(catalogOptions);
         TableId tableId = TableId.parse("test.iceberg_table");
 
@@ -330,7 +330,7 @@ public class IcebergWriterTest {
                             
LocalZonedTimestampData.fromInstant(Instant.ofEpochSecond(0)),
                             ZonedTimestampData.fromZonedDateTime(
                                     ZonedDateTime.ofInstant(
-                                            Instant.ofEpochSecond(0), 
ZoneId.of("Asia/Shanghai")))
+                                            Instant.ofEpochSecond(0), 
pipelineZoneId))
                         });
         DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId, 
record1);
         icebergWriter.write(dataChangeEvent, null);
@@ -342,7 +342,7 @@ public class IcebergWriterTest {
         List<String> result = fetchTableContent(catalog, tableId);
         Assertions.assertThat(result)
                 .containsExactlyInAnyOrder(
-                        "char, varchar, string, false, [1,2,3,4,5,], 
[1,2,3,4,5,6,7,8,9,10,], 0.00, true, 2, 12345, 12345, 123.456, 123456.789, 
00:00:12.345, 2003-10-20, 1970-01-01T00:00, 1970-01-01T00:00Z, 
1970-01-01T08:00Z");
+                        "char, varchar, string, false, [1,2,3,4,5,], 
[1,2,3,4,5,6,7,8,9,10,], 0.00, true, 2, 12345, 12345, 123.456, 123456.789, 
00:00:12.345, 2003-10-20, 1970-01-01T00:00, 1970-01-01T00:00Z, 
1970-01-01T00:00Z");
     }
 
     /** Mock CommitRequestImpl. */

Reply via email to