This is an automated email from the ASF dual-hosted git repository. kunni pushed a commit to branch FLINK-38185 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit b164f7c9e3f42a412cf373ee38148526d9ddb999 Author: lvyanquan <lvyanquan....@alibaba-inc.com> AuthorDate: Mon Aug 4 19:37:40 2025 +0800 [FLINK-38185][pipeline-connector][iceberg] Correctly handle the type conversion of TIMESTAMP_TITH_TIME_ZONE. --- .../flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java | 4 ++-- .../flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java index 470e7b15a..f24ead0df 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java @@ -179,9 +179,9 @@ public class IcebergTypeUtils { case TIMESTAMP_WITH_TIME_ZONE: fieldGetter = (row) -> - TimestampData.fromTimestamp( + TimestampData.fromInstant( row.getZonedTimestamp(fieldPos, getPrecision(fieldType)) - .toTimestamp()); + .toInstant()); break; case ROW: final int rowFieldCount = getFieldCount(fieldType); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java index 6b3305ff1..3512186d4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java @@ -273,8 +273,8 @@ public class IcebergWriterTest { Catalog catalog = CatalogUtil.buildIcebergCatalog( "cdc-iceberg-catalog", catalogOptions, new Configuration()); - IcebergWriter icebergWriter = - new IcebergWriter(catalogOptions, 1, 1, ZoneId.systemDefault()); + ZoneId pipelineZoneId = ZoneId.systemDefault(); + IcebergWriter icebergWriter = new IcebergWriter(catalogOptions, 1, 1, pipelineZoneId); IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions); TableId tableId = TableId.parse("test.iceberg_table"); @@ -330,7 +330,7 @@ public class IcebergWriterTest { LocalZonedTimestampData.fromInstant(Instant.ofEpochSecond(0)), ZonedTimestampData.fromZonedDateTime( ZonedDateTime.ofInstant( - Instant.ofEpochSecond(0), ZoneId.of("Asia/Shanghai"))) + Instant.ofEpochSecond(0), pipelineZoneId)) }); DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId, record1); icebergWriter.write(dataChangeEvent, null); @@ -342,7 +342,7 @@ public class IcebergWriterTest { List<String> result = fetchTableContent(catalog, tableId); Assertions.assertThat(result) .containsExactlyInAnyOrder( - "char, varchar, string, false, [1,2,3,4,5,], [1,2,3,4,5,6,7,8,9,10,], 0.00, true, 2, 12345, 12345, 123.456, 123456.789, 00:00:12.345, 2003-10-20, 1970-01-01T00:00, 1970-01-01T00:00Z, 1970-01-01T08:00Z"); + "char, varchar, string, false, [1,2,3,4,5,], [1,2,3,4,5,6,7,8,9,10,], 0.00, true, 2, 12345, 12345, 123.456, 123456.789, 00:00:12.345, 2003-10-20, 1970-01-01T00:00, 1970-01-01T00:00Z, 1970-01-01T00:00Z"); } /** Mock CommitRequestImpl. */