[ 
https://issues.apache.org/jira/browse/FLINK-39415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Avichay Marciano updated FLINK-39415:
-------------------------------------
    Environment: 
Validated end-to-end on EKS with Flink 1.20 + CDC 3.6.0: TIMESTAMPTZ data 
successfully written to Apache Iceberg (S3 Tables) with correct microsecond 
precision and UTC timezone.

Pull Request - https://github.com/apache/flink-cdc/pull/4371

  was:Validated end-to-end on EKS with Flink 1.20 + CDC 3.6.0: TIMESTAMPTZ data 
successfully written to Apache Iceberg (S3 Tables) with correct microsecond 
precision and UTC timezone.


> [flink-cdc-postgres] TIMESTAMPTZ type mapping causes NumberFormatException in 
> Pipeline connector with Iceberg sink
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39415
>                 URL: https://issues.apache.org/jira/browse/FLINK-39415
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.6.0
>         Environment: Validated end-to-end on EKS with Flink 1.20 + CDC 3.6.0: 
> TIMESTAMPTZ data successfully written to Apache Iceberg (S3 Tables) with 
> correct microsecond precision and UTC timezone.
> Pull Request - https://github.com/apache/flink-cdc/pull/4371
>            Reporter: Avichay Marciano
>            Priority: Major
>              Labels: pull-request-available
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> h3. Problem
> When using the Flink CDC Pipeline connector (YAML mode) to replicate a 
> PostgreSQL table containing {{TIMESTAMPTZ}} columns to an Iceberg sink, the 
> job crashes with {{NumberFormatException}} during both snapshot and CDC 
> phases.
> h3. Root Cause
> {{PostgresTypeUtils}} maps {{TIMESTAMPTZ}} ({{PgOid.TIMESTAMPTZ}}) to 
> {{ZonedTimestampType}} ({{TIMESTAMP_WITH_TIME_ZONE}}), but the Debezium 
> deserializer ({{DebeziumEventDeserializationSchema}}) only has a 
> converter for {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}}. This type mismatch 
> causes:
>  # The deserializer produces {{LocalZonedTimestampData}} for the field
>  # {{AbstractBinaryWriter.write()}} expects {{ZonedTimestampData}} based on 
> the declared type
>  # Binary data gets corrupted — field offset misalignment in 
> {{BinaryRecordData}}
>  # {{BinaryRecordData.getZonedTimestamp()}} reads garbage data, resulting in 
> {{NumberFormatException}}
> The existing test {{PostgresFullTypesITCase}} already expects 
> {{DataTypes.TIMESTAMP_LTZ(0)}} for {{TIMESTAMPTZ}}, confirming that the 
> correct type mapping is {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}}.
> h3. Steps to Reproduce
> {code:sql}
> CREATE TABLE test_tz (
>     id INT PRIMARY KEY,
>     name TEXT,
>     created_at TIMESTAMPTZ DEFAULT now()
> );
> ALTER TABLE test_tz REPLICA IDENTITY FULL;
> INSERT INTO test_tz VALUES (1, 'test', '2026-03-31 12:03:46.125062+00');
> {code}
> Configure a Flink CDC Pipeline YAML job with a PostgreSQL source and an 
> Iceberg sink targeting the above table, then submit it. The job crashes 
> immediately during the snapshot phase:
> {code:java}
> java.lang.NumberFormatException
>     at BinaryRecordData.getZonedTimestamp()
>     at IcebergTypeUtils.createFieldGetter()
> {code}
> h3. Workaround
> Use {{TIMESTAMP}} (without time zone) instead of {{TIMESTAMPTZ}}, or use 
> Flink SQL CDC mode instead of Pipeline YAML mode.
> h3. Fix
> The fix changes {{PostgresTypeUtils}} to map {{TIMESTAMPTZ}} → 
> {{DataTypes.TIMESTAMP_LTZ(scale)}} instead of {{ZonedTimestampType}}. 
> This aligns with the Debezium deserializer's existing converter and matches 
> the expectation in {{PostgresFullTypesITCase}}.
> The fix also adds {{convertToZonedTimestamp()}} to 
> {{DebeziumEventDeserializationSchema}} for future 
> {{TIMESTAMP_WITH_TIME_ZONE}} support.
> *Files changed:*
>  * {{PostgresTypeUtils.java}}: {{TIMESTAMPTZ}} → {{TIMESTAMP_LTZ(scale)}} 
> (was {{ZonedTimestampType}})
>  * {{DebeziumEventDeserializationSchema.java}}: add 
> {{convertToZonedTimestamp()}}
>  * New: {{PostgresTypeUtilsTimestamptzTest.java}}: validates type mapping
>  * {{IcebergTypeUtilsTest.java}}: validates field getter for 
> {{TIMESTAMP_LTZ}}
> Branch: 
> [https://github.com/avichaym/flink-cdc/tree/fix/timestamptz-iceberg-sink]
> Validated end-to-end on EKS with Flink 1.20 + CDC 3.6.0: TIMESTAMPTZ data 
> successfully written to Apache Iceberg (S3 Tables) with correct microsecond 
> precision and UTC timezone.
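
As a rough illustration of why an instant-based type such as TIMESTAMP_LTZ is enough for TIMESTAMPTZ columns: PostgreSQL stores a TIMESTAMPTZ value as a UTC instant and does not retain the offset it was written with, so two inputs that differ only in offset denote the same value. The sketch below uses only java.time and does not touch any Flink CDC class. The class TimestamptzSketch, the holder InstantParts, and the method toInstantParts are hypothetical names invented for this example, and the ISO-8601 input strings are an assumption about how the value reaches the deserializer. It splits the instant into epoch milliseconds plus a sub-millisecond remainder, which keeps the microsecond digits of the row from the reproduction steps.

```java
import java.time.Instant;
import java.time.OffsetDateTime;

// Hypothetical sketch, not Flink CDC code: shows that a TIMESTAMPTZ value is
// fully described by an instant, independent of the offset it was written with.
final class TimestamptzSketch {

    /** Hypothetical holder: epoch milliseconds plus the sub-millisecond remainder in nanoseconds. */
    static final class InstantParts {
        final long epochMillis;
        final int nanoOfMilli;

        InstantParts(long epochMillis, int nanoOfMilli) {
            this.epochMillis = epochMillis;
            this.nanoOfMilli = nanoOfMilli;
        }

        /** Rebuilds the instant from its two parts. */
        Instant toInstant() {
            return Instant.ofEpochMilli(epochMillis).plusNanos(nanoOfMilli);
        }
    }

    /** Parses an ISO-8601 timestamp with offset and normalizes it to an instant. */
    static InstantParts toInstantParts(String isoTimestampWithOffset) {
        Instant instant = OffsetDateTime.parse(isoTimestampWithOffset).toInstant();
        long epochMillis = instant.toEpochMilli();
        // getNano() is the nanosecond-of-second; keep only the part below one millisecond.
        int nanoOfMilli = instant.getNano() % 1_000_000;
        return new InstantParts(epochMillis, nanoOfMilli);
    }

    public static void main(String[] args) {
        // Same moment as the row inserted in the reproduction steps, written with two offsets.
        InstantParts utc = toInstantParts("2026-03-31T12:03:46.125062Z");
        InstantParts plusTwo = toInstantParts("2026-03-31T14:03:46.125062+02:00");

        System.out.println(utc.toInstant());
        System.out.println(utc.epochMillis == plusTwo.epochMillis
                && utc.nanoOfMilli == plusTwo.nanoOfMilli);
    }
}
```

Both inputs normalize to the same pair of numbers, so nothing is lost by dropping the offset. A type that also carries a zone identifier needs a different in-memory layout, which is the kind of mismatch the root-cause section describes.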



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
