This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0053b41d5d0 [FLINK-37643] Support partial deletes when converting to
external data structures (#26436)
0053b41d5d0 is described below
commit 0053b41d5d07252fef468e43c33f30b7fd1e27da
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Apr 11 10:17:33 2025 +0200
[FLINK-37643] Support partial deletes when converting to external data
structures (#26436)
---
.../org/apache/flink/table/data/ArrayData.java | 9 ++-
.../java/org/apache/flink/table/data/RowData.java | 8 ++-
.../table/data/DataStructureConvertersTest.java | 64 +++++++++++++++++++++-
.../runtime/typeutils/RowDataSerializerTest.java | 60 ++++++++++++++++++++
4 files changed, 134 insertions(+), 7 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
index 496a8728438..6ebde124f7e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java
@@ -215,9 +215,6 @@ public interface ArrayData {
default:
throw new IllegalArgumentException();
}
- if (!elementType.isNullable()) {
- return elementGetter;
- }
return (array, pos) -> {
if (array.isNullAt(pos)) {
return null;
@@ -233,6 +230,12 @@ public interface ArrayData {
*/
@PublicEvolving
interface ElementGetter extends Serializable {
+
+ /**
+ * Converters and serializers always support nullability. The NOT NULL constraint is only
+ * considered on SQL semantic level but not data transfer. E.g. partial deletes (i.e.
+ * key-only upserts) set all non-key fields to null, regardless of logical type.
+ */
@Nullable
Object getElementOrNull(ArrayData array, int pos);
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
index 387276d94b3..614583adead 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java
@@ -287,9 +287,6 @@ public interface RowData {
default:
throw new IllegalArgumentException();
}
- if (!fieldType.isNullable()) {
- return fieldGetter;
- }
return row -> {
if (row.isNullAt(fieldPos)) {
return null;
@@ -305,6 +302,11 @@ public interface RowData {
*/
@PublicEvolving
interface FieldGetter extends Serializable {
+ /**
+ * Converters and serializers always support nullability. The NOT NULL constraint is only
+ * considered on SQL semantic level but not data transfer. E.g. partial deletes (i.e.
+ * key-only upserts) set all non-key fields to null, regardless of logical type.
+ */
@Nullable
Object getFieldOrNull(RowData row);
}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
index d4a1c13c4b9..9da6e7c18c4 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
@@ -355,7 +355,69 @@ class DataStructureConvertersTest {
TestSpec.forDataType(
DataTypes.STRUCTURED(GenericPojo.class,
FIELD("value", DATE())))
.convertedTo(
- GenericPojo.class, new
GenericPojo<>(LocalDate.ofEpochDay(123))));
+ GenericPojo.class, new
GenericPojo<>(LocalDate.ofEpochDay(123))),
+
+ // partial delete messages
+ TestSpec.forDataType(
+ DataTypes.ROW(
+ DataTypes.FIELD("f0",
DataTypes.TINYINT().notNull()),
+ DataTypes.FIELD("f1",
DataTypes.SMALLINT().notNull()),
+ DataTypes.FIELD("f2",
DataTypes.INT().notNull()),
+ DataTypes.FIELD("f3",
DataTypes.BIGINT().notNull()),
+ DataTypes.FIELD("f4",
DataTypes.DOUBLE().notNull()),
+ DataTypes.FIELD("f5",
DataTypes.FLOAT().notNull()),
+ DataTypes.FIELD("f6",
DataTypes.DATE().notNull()),
+ DataTypes.FIELD("f7",
DataTypes.BINARY(12).notNull()),
+ DataTypes.FIELD("f8",
DataTypes.VARBINARY(12).notNull()),
+ DataTypes.FIELD("f9",
DataTypes.CHAR(12).notNull()),
+ DataTypes.FIELD("f10",
DataTypes.VARCHAR(12).notNull()),
+ DataTypes.FIELD("f11",
DataTypes.BOOLEAN().notNull()),
+ DataTypes.FIELD("f12",
DataTypes.TIME().notNull()),
+ DataTypes.FIELD("f13",
DataTypes.TIMESTAMP().notNull()),
+ DataTypes.FIELD("f14",
DataTypes.TIMESTAMP_LTZ().notNull()),
+ DataTypes.FIELD("f15",
DataTypes.DECIMAL(10, 2).notNull()),
+ DataTypes.FIELD(
+ "f16",
+ DataTypes.MAP(
+
DataTypes.INT().notNull(),
+
DataTypes.STRING().notNull())
+ .notNull()),
+ DataTypes.FIELD(
+ "f17",
+
DataTypes.ARRAY(DataTypes.INT().notNull())
+ .notNull()),
+ DataTypes.FIELD(
+ "f18",
+
DataTypes.ARRAY(DataTypes.INT().notNull())
+ .notNull()),
+ DataTypes.FIELD(
+ "f19",
+
DataTypes.MULTISET(DataTypes.INT().notNull())
+ .notNull())))
+ .convertedTo(
+ Row.class,
+ Row.ofKind(
+ RowKind.DELETE,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ new Integer[] {null, null},
+ null)));
}
@ParameterizedTest(name = "{index}: {0}")
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
index b70c5497cef..6d6379ed046 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
@@ -33,11 +33,21 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryArrayWriter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.testutils.DeeplyEqualsChecker;
@@ -398,4 +408,54 @@ abstract class RowDataSerializerTest extends SerializerTestInstance<RowData> {
InternalSerializers.<RowData>create(NESTED_DATA_TYPE.getLogicalType());
}
}
+
+ /**
+ * Converters and serializers always support nullability. The NOT NULL constraint is only
+ * considered on SQL semantic level but not data transfer. E.g. partial deletes (i.e. key-only
+ * upserts) set all non-key fields to null, regardless of logical type.
+ */
+ static final class RowDataSerializerWithNullForNotNullTypeTest extends
RowDataSerializerTest {
+ public RowDataSerializerWithNullForNotNullTypeTest() {
+ super(getRowSerializer(), getData());
+ }
+
+ private static RowData[] getData() {
+ GenericRowData row = new GenericRowData(13);
+ row.setField(0, 2);
+ row.setField(1, null);
+ row.setField(3, null);
+ row.setField(4, null);
+ row.setField(5, null);
+ row.setField(6, null);
+ row.setField(7, null);
+ row.setField(8, null);
+ row.setField(9, null);
+ row.setField(10, null);
+ row.setField(11, null);
+ row.setField(12, null);
+
+ return new RowData[] {row};
+ }
+
+ private static RowDataSerializer getRowSerializer() {
+ InternalTypeInfo<RowData> typeInfo =
+ InternalTypeInfo.ofFields(
+ new IntType(false),
+ new SmallIntType(false),
+ new BigIntType(false),
+ new VarCharType(false, VarCharType.MAX_LENGTH),
+ new CharType(false, CharType.MAX_LENGTH),
+ new BinaryType(false, BinaryType.MAX_LENGTH),
+ new VarBinaryType(false, VarBinaryType.MAX_LENGTH),
+ new DateType(false),
+ new DayTimeIntervalType(
+ false,
DayTimeIntervalType.DayTimeResolution.DAY, 1, 6),
+ new DecimalType(false, 10, 2),
+ new FloatType(false),
+ new DoubleType(false),
+ new LocalZonedTimestampType(false, 3));
+
+ return typeInfo.toRowSerializer();
+ }
+ }
}