This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 682d79153 [FLINK-38440][pipeline-connector][iceberg] fixed the problem
of writing SMALLINT/TINYINT data to Iceberg. (#4146)
682d79153 is described below
commit 682d791538972148c2d9f24f96fc2bd9b1a77666
Author: fcfangcc <[email protected]>
AuthorDate: Sun Sep 28 11:56:47 2025 +0800
[FLINK-38440][pipeline-connector][iceberg] fixed the problem of writing
SMALLINT/TINYINT data to Iceberg. (#4146)
---
.../flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java | 4 +++-
.../flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java | 6 +++++-
2 files changed, 8 insertions(+), 2 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
index e52c985b7..102b4dbad 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
@@ -140,8 +140,10 @@ public class IcebergTypeUtils {
};
break;
case TINYINT:
+ fieldGetter = row -> (int) row.getByte(fieldPos);
+ break;
case SMALLINT:
- fieldGetter = row -> row.getInt(fieldPos);
+ fieldGetter = row -> ((Short)
row.getShort(fieldPos)).intValue();
break;
case BIGINT:
fieldGetter = row -> row.getLong(fieldPos);
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
index 4bcd45d9e..4a6c1581b 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
@@ -291,7 +291,9 @@ public class IcebergWriterTest {
.physicalColumn("varbinary(10)",
DataTypes.BINARY(10))
.physicalColumn("decimal(10, 2)",
DataTypes.DECIMAL(10, 2))
.physicalColumn("tinyint", DataTypes.TINYINT())
+ .physicalColumn("negative_tinyint",
DataTypes.TINYINT())
.physicalColumn("smallint",
DataTypes.SMALLINT())
+ .physicalColumn("negative_smallint",
DataTypes.SMALLINT())
.physicalColumn("int", DataTypes.INT())
.physicalColumn("bigint", DataTypes.BIGINT())
.physicalColumn("float", DataTypes.FLOAT())
@@ -319,7 +321,9 @@ public class IcebergWriterTest {
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
DecimalData.zero(10, 2),
(byte) 1,
+ (byte) -1,
(short) 2,
+ (short) -2,
12345,
12345L,
123.456f,
@@ -342,7 +346,7 @@ public class IcebergWriterTest {
List<String> result = fetchTableContent(catalog, tableId);
Assertions.assertThat(result)
.containsExactlyInAnyOrder(
- "char, varchar, string, false, [1,2,3,4,5,],
[1,2,3,4,5,6,7,8,9,10,], 0.00, 1, 2, 12345, 12345, 123.456, 123456.789,
00:00:12.345, 2003-10-20, 1970-01-01T00:00, 1970-01-01T00:00Z,
1970-01-01T00:00Z");
+ "char, varchar, string, false, [1,2,3,4,5,],
[1,2,3,4,5,6,7,8,9,10,], 0.00, 1, -1, 2, -2, 12345, 12345, 123.456, 123456.789,
00:00:12.345, 2003-10-20, 1970-01-01T00:00, 1970-01-01T00:00Z,
1970-01-01T00:00Z");
}
/** Mock CommitRequestImpl. */