This is an automated email from the ASF dual-hosted git repository.

gustavodemorais pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c033b17fae27117fbfc6b007348e9499d7830023
Author: Ramin Gharib <[email protected]>
AuthorDate: Thu May 28 16:00:22 2026 +0200

    [FLINK-39636][table] Widen output column nullability for partial deletes
    
    When produces_full_deletes=false, the operator nulls out columns that are 
not part of the upsert key or PARTITION BY on DELETE rows. The output schema 
previously declared those columns as NOT NULL, mirroring the input, which 
conflicts with the values the operator emits at runtime.
    
    Read produces_full_deletes in OUTPUT_TYPE_STRATEGY and widen non-preserved 
columns to nullable. Preserved columns (upsert key plus partition keys) keep 
their declared nullability. The new 
ChangelogTypeStrategyUtils#computePreservedColumnIndices helper centralizes the 
index computation so the runtime and the type strategy stay in sync.
    
    Add ToChangelogOutputTypeStrategyTest covering all three cases: full 
deletes preserve NOT NULL, partial deletes with an upsert key widen only the 
non-key columns, and partial deletes without an upsert key widen everything.
---
 .../docs/sql/reference/queries/changelog.md        |   4 +
 .../strategies/ChangelogTypeStrategyUtils.java     |  24 +++++
 .../strategies/ToChangelogTypeStrategy.java        |  39 +++++++-
 .../ToChangelogOutputTypeStrategyTest.java         | 110 +++++++++++++++++++++
 4 files changed, 174 insertions(+), 3 deletions(-)

diff --git a/docs/content/docs/sql/reference/queries/changelog.md 
b/docs/content/docs/sql/reference/queries/changelog.md
index 641bb7245f7..71539a9b8cb 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -449,6 +449,10 @@ SELECT * FROM TO_CHANGELOG(input => TABLE retract_source 
PARTITION BY id)
 
 The planner skips `ChangelogNormalize` and the function emits partial DELETE 
rows. This avoids the stateful normalization operator for upsert sources (e.g. 
Kafka compacted topics) where the full pre-image is not needed downstream. This 
requires an [upsert key](#upsert-key) to be present for the input table (row 
semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected 
with a validation error.
 
+{{< hint warning >}}
+**Output nullability changes when `produces_full_deletes => false`.** Columns 
that are not part of the upsert key (or `PARTITION BY`) are nulled on DELETE 
rows at runtime, so the type system widens them to nullable in the output 
schema. Columns declared `NOT NULL` on the input therefore appear as nullable 
in the output of a `TO_CHANGELOG(..., produces_full_deletes => false)` call. 
The upsert-key (or partition-key) columns keep their declared nullability. Use 
the default `produces_full_de [...]
+{{< /hint >}}
+
 **Row semantics** (no `PARTITION BY`): the function preserves the 
planner-derived upsert key columns on DELETE rows and nulls the rest. The 
upsert key is typically a declared `PRIMARY KEY` when directly reading from a 
source or the key provided in a `GROUP BY <key>`.
 
 ```sql
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
index d4e20c30210..6adc885dd78 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
@@ -22,9 +22,11 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.functions.TableSemantics;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.utils.UpsertKeyUtils;
 import org.apache.flink.types.ColumnList;
 
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.OptionalInt;
 import java.util.Set;
@@ -80,6 +82,28 @@ public final class ChangelogTypeStrategyUtils {
         return computeOutputIndices(tableSemantics, opIndex);
     }
 
+    /**
+     * Returns the set of input column indices whose values the function 
preserves on partial DELETE
+     * rows (i.e. those that {@code TO_CHANGELOG} keeps when {@code 
produces_full_deletes=false}).
+     * The set is the union of {@code PARTITION BY} columns (the framework 
prepends them outside the
+     * projected output) and the smallest upsert-key candidate exposed by 
{@link
+     * TableSemantics#upsertKeyColumns()}.
+     *
+     * <p>Returns only the partition keys when no upsert key candidate is 
exposed yet, for example
+     * during type inference where planner-derived metadata is not yet 
populated.
+     */
+    public static Set<Integer> computePreservedColumnIndices(final 
TableSemantics tableSemantics) {
+        final Set<Integer> preserved = new 
HashSet<>(collectPartitionKeyIndices(tableSemantics));
+        final List<int[]> upsertKeys = tableSemantics.upsertKeyColumns();
+        if (upsertKeys == null || upsertKeys.isEmpty()) {
+            return preserved;
+        }
+        for (final int column : UpsertKeyUtils.smallestKey(upsertKeys)) {
+            preserved.add(column);
+        }
+        return preserved;
+    }
+
     private static int[] computeOutputIndices(
             final TableSemantics tableSemantics, final int extraExcludedIndex) 
{
         final Set<Integer> excluded = 
collectPartitionKeyIndices(tableSemantics);
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
index 1434b221077..a2b8e13681b 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
@@ -32,6 +32,7 @@ import org.apache.flink.types.RowKind;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -84,8 +85,13 @@ public final class ToChangelogTypeStrategy {
 
                 final String opColumnName =
                         
ChangelogTypeStrategyUtils.resolveOpColumnName(callContext);
+                final boolean producesFullDeletes =
+                        callContext
+                                .getArgumentValue(ARG_PRODUCES_FULL_DELETES, 
Boolean.class)
+                                .orElse(true);
 
-                final List<Field> outputFields = buildOutputFields(semantics, 
opColumnName);
+                final List<Field> outputFields =
+                        buildOutputFields(semantics, opColumnName, 
producesFullDeletes);
 
                 return Optional.of(DataTypes.ROW(outputFields).notNull());
             };
@@ -208,15 +214,42 @@ public final class ToChangelogTypeStrategy {
         return false;
     }
 
+    /**
+     * Builds the output {@link Field}s for the {@code TO_CHANGELOG} call.
+     *
+     * <p>When the function emits partial DELETE rows ({@code 
produces_full_deletes=false}), columns
+     * that are not part of the upsert key (or {@code PARTITION BY}) are 
nulled out at runtime.
+     * Those columns are widened to nullable here so the output schema matches 
what the operator can
+     * emit; preserved columns keep their declared nullability.
+     *
+     * <p>At type inference time the upsert key may not be exposed yet; in 
that case the helper
+     * returns only the partition keys and all projected columns are widened 
conservatively.
+     */
     private static List<Field> buildOutputFields(
-            final TableSemantics semantics, final String opColumnName) {
+            final TableSemantics semantics,
+            final String opColumnName,
+            final boolean producesFullDeletes) {
         final List<Field> inputFields = 
DataType.getFields(semantics.dataType());
         final int[] outputIndices = 
ChangelogTypeStrategyUtils.computeOutputIndices(semantics);
         final List<Field> outputFields = new ArrayList<>();
         outputFields.add(DataTypes.FIELD(opColumnName, DataTypes.STRING()));
-        
Arrays.stream(outputIndices).mapToObj(inputFields::get).forEach(outputFields::add);
+        final Set<Integer> preserved =
+                producesFullDeletes
+                        ? Collections.emptySet()
+                        : 
ChangelogTypeStrategyUtils.computePreservedColumnIndices(semantics);
+        Arrays.stream(outputIndices)
+                .mapToObj(
+                        idx ->
+                                producesFullDeletes || preserved.contains(idx)
+                                        ? inputFields.get(idx)
+                                        : asNullable(inputFields.get(idx)))
+                .forEach(outputFields::add);
         return outputFields;
     }
 
+    private static Field asNullable(final Field field) {
+        return DataTypes.FIELD(field.getName(), 
field.getDataType().nullable());
+    }
+
     private ToChangelogTypeStrategy() {}
 }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java
new file mode 100644
index 00000000000..e912370ef00
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.TypeStrategiesTestBase;
+import org.apache.flink.table.types.inference.utils.TableSemanticsMock;
+import org.apache.flink.types.ColumnList;
+
+import java.util.List;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.TO_CHANGELOG_OUTPUT_TYPE_STRATEGY;
+import static 
org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_OP;
+import static 
org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_OP_MAPPING;
+import static 
org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_PRODUCES_FULL_DELETES;
+import static 
org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_TABLE;
+
+/** Tests for {@link ToChangelogTypeStrategy#OUTPUT_TYPE_STRATEGY}. */
+class ToChangelogOutputTypeStrategyTest extends TypeStrategiesTestBase {
+
+    private static final DataType TABLE_TYPE_NOT_NULL_SCORE =
+            DataTypes.ROW(
+                    DataTypes.FIELD("name", DataTypes.STRING().notNull()),
+                    DataTypes.FIELD("score", DataTypes.BIGINT().notNull()));
+
+    private static final DataType DESCRIPTOR_TYPE = DataTypes.DESCRIPTOR();
+    private static final DataType MAP_TYPE = DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING());
+    private static final DataType BOOLEAN_TYPE = DataTypes.BOOLEAN();
+
+    @Override
+    protected Stream<TestSpec> testData() {
+        return Stream.of(
+                TestSpec.forStrategy(
+                                "produces_full_deletes=true preserves NOT NULL 
on input columns",
+                                TO_CHANGELOG_OUTPUT_TYPE_STRATEGY)
+                        .inputTypes(
+                                TABLE_TYPE_NOT_NULL_SCORE, DESCRIPTOR_TYPE, 
MAP_TYPE, BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(
+                                ARG_TABLE, new 
TableSemanticsMock(TABLE_TYPE_NOT_NULL_SCORE))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+                        .calledWithLiteralAt(ARG_OP_MAPPING, null)
+                        .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true)
+                        .expectDataType(
+                                DataTypes.ROW(
+                                                DataTypes.FIELD("op", 
DataTypes.STRING()),
+                                                DataTypes.FIELD(
+                                                        "name", 
DataTypes.STRING().notNull()),
+                                                DataTypes.FIELD(
+                                                        "score", 
DataTypes.BIGINT().notNull()))
+                                        .notNull()),
+                TestSpec.forStrategy(
+                                "produces_full_deletes=false widens 
non-upsert-key columns to nullable",
+                                TO_CHANGELOG_OUTPUT_TYPE_STRATEGY)
+                        .inputTypes(
+                                TABLE_TYPE_NOT_NULL_SCORE, DESCRIPTOR_TYPE, 
MAP_TYPE, BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(
+                                ARG_TABLE,
+                                new TableSemanticsMock(
+                                        TABLE_TYPE_NOT_NULL_SCORE,
+                                        new int[0],
+                                        new int[0],
+                                        -1,
+                                        null,
+                                        List.of(new int[] {0})))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+                        .calledWithLiteralAt(ARG_OP_MAPPING, null)
+                        .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, false)
+                        .expectDataType(
+                                DataTypes.ROW(
+                                                DataTypes.FIELD("op", 
DataTypes.STRING()),
+                                                DataTypes.FIELD(
+                                                        "name", 
DataTypes.STRING().notNull()),
+                                                DataTypes.FIELD("score", 
DataTypes.BIGINT()))
+                                        .notNull()),
+                TestSpec.forStrategy(
+                                "produces_full_deletes=false without upsert 
key widens all columns",
+                                TO_CHANGELOG_OUTPUT_TYPE_STRATEGY)
+                        .inputTypes(
+                                TABLE_TYPE_NOT_NULL_SCORE, DESCRIPTOR_TYPE, 
MAP_TYPE, BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(
+                                ARG_TABLE, new 
TableSemanticsMock(TABLE_TYPE_NOT_NULL_SCORE))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+                        .calledWithLiteralAt(ARG_OP_MAPPING, null)
+                        .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, false)
+                        .expectDataType(
+                                DataTypes.ROW(
+                                                DataTypes.FIELD("op", 
DataTypes.STRING()),
+                                                DataTypes.FIELD("name", 
DataTypes.STRING()),
+                                                DataTypes.FIELD("score", 
DataTypes.BIGINT()))
+                                        .notNull()));
+    }
+}

Reply via email to