This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0053b41d5d0 [FLINK-37643] Support partial deletes when converting to external data structures (#26436)
0053b41d5d0 is described below

commit 0053b41d5d07252fef468e43c33f30b7fd1e27da
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Apr 11 10:17:33 2025 +0200

    [FLINK-37643] Support partial deletes when converting to external data structures (#26436)
---
 .../org/apache/flink/table/data/ArrayData.java     |  9 ++-
 .../java/org/apache/flink/table/data/RowData.java  |  8 ++-
 .../table/data/DataStructureConvertersTest.java    | 64 +++++++++++++++++++++-
 .../runtime/typeutils/RowDataSerializerTest.java   | 60 ++++++++++++++++++++
 4 files changed, 134 insertions(+), 7 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
index 496a8728438..6ebde124f7e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
@@ -215,9 +215,6 @@ public interface ArrayData {
             default:
                 throw new IllegalArgumentException();
         }
-        if (!elementType.isNullable()) {
-            return elementGetter;
-        }
         return (array, pos) -> {
             if (array.isNullAt(pos)) {
                 return null;
@@ -233,6 +230,12 @@ public interface ArrayData {
      */
     @PublicEvolving
     interface ElementGetter extends Serializable {
+
+        /**
+         * Converters and serializers always support nullability. The NOT NULL constraint is only
+         * considered on SQL semantic level but not data transfer. E.g. partial deletes (i.e.
+         * key-only upserts) set all non-key fields to null, regardless of logical type.
+         */
         @Nullable
         Object getElementOrNull(ArrayData array, int pos);
     }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
index 387276d94b3..614583adead 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
@@ -287,9 +287,6 @@ public interface RowData {
             default:
                 throw new IllegalArgumentException();
         }
-        if (!fieldType.isNullable()) {
-            return fieldGetter;
-        }
         return row -> {
             if (row.isNullAt(fieldPos)) {
                 return null;
@@ -305,6 +302,11 @@ public interface RowData {
      */
     @PublicEvolving
     interface FieldGetter extends Serializable {
+        /**
+         * Converters and serializers always support nullability. The NOT NULL constraint is only
+         * considered on SQL semantic level but not data transfer. E.g. partial deletes (i.e.
+         * key-only upserts) set all non-key fields to null, regardless of logical type.
+         */
         @Nullable
         Object getFieldOrNull(RowData row);
     }
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
index d4a1c13c4b9..9da6e7c18c4 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
@@ -355,7 +355,69 @@ class DataStructureConvertersTest {
                 TestSpec.forDataType(
                                 DataTypes.STRUCTURED(GenericPojo.class, 
FIELD("value", DATE())))
                         .convertedTo(
-                                GenericPojo.class, new 
GenericPojo<>(LocalDate.ofEpochDay(123))));
+                                GenericPojo.class, new 
GenericPojo<>(LocalDate.ofEpochDay(123))),
+
+                // partial delete messages
+                TestSpec.forDataType(
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f0", 
DataTypes.TINYINT().notNull()),
+                                        DataTypes.FIELD("f1", 
DataTypes.SMALLINT().notNull()),
+                                        DataTypes.FIELD("f2", 
DataTypes.INT().notNull()),
+                                        DataTypes.FIELD("f3", 
DataTypes.BIGINT().notNull()),
+                                        DataTypes.FIELD("f4", 
DataTypes.DOUBLE().notNull()),
+                                        DataTypes.FIELD("f5", 
DataTypes.FLOAT().notNull()),
+                                        DataTypes.FIELD("f6", 
DataTypes.DATE().notNull()),
+                                        DataTypes.FIELD("f7", 
DataTypes.BINARY(12).notNull()),
+                                        DataTypes.FIELD("f8", 
DataTypes.VARBINARY(12).notNull()),
+                                        DataTypes.FIELD("f9", 
DataTypes.CHAR(12).notNull()),
+                                        DataTypes.FIELD("f10", 
DataTypes.VARCHAR(12).notNull()),
+                                        DataTypes.FIELD("f11", 
DataTypes.BOOLEAN().notNull()),
+                                        DataTypes.FIELD("f12", 
DataTypes.TIME().notNull()),
+                                        DataTypes.FIELD("f13", 
DataTypes.TIMESTAMP().notNull()),
+                                        DataTypes.FIELD("f14", 
DataTypes.TIMESTAMP_LTZ().notNull()),
+                                        DataTypes.FIELD("f15", 
DataTypes.DECIMAL(10, 2).notNull()),
+                                        DataTypes.FIELD(
+                                                "f16",
+                                                DataTypes.MAP(
+                                                                
DataTypes.INT().notNull(),
+                                                                
DataTypes.STRING().notNull())
+                                                        .notNull()),
+                                        DataTypes.FIELD(
+                                                "f17",
+                                                
DataTypes.ARRAY(DataTypes.INT().notNull())
+                                                        .notNull()),
+                                        DataTypes.FIELD(
+                                                "f18",
+                                                
DataTypes.ARRAY(DataTypes.INT().notNull())
+                                                        .notNull()),
+                                        DataTypes.FIELD(
+                                                "f19",
+                                                
DataTypes.MULTISET(DataTypes.INT().notNull())
+                                                        .notNull())))
+                        .convertedTo(
+                                Row.class,
+                                Row.ofKind(
+                                        RowKind.DELETE,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        new Integer[] {null, null},
+                                        null)));
     }
 
     @ParameterizedTest(name = "{index}: {0}")
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
index b70c5497cef..6d6379ed046 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
@@ -33,11 +33,21 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryArrayWriter;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.testutils.DeeplyEqualsChecker;
 
@@ -398,4 +408,54 @@ abstract class RowDataSerializerTest extends 
SerializerTestInstance<RowData> {
                     
InternalSerializers.<RowData>create(NESTED_DATA_TYPE.getLogicalType());
         }
     }
+
+    /**
+     * Converters and serializers always support nullability. The NOT NULL constraint is only
+     * considered on SQL semantic level but not data transfer. E.g. partial deletes (i.e. key-only
+     * upserts) set all non-key fields to null, regardless of logical type.
+     */
+    static final class RowDataSerializerWithNullForNotNullTypeTest extends 
RowDataSerializerTest {
+        public RowDataSerializerWithNullForNotNullTypeTest() {
+            super(getRowSerializer(), getData());
+        }
+
+        private static RowData[] getData() {
+            GenericRowData row = new GenericRowData(13);
+            row.setField(0, 2);
+            row.setField(1, null);
+            row.setField(3, null);
+            row.setField(4, null);
+            row.setField(5, null);
+            row.setField(6, null);
+            row.setField(7, null);
+            row.setField(8, null);
+            row.setField(9, null);
+            row.setField(10, null);
+            row.setField(11, null);
+            row.setField(12, null);
+
+            return new RowData[] {row};
+        }
+
+        private static RowDataSerializer getRowSerializer() {
+            InternalTypeInfo<RowData> typeInfo =
+                    InternalTypeInfo.ofFields(
+                            new IntType(false),
+                            new SmallIntType(false),
+                            new BigIntType(false),
+                            new VarCharType(false, VarCharType.MAX_LENGTH),
+                            new CharType(false, CharType.MAX_LENGTH),
+                            new BinaryType(false, BinaryType.MAX_LENGTH),
+                            new VarBinaryType(false, VarBinaryType.MAX_LENGTH),
+                            new DateType(false),
+                            new DayTimeIntervalType(
+                                    false, 
DayTimeIntervalType.DayTimeResolution.DAY, 1, 6),
+                            new DecimalType(false, 10, 2),
+                            new FloatType(false),
+                            new DoubleType(false),
+                            new LocalZonedTimestampType(false, 3));
+
+            return typeInfo.toRowSerializer();
+        }
+    }
 }

Reply via email to