This is an automated email from the ASF dual-hosted git repository.

yuxiqian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new a24b34f48 [FLINK-39684] Fix PaimonWriterHelper NullPointerException 
when handling REPLACE events (#4398)
a24b34f48 is described below

commit a24b34f48fe900b134b8f4a9c7081d9aea5cbed4
Author: yuxiqian <[email protected]>
AuthorDate: Mon May 18 10:39:41 2026 +0800

    [FLINK-39684] Fix PaimonWriterHelper NullPointerException when handling 
REPLACE events (#4398)
---
 .../paimon/sink/v2/PaimonWriterHelper.java         |  8 +-
 .../paimon/sink/v2/PaimonWriterHelperTest.java     | 85 ++++++++++++++++++++++
 2 files changed, 92 insertions(+), 1 deletion(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
index 1e34c689b..abed564fe 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
@@ -250,7 +250,6 @@ public class PaimonWriterHelper {
                     break;
                 }
             case UPDATE:
-            case REPLACE:
                 {
                     if (hasPrimaryKey) {
                         fullGenericRows.add(
@@ -264,6 +263,13 @@ public class PaimonWriterHelper {
                                     dataChangeEvent.after(), fieldGetters, 
RowKind.UPDATE_AFTER));
                     break;
                 }
+            case REPLACE:
+                {
+                    fullGenericRows.add(
+                            convertRecordDataToGenericRow(
+                                    dataChangeEvent.after(), fieldGetters, 
RowKind.UPDATE_AFTER));
+                    break;
+                }
             case DELETE:
                 {
                     if (hasPrimaryKey) {
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
index b48761970..41accd73a 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
@@ -200,6 +200,91 @@ class PaimonWriterHelperTest {
         
Assertions.assertThat(genericRow.getRowKind()).isEqualTo(RowKind.INSERT);
     }
 
+    @Test
+    void testConvertEventToFullGenericRowsOfDataChangeTypes() {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("col1", DataTypes.STRING())
+                        .physicalColumn("col2", DataTypes.STRING())
+                        .build();
+        List<RecordData.FieldGetter> fieldGetters =
+                PaimonWriterHelper.createFieldGetters(schema, 
ZoneId.systemDefault());
+        TableId tableId = TableId.parse("database.table");
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), 
DataTypes.STRING()));
+        BinaryRecordData beforeData =
+                generator.generate(
+                        new Object[] {
+                            BinaryStringData.fromString("1"), 
BinaryStringData.fromString("old")
+                        });
+        BinaryRecordData afterData =
+                generator.generate(
+                        new Object[] {
+                            BinaryStringData.fromString("1"), 
BinaryStringData.fromString("new")
+                        });
+
+        // INSERT: single INSERT row regardless of hasPrimaryKey
+        DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId, 
afterData);
+        List<GenericRow> rows =
+                PaimonWriterHelper.convertEventToFullGenericRows(
+                        dataChangeEvent, fieldGetters, true);
+        Assertions.assertThat(rows).hasSize(1);
+        
Assertions.assertThat(rows.get(0).getRowKind()).isEqualTo(RowKind.INSERT);
+
+        rows =
+                PaimonWriterHelper.convertEventToFullGenericRows(
+                        dataChangeEvent, fieldGetters, false);
+        Assertions.assertThat(rows).hasSize(1);
+        
Assertions.assertThat(rows.get(0).getRowKind()).isEqualTo(RowKind.INSERT);
+
+        // REPLACE: single INSERT row regardless of hasPrimaryKey (same as 
INSERT)
+        dataChangeEvent = DataChangeEvent.replaceEvent(tableId, afterData, 
null);
+        rows =
+                PaimonWriterHelper.convertEventToFullGenericRows(
+                        dataChangeEvent, fieldGetters, true);
+        Assertions.assertThat(rows).hasSize(1);
+        
Assertions.assertThat(rows.get(0).getRowKind()).isEqualTo(RowKind.UPDATE_AFTER);
+        
Assertions.assertThat(rows.get(0).getString(1)).isEqualTo(BinaryString.fromString("new"));
+
+        rows =
+                PaimonWriterHelper.convertEventToFullGenericRows(
+                        dataChangeEvent, fieldGetters, false);
+        Assertions.assertThat(rows).hasSize(1);
+        
Assertions.assertThat(rows.get(0).getRowKind()).isEqualTo(RowKind.UPDATE_AFTER);
+
+        // UPDATE with primary key: UPDATE_BEFORE + UPDATE_AFTER
+        dataChangeEvent = DataChangeEvent.updateEvent(tableId, beforeData, 
afterData);
+        rows =
+                PaimonWriterHelper.convertEventToFullGenericRows(
+                        dataChangeEvent, fieldGetters, true);
+        Assertions.assertThat(rows).hasSize(2);
+        
Assertions.assertThat(rows.get(0).getRowKind()).isEqualTo(RowKind.UPDATE_BEFORE);
+        
Assertions.assertThat(rows.get(0).getString(1)).isEqualTo(BinaryString.fromString("old"));
+        
Assertions.assertThat(rows.get(1).getRowKind()).isEqualTo(RowKind.UPDATE_AFTER);
+        
Assertions.assertThat(rows.get(1).getString(1)).isEqualTo(BinaryString.fromString("new"));
+
+        // UPDATE without primary key: only UPDATE_AFTER
+        rows =
+                PaimonWriterHelper.convertEventToFullGenericRows(
+                        dataChangeEvent, fieldGetters, false);
+        Assertions.assertThat(rows).hasSize(1);
+        
Assertions.assertThat(rows.get(0).getRowKind()).isEqualTo(RowKind.UPDATE_AFTER);
+
+        // DELETE with primary key: single DELETE row
+        dataChangeEvent = DataChangeEvent.deleteEvent(tableId, beforeData);
+        rows =
+                PaimonWriterHelper.convertEventToFullGenericRows(
+                        dataChangeEvent, fieldGetters, true);
+        Assertions.assertThat(rows).hasSize(1);
+        
Assertions.assertThat(rows.get(0).getRowKind()).isEqualTo(RowKind.DELETE);
+
+        // DELETE without primary key: empty (no rows)
+        rows =
+                PaimonWriterHelper.convertEventToFullGenericRows(
+                        dataChangeEvent, fieldGetters, false);
+        Assertions.assertThat(rows).isEmpty();
+    }
+
     @Test
     void testConvertEventToGenericRowWithNestedRow() {
         // Define the inner row type with an integer and a map

Reply via email to