This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 682d79153 [FLINK-38440][pipeline-connector][iceberg] fixed the problem of writing SMALLINT/TINYINT data to Iceberg. (#4146)
682d79153 is described below

commit 682d791538972148c2d9f24f96fc2bd9b1a77666
Author: fcfangcc <[email protected]>
AuthorDate: Sun Sep 28 11:56:47 2025 +0800

    [FLINK-38440][pipeline-connector][iceberg] fixed the problem of writing SMALLINT/TINYINT data to Iceberg. (#4146)
---
 .../flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java   | 4 +++-
 .../flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java     | 6 +++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
index e52c985b7..102b4dbad 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
@@ -140,8 +140,10 @@ public class IcebergTypeUtils {
                         };
                 break;
             case TINYINT:
+                fieldGetter = row -> (int) row.getByte(fieldPos);
+                break;
             case SMALLINT:
-                fieldGetter = row -> row.getInt(fieldPos);
+                fieldGetter = row -> ((Short) row.getShort(fieldPos)).intValue();
                 break;
             case BIGINT:
                 fieldGetter = row -> row.getLong(fieldPos);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
index 4bcd45d9e..4a6c1581b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
@@ -291,7 +291,9 @@ public class IcebergWriterTest {
                                 .physicalColumn("varbinary(10)", DataTypes.BINARY(10))
                                 .physicalColumn("decimal(10, 2)", DataTypes.DECIMAL(10, 2))
                                 .physicalColumn("tinyint", DataTypes.TINYINT())
+                                .physicalColumn("negative_tinyint", DataTypes.TINYINT())
                                 .physicalColumn("smallint", DataTypes.SMALLINT())
+                                .physicalColumn("negative_smallint", DataTypes.SMALLINT())
                                 .physicalColumn("int", DataTypes.INT())
                                 .physicalColumn("bigint", DataTypes.BIGINT())
                                 .physicalColumn("float", DataTypes.FLOAT())
@@ -319,7 +321,9 @@ public class IcebergWriterTest {
                             new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
                             DecimalData.zero(10, 2),
                             (byte) 1,
+                            (byte) -1,
                             (short) 2,
+                            (short) -2,
                             12345,
                             12345L,
                             123.456f,
@@ -342,7 +346,7 @@ public class IcebergWriterTest {
         List<String> result = fetchTableContent(catalog, tableId);
         Assertions.assertThat(result)
                 .containsExactlyInAnyOrder(
-                        "char, varchar, string, false, [1,2,3,4,5,], [1,2,3,4,5,6,7,8,9,10,], 0.00, 1, 2, 12345, 12345, 123.456, 123456.789, 00:00:12.345, 2003-10-20, 1970-01-01T00:00, 1970-01-01T00:00Z, 1970-01-01T00:00Z");
+                        "char, varchar, string, false, [1,2,3,4,5,], [1,2,3,4,5,6,7,8,9,10,], 0.00, 1, -1, 2, -2, 12345, 12345, 123.456, 123456.789, 00:00:12.345, 2003-10-20, 1970-01-01T00:00, 1970-01-01T00:00Z, 1970-01-01T00:00Z");
     }
 
     /** Mock CommitRequestImpl. */

Reply via email to