[https://issues.apache.org/jira/browse/FLINK-39415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Avichay Marciano updated FLINK-39415:
-------------------------------------
Environment:
Validated end-to-end on EKS with Flink 1.20 + CDC 3.6.0: TIMESTAMPTZ data
successfully written to Apache Iceberg (S3 Tables) with correct microsecond
precision and UTC timezone.
Pull Request - https://github.com/apache/flink-cdc/pull/4371
was: Validated end-to-end on EKS with Flink 1.20 + CDC 3.6.0: TIMESTAMPTZ data
successfully written to Apache Iceberg (S3 Tables) with correct microsecond
precision and UTC timezone.
> [flink-cdc-postgres] TIMESTAMPTZ type mapping causes NumberFormatException in
> Pipeline connector with Iceberg sink
> ------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39415
> URL: https://issues.apache.org/jira/browse/FLINK-39415
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.6.0
> Environment: Validated end-to-end on EKS with Flink 1.20 + CDC 3.6.0:
> TIMESTAMPTZ data successfully written to Apache Iceberg (S3 Tables) with
> correct microsecond precision and UTC timezone.
> Pull Request - https://github.com/apache/flink-cdc/pull/4371
> Reporter: Avichay Marciano
> Priority: Major
> Labels: pull-request-available
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> h3. Problem
> When using the Flink CDC Pipeline connector (YAML mode) to replicate a
> PostgreSQL table containing {{TIMESTAMPTZ}} columns to an Iceberg sink, the
> job crashes with {{NumberFormatException}} during both the snapshot and the
> CDC phases.
> h3. Root Cause
> {{PostgresTypeUtils}} maps {{TIMESTAMPTZ}} ({{PgOid.TIMESTAMPTZ}}) to
> {{ZonedTimestampType}} ({{TIMESTAMP_WITH_TIME_ZONE}}), but the Debezium
> deserializer ({{DebeziumEventDeserializationSchema}}) only has a
> converter for {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}}. This type mismatch
> causes:
> # The deserializer produces {{LocalZonedTimestampData}} for the field
> # {{AbstractBinaryWriter.write()}} expects {{ZonedTimestampData}} based on
> the declared type
> # The binary data is corrupted: field offsets in {{BinaryRecordData}}
> become misaligned
> # {{BinaryRecordData.getZonedTimestamp()}} reads garbage data, resulting in
> {{NumberFormatException}}
> The existing test {{PostgresFullTypesITCase}} already expects
> {{DataTypes.TIMESTAMP_LTZ(0)}} for {{TIMESTAMPTZ}}, which indicates that the
> intended mapping is {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}}.
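> For context on why the local-zoned type fits: PostgreSQL stores a
> {{TIMESTAMPTZ}} value as an absolute instant (normalized to UTC) and does not
> keep the offset it was written with. The minimal {{java.time}} sketch below is
> only an analogy, with {{Instant}} playing the role of {{TIMESTAMP_LTZ}} and
> {{OffsetDateTime}} that of a zone-preserving type; the class name is
> illustrative and none of this is Flink CDC code.

```java
import java.time.Instant;
import java.time.OffsetDateTime;

class TimestamptzSemantics {
    // Keeps only the instant, the way PostgreSQL stores TIMESTAMPTZ
    // (UTC internally, original offset discarded).
    static Instant toInstant(String isoWithOffset) {
        return OffsetDateTime.parse(isoWithOffset).toInstant();
    }

    public static void main(String[] args) {
        String utc = "2026-03-31T12:03:46.125062+00:00";
        String ist = "2026-03-31T17:33:46.125062+05:30"; // same moment, different offset

        // A zone-preserving value treats the two literals as different...
        System.out.println(OffsetDateTime.parse(utc).equals(OffsetDateTime.parse(ist))); // false
        // ...while the instant view (the TIMESTAMP_LTZ analogue) treats them as equal.
        System.out.println(toInstant(utc).equals(toInstant(ist))); // true
        System.out.println(toInstant(ist)); // 2026-03-31T12:03:46.125062Z
    }
}
```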
> h3. Steps to Reproduce
> {code:sql}
> CREATE TABLE test_tz (
> id INT PRIMARY KEY,
> name TEXT,
> created_at TIMESTAMPTZ DEFAULT now()
> );
> ALTER TABLE test_tz REPLICA IDENTITY FULL;
> INSERT INTO test_tz VALUES (1, 'test', '2026-03-31 12:03:46.125062+00');
> {code}
> Configure a Flink CDC Pipeline YAML job with a PostgreSQL source → Iceberg
> sink targeting the table above. Submit the job; it crashes immediately during
> the snapshot phase:
> {code:java}
> java.lang.NumberFormatException
> at BinaryRecordData.getZonedTimestamp()
> at IcebergTypeUtils.createFieldGetter()
> {code}
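> The row inserted in the reproduction doubles as the expected value on the
> Iceberg side, whose {{timestamptz}} type holds microseconds since the epoch,
> adjusted to UTC. A hypothetical helper for checking it, assuming PostgreSQL's
> default text output (an offset such as {{+00}} or {{+05:30}} and up to six
> fractional digits); the class and method names are illustrative only:

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;

class ExpectedIcebergValue {
    // PostgreSQL's default text output, e.g. "2026-03-31 12:03:46.125062+00".
    // The fraction is optional; offset minutes are optional ("+00" or "+05:30").
    static final DateTimeFormatter PG_TIMESTAMPTZ = new DateTimeFormatterBuilder()
            .appendPattern("uuuu-MM-dd HH:mm:ss")
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 6, true)
            .appendOffset("+HH:mm", "Z")
            .toFormatter();

    static Instant parse(String pgText) {
        return OffsetDateTime.parse(pgText, PG_TIMESTAMPTZ).toInstant();
    }

    // Microseconds since 1970-01-01T00:00:00Z, the unit of Iceberg's timestamptz.
    static long epochMicros(Instant instant) {
        return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
    }

    public static void main(String[] args) {
        Instant expected = parse("2026-03-31 12:03:46.125062+00");
        System.out.println(expected); // 2026-03-31T12:03:46.125062Z
        System.out.println(epochMicros(expected));
    }
}
```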
> h3. Workaround
> Use {{TIMESTAMP}} (without time zone) instead of {{TIMESTAMPTZ}}, or use
> Flink SQL CDC mode instead of Pipeline YAML mode.
> h3. Fix
> The fix changes {{PostgresTypeUtils}} to map {{TIMESTAMPTZ}} to
> {{DataTypes.TIMESTAMP_LTZ(scale)}} instead of {{ZonedTimestampType}}.
> This aligns with the Debezium deserializer's existing converter and matches
> the expectation in {{PostgresFullTypesITCase}}.
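> The shape of the mapping change can be sketched with stand-in types. The
> enum, class and method names here are hypothetical and not the real Flink CDC
> API; the sketch only shows that, with the new mapping, the declared type root
> matches a converter the deserializer already has, so the writer and the
> reader agree on the field type.

```java
import java.util.EnumSet;
import java.util.Set;

class TimestamptzMappingSketch {
    // Hypothetical stand-ins for the type roots named in this issue.
    enum TypeRoot { TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, TIMESTAMP_WITH_TIME_ZONE }

    // Hypothetical declared column type: a root plus a precision (the PostgreSQL scale).
    static final class DeclaredType {
        final TypeRoot root;
        final int precision;
        DeclaredType(TypeRoot root, int precision) { this.root = root; this.precision = precision; }
    }

    // Roots the hypothetical deserializer can convert before the fix.
    static final Set<TypeRoot> CONVERTIBLE =
            EnumSet.of(TypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, TypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);

    // Maps a PostgreSQL type name; fixed == true applies the mapping proposed in the PR.
    static DeclaredType map(String pgType, int scale, boolean fixed) {
        switch (pgType) {
            case "timestamp":
                return new DeclaredType(TypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, scale);
            case "timestamptz":
                return new DeclaredType(fixed
                        ? TypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE   // after: TIMESTAMP_LTZ(scale)
                        : TypeRoot.TIMESTAMP_WITH_TIME_ZONE,        // before: ZonedTimestampType
                        scale);
            default:
                throw new IllegalArgumentException("not covered by this sketch: " + pgType);
        }
    }

    public static void main(String[] args) {
        DeclaredType before = map("timestamptz", 6, false);
        DeclaredType after = map("timestamptz", 6, true);
        // Before: the declared root has no matching converter, so writer and reader disagree.
        System.out.println(CONVERTIBLE.contains(before.root)); // false
        // After: the mapping change alone lines up with a converter that already existed.
        System.out.println(CONVERTIBLE.contains(after.root) + " precision=" + after.precision); // true precision=6
    }
}
```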
> It also adds {{convertToZonedTimestamp()}} to
> {{DebeziumEventDeserializationSchema}} in preparation for future
> {{TIMESTAMP_WITH_TIME_ZONE}} support.
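> For reference, Debezium emits PostgreSQL {{TIMESTAMPTZ}} values as
> {{io.debezium.time.ZonedTimestamp}}, an ISO-8601 string normalized to UTC
> such as {{2026-03-31T12:03:46.125062Z}}. The signature of
> {{convertToZonedTimestamp()}} is not given in this issue, so the following is
> a hypothetical sketch of both conversions in plain {{java.time}}; the split
> into epoch milliseconds plus nano-of-millisecond is an assumption loosely
> modelled on Flink's timestamp data classes.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZonedDateTime;

class ZonedTimestampConversionSketch {
    // Hypothetical LTZ conversion: Debezium ZonedTimestamp string ->
    // {epoch millis, nano-of-millisecond}. Together they encode the instant exactly.
    static long[] toEpochMillisAndNanoOfMillis(String zonedTimestamp) {
        Instant instant = OffsetDateTime.parse(zonedTimestamp).toInstant();
        long epochMillis = instant.toEpochMilli();          // floors to whole milliseconds
        long nanoOfMillis = instant.getNano() % 1_000_000;  // remaining 0..999_999 ns
        return new long[] {epochMillis, nanoOfMillis};
    }

    // Hypothetical zoned conversion: keeps the offset carried by the string
    // (always UTC for Debezium's ZonedTimestamp).
    static ZonedDateTime toZoned(String zonedTimestamp) {
        return OffsetDateTime.parse(zonedTimestamp).toZonedDateTime();
    }

    public static void main(String[] args) {
        String value = "2026-03-31T12:03:46.125062Z";
        long[] ltz = toEpochMillisAndNanoOfMillis(value);
        System.out.println(ltz[1]);         // 62000 (the 62 microseconds past the 125 ms mark)
        System.out.println(toZoned(value)); // 2026-03-31T12:03:46.125062Z
    }
}
```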
> *Files changed:*
> * {{PostgresTypeUtils.java}}: {{TIMESTAMPTZ}} → {{TIMESTAMP_LTZ(scale)}}
> (was {{ZonedTimestampType}})
> * {{DebeziumEventDeserializationSchema.java}}: adds
> {{convertToZonedTimestamp()}}
> * {{PostgresTypeUtilsTimestamptzTest.java}} (new): validates the type mapping
> * {{IcebergTypeUtilsTest.java}}: validates the field getter for
> {{TIMESTAMP_LTZ}}
> Branch:
> [https://github.com/avichaym/flink-cdc/tree/fix/timestamptz-iceberg-sink]
> Validated end-to-end on EKS with Flink 1.20 + CDC 3.6.0: TIMESTAMPTZ data
> successfully written to Apache Iceberg (S3 Tables) with correct microsecond
> precision and UTC timezone.