This is an automated email from the ASF dual-hosted git repository.

gustavodemorais pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d763453e045ad445c7d5a8382559d34a9429a1e9
Author: Ramin Gharib <[email protected]>
AuthorDate: Tue May 26 10:56:29 2026 +0200

    [FLINK-39735][table] Expose input upsert key on TableSemantics
    
    Adds upsertKeyColumns() to TableSemantics so ProcessTableFunctions can read 
the planner-derived upsert key candidates of each table input at specialization 
time. The planner exposes all candidates; picking one is the function's 
responsibility. UpsertKeyUtils provides a stable smallestKey helper for that 
choice.
    
    Co-Authored-By: Jubin Soni <[email protected]>
---
 .../resolver/rules/ResolveCallByArgumentsRule.java |  5 ++
 .../flink/table/functions/TableSemantics.java      | 24 ++++++++
 .../apache/flink/table/utils/UpsertKeyUtils.java   | 67 ++++++++++++++++++++++
 .../types/inference/utils/TableSemanticsMock.java  | 25 ++++++++
 .../functions/bridging/BridgingSqlFunction.java    | 19 +++++-
 .../inference/CallBindingCallContext.java          |  6 ++
 .../inference/OperatorBindingCallContext.java      | 47 +++++++++++++--
 .../stream/StreamExecProcessTableFunction.java     | 32 +++++++++--
 .../stream/StreamPhysicalProcessTableFunction.java | 19 +++++-
 .../table/planner/plan/utils/UpsertKeyUtil.java    | 23 +++-----
 .../codegen/ProcessTableRunnerGenerator.scala      | 10 +++-
 .../plan/to-changelog-retract-restore.json         |  3 +-
 .../operators/process/RuntimeTableSemantics.java   | 11 +++-
 ...essSetTableOperatorInterruptibleTimersTest.java |  3 +-
 .../functions/TestHarnessTableSemantics.java       | 14 +++++
 15 files changed, 273 insertions(+), 35 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index be8e578deea..5e4a02cb9fb 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -781,6 +781,11 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
             return Optional.empty();
         }
 
+        @Override
+        public List<int[]> upsertKeyColumns() {
+            return Collections.emptyList();
+        }
+
         private PartitionQueryOperation findPartitionOperation(QueryOperation 
op) {
             if (op instanceof PartitionQueryOperation) {
                 return (PartitionQueryOperation) op;
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
index f63566befce..3d17401c5ef 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.types.DataType;
 
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -128,6 +129,29 @@ public interface TableSemantics {
      */
     Optional<ChangelogMode> changelogMode();
 
+    /**
+     * Upsert key candidates derived from the passed table's metadata.
+     *
+     * <p>Returns a list of 0-based column index arrays that uniquely identify 
a row for upsert
+     * semantics. This is distinct from {@link #partitionByColumns()}: 
partition keys describe
+     * distribution and co-location, upsert keys describe row identity. Useful 
for functions that
+     * need to emit key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER 
pairs, or want to have a
+     * unique identifier to interact with state.
+     *
+     * <p>Returns an empty list when no upsert key is derivable, or when the 
planner has not yet
+     * computed metadata (during type inference).
+     *
+     * <p>When the planner derives multiple candidate upsert keys for the same input (e.g., a table
+     * with more than one unique key), all of them are returned. Picking which candidate to
+     * use is the function's responsibility, and the choice must be stable 
across releases to keep
+     * PTF state consistent after job restarts and upgrades. The order of the 
returned list is not
+     * part of the contract; PTF authors should not depend on it. A typical 
choice is the smallest
+     * candidate by cardinality, with ties broken by the column indices in 
ascending order.
+     *
+     * @return Candidate upsert keys of the passed table, or an empty list if 
none.
+     */
+    List<int[]> upsertKeyColumns();
+
     /** The sort direction for ORDER BY columns in table arguments with set 
semantics. */
     @PublicEvolving
     enum SortDirection {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/UpsertKeyUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/UpsertKeyUtils.java
new file mode 100644
index 00000000000..528a977d037
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/UpsertKeyUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Comparator;
+import java.util.List;
+
+/** Helpers for working with upsert key candidates. */
+@Internal
+public final class UpsertKeyUtils {
+
+    /**
+     * Comparator that orders upsert-key candidates deterministically and 
stably across releases:
+     * candidates with fewer columns come first; ties between 
equal-cardinality candidates are
+     * broken by the column indices in ascending order, leading column first.
+     */
+    private static final Comparator<int[]> SMALLEST_FIRST =
+            Comparator.<int[]>comparingInt(a -> a.length)
+                    .thenComparing(
+                            (a, b) -> {
+                                for (int i = 0; i < a.length; i++) {
+                                    final int cmp = Integer.compare(a[i], 
b[i]);
+                                    if (cmp != 0) {
+                                        return cmp;
+                                    }
+                                }
+                                return 0;
+                            });
+
+    /**
+     * Picks the smallest upsert key: fewest columns first, ties broken by comparing column indices
+     * position by position, lowest first. Returns an empty array if there are no candidates. The
+     * returned reference is one of the input arrays; callers must not mutate it.
+     */
+    public static int[] smallestKey(final List<int[]> candidates) {
+        if (candidates.isEmpty()) {
+            return new int[0];
+        }
+        int[] smallest = candidates.get(0);
+        for (int i = 1; i < candidates.size(); i++) {
+            if (SMALLEST_FIRST.compare(candidates.get(i), smallest) < 0) {
+                smallest = candidates.get(i);
+            }
+        }
+        return smallest;
+    }
+
+    private UpsertKeyUtils() {}
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java
index fe881f8fd1f..9b9830870fd 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.types.DataType;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 
 /** Mock implementation of {@link TableSemantics} for testing purposes. */
@@ -35,6 +37,7 @@ public class TableSemanticsMock implements TableSemantics {
     private final SortDirection[] orderByDirections;
     private final int timeColumn;
     private final ChangelogMode changelogMode;
+    private final List<int[]> upsertKeyColumns;
 
     public TableSemanticsMock(DataType dataType) {
         this(dataType, new int[0], new int[0], -1, null);
@@ -46,6 +49,22 @@ public class TableSemanticsMock implements TableSemantics {
             int[] orderByColumns,
             int timeColumn,
             @Nullable ChangelogMode changelogMode) {
+        this(
+                dataType,
+                partitionByColumns,
+                orderByColumns,
+                timeColumn,
+                changelogMode,
+                Collections.emptyList());
+    }
+
+    public TableSemanticsMock(
+            DataType dataType,
+            int[] partitionByColumns,
+            int[] orderByColumns,
+            int timeColumn,
+            @Nullable ChangelogMode changelogMode,
+            List<int[]> upsertKeyColumns) {
         this.dataType = dataType;
         this.partitionByColumns = partitionByColumns;
         this.orderByColumns = orderByColumns;
@@ -55,6 +74,7 @@ public class TableSemanticsMock implements TableSemantics {
         }
         this.timeColumn = timeColumn;
         this.changelogMode = changelogMode;
+        this.upsertKeyColumns = upsertKeyColumns;
     }
 
     @Override
@@ -86,4 +106,9 @@ public class TableSemanticsMock implements TableSemantics {
     public Optional<ChangelogMode> changelogMode() {
         return Optional.ofNullable(changelogMode);
     }
+
+    @Override
+    public List<int[]> upsertKeyColumns() {
+        return upsertKeyColumns;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
index 2c393adf875..900d8b6a0bd 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
@@ -335,7 +335,7 @@ public class BridgingSqlFunction extends SqlFunction {
      * scalar arguments through the same coercion path as validation.
      */
     public CallContext toCallContext(RexCall call) {
-        return toCallContext(call, null, null, null);
+        return toCallContext(call, null, null, null, null);
     }
 
     /**
@@ -348,6 +348,20 @@ public class BridgingSqlFunction extends SqlFunction {
             @Nullable List<Integer> inputTimeColumns,
             @Nullable List<ChangelogMode> inputChangelogModes,
             @Nullable ChangelogMode outputChangelogMode) {
+        return toCallContext(
+                call, inputTimeColumns, inputChangelogModes, 
outputChangelogMode, null);
+    }
+
+    /**
+     * Variant that additionally exposes the call's input upsert keys. Used by 
the streaming codegen
+     * path so PTFs can specialize themselves on the input's row-identity 
information.
+     */
+    public CallContext toCallContext(
+            RexCall call,
+            @Nullable List<Integer> inputTimeColumns,
+            @Nullable List<ChangelogMode> inputChangelogModes,
+            @Nullable ChangelogMode outputChangelogMode,
+            @Nullable List<List<int[]>> inputUpsertKeys) {
         return new OperatorBindingCallContext(
                 dataTypeFactory,
                 getDefinition(),
@@ -355,7 +369,8 @@ public class BridgingSqlFunction extends SqlFunction {
                 call.getType(),
                 inputTimeColumns,
                 inputChangelogModes,
-                outputChangelogMode);
+                outputChangelogMode,
+                inputUpsertKeys);
     }
 
     /**
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
index 065a3033545..b86fe9b4175 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
@@ -50,6 +50,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import javax.annotation.Nullable;
 
 import java.util.AbstractList;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -312,6 +313,11 @@ public final class CallBindingCallContext extends 
AbstractSqlCallContext {
         public Optional<ChangelogMode> changelogMode() {
             return Optional.empty();
         }
+
+        @Override
+        public List<int[]> upsertKeyColumns() {
+            return Collections.emptyList();
+        }
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
index f31406dad19..7d1d3940899 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
@@ -64,13 +64,14 @@ public final class OperatorBindingCallContext extends 
AbstractSqlCallContext {
     private final @Nullable List<Integer> inputTimeColumns;
     private final @Nullable List<ChangelogMode> inputChangelogModes;
     private final @Nullable ChangelogMode outputChangelogMode;
+    private final @Nullable List<List<int[]>> inputUpsertKeys;
 
     public OperatorBindingCallContext(
             DataTypeFactory dataTypeFactory,
             FunctionDefinition definition,
             SqlOperatorBinding binding,
             RelDataType returnRelDataType) {
-        this(dataTypeFactory, definition, binding, returnRelDataType, null, 
null, null);
+        this(dataTypeFactory, definition, binding, returnRelDataType, null, 
null, null, null);
     }
 
     public OperatorBindingCallContext(
@@ -81,6 +82,26 @@ public final class OperatorBindingCallContext extends 
AbstractSqlCallContext {
             @Nullable List<Integer> inputTimeColumns,
             @Nullable List<ChangelogMode> inputChangelogModes,
             @Nullable ChangelogMode outputChangelogMode) {
+        this(
+                dataTypeFactory,
+                definition,
+                binding,
+                returnRelDataType,
+                inputTimeColumns,
+                inputChangelogModes,
+                outputChangelogMode,
+                null);
+    }
+
+    public OperatorBindingCallContext(
+            DataTypeFactory dataTypeFactory,
+            FunctionDefinition definition,
+            SqlOperatorBinding binding,
+            RelDataType returnRelDataType,
+            @Nullable List<Integer> inputTimeColumns,
+            @Nullable List<ChangelogMode> inputChangelogModes,
+            @Nullable ChangelogMode outputChangelogMode,
+            @Nullable List<List<int[]>> inputUpsertKeys) {
         super(
                 dataTypeFactory,
                 definition,
@@ -109,6 +130,7 @@ public final class OperatorBindingCallContext extends 
AbstractSqlCallContext {
         this.inputTimeColumns = inputTimeColumns;
         this.inputChangelogModes = inputChangelogModes;
         this.outputChangelogMode = outputChangelogMode;
+        this.inputUpsertKeys = inputUpsertKeys;
     }
 
     @Override
@@ -173,13 +195,18 @@ public final class OperatorBindingCallContext extends 
AbstractSqlCallContext {
                 Optional.ofNullable(inputChangelogModes)
                         .map(m -> m.get(tableArgCall.getInputIndex()))
                         .orElse(null);
+        final List<int[]> upsertKeys =
+                inputUpsertKeys != null && tableArgCall.getInputIndex() < inputUpsertKeys.size()
+                        ? inputUpsertKeys.get(tableArgCall.getInputIndex())
+                        : List.of(); // older compiled plans have no upsert keys
         return Optional.of(
                 OperatorBindingTableSemantics.create(
                         argumentDataTypes.get(pos),
                         staticArg,
                         tableArgCall,
                         timeColumn,
-                        changelogMode));
+                        changelogMode,
+                        upsertKeys));
     }
 
     @Override
@@ -283,20 +310,23 @@ public final class OperatorBindingCallContext extends 
AbstractSqlCallContext {
         private final SortDirection[] orderByDirections;
         private final int timeColumn;
         private final @Nullable ChangelogMode changelogMode;
+        private final List<int[]> upsertKeyColumns;
 
         public static OperatorBindingTableSemantics create(
                 DataType tableDataType,
                 StaticArgument staticArg,
                 RexTableArgCall tableArgCall,
                 int timeColumn,
-                @Nullable ChangelogMode changelogMode) {
+                @Nullable ChangelogMode changelogMode,
+                List<int[]> upsertKeyColumns) {
             return new OperatorBindingTableSemantics(
                     createDataType(tableDataType, staticArg),
                     tableArgCall.getPartitionKeys(),
                     tableArgCall.getOrderKeys(),
                     
RexTableArgCall.toSortDirections(tableArgCall.getSortOrder()),
                     timeColumn,
-                    changelogMode);
+                    changelogMode,
+                    upsertKeyColumns);
         }
 
         private OperatorBindingTableSemantics(
@@ -305,13 +335,15 @@ public final class OperatorBindingCallContext extends 
AbstractSqlCallContext {
                 int[] orderByColumns,
                 SortDirection[] orderByDirections,
                 int timeColumn,
-                @Nullable ChangelogMode changelogMode) {
+                @Nullable ChangelogMode changelogMode,
+                List<int[]> upsertKeyColumns) {
             this.dataType = dataType;
             this.partitionByColumns = partitionByColumns;
             this.orderByColumns = orderByColumns;
             this.orderByDirections = orderByDirections;
             this.timeColumn = timeColumn;
             this.changelogMode = changelogMode;
+            this.upsertKeyColumns = upsertKeyColumns;
         }
 
         private static DataType createDataType(DataType tableDataType, 
StaticArgument staticArg) {
@@ -353,5 +385,10 @@ public final class OperatorBindingCallContext extends 
AbstractSqlCallContext {
         public Optional<ChangelogMode> changelogMode() {
             return Optional.ofNullable(changelogMode);
         }
+
+        @Override
+        public List<int[]> upsertKeyColumns() {
+            return upsertKeyColumns;
+        }
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
index 3973329af74..e0cf5f09187 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
@@ -69,6 +69,7 @@ import 
org.apache.flink.table.types.inference.TypeInferenceUtil.StateInfo;
 import org.apache.flink.table.types.logical.RowType;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.apache.calcite.linq4j.Ord;
@@ -108,6 +109,7 @@ public class StreamExecProcessTableFunction extends 
ExecNodeBase<RowData>
     public static final String FIELD_NAME_FUNCTION_CALL = "functionCall";
     public static final String FIELD_NAME_INPUT_CHANGELOG_MODES = 
"inputChangelogModes";
     public static final String FIELD_NAME_OUTPUT_CHANGELOG_MODE = 
"outputChangelogMode";
+    public static final String FIELD_NAME_INPUT_UPSERT_KEYS = 
"inputUpsertKeys";
 
     @JsonProperty(FIELD_NAME_UID)
     private final @Nullable String uid;
@@ -121,6 +123,10 @@ public class StreamExecProcessTableFunction extends 
ExecNodeBase<RowData>
     @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE)
     private final ChangelogMode outputChangelogMode;
 
+    @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS)
+    @JsonInclude(JsonInclude.Include.NON_EMPTY)
+    private final List<List<int[]>> inputUpsertKeys;
+
     public StreamExecProcessTableFunction(
             ReadableConfig tableConfig,
             List<InputProperty> inputProperties,
@@ -129,7 +135,8 @@ public class StreamExecProcessTableFunction extends 
ExecNodeBase<RowData>
             @Nullable String uid,
             RexCall invocation,
             List<ChangelogMode> inputChangelogModes,
-            ChangelogMode outputChangelogMode) {
+            ChangelogMode outputChangelogMode,
+            List<List<int[]>> inputUpsertKeys) {
         this(
                 ExecNodeContext.newNodeId(),
                 
ExecNodeContext.newContext(StreamExecProcessTableFunction.class),
@@ -141,7 +148,8 @@ public class StreamExecProcessTableFunction extends 
ExecNodeBase<RowData>
                 uid,
                 invocation,
                 inputChangelogModes,
-                outputChangelogMode);
+                outputChangelogMode,
+                inputUpsertKeys);
     }
 
     @JsonCreator
@@ -155,7 +163,9 @@ public class StreamExecProcessTableFunction extends 
ExecNodeBase<RowData>
             @JsonProperty(FIELD_NAME_UID) @Nullable String uid,
             @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation,
             @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODES) 
List<ChangelogMode> inputChangelogModes,
-            @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) ChangelogMode 
outputChangelogMode) {
+            @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) ChangelogMode 
outputChangelogMode,
+            @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS)
+                    @Nullable List<List<int[]>> inputUpsertKeys) {
         super(id, context, persistedConfig, inputProperties, outputType, 
description);
         this.uid = uid;
         // Mirror the FlinkLogicalTableFunctionScan converter for the 
compiled-plan restore path:
@@ -164,6 +174,7 @@ public class StreamExecProcessTableFunction extends 
ExecNodeBase<RowData>
         this.invocation = BridgingSqlFunction.resolveCallTraits((RexCall) 
invocation);
         this.inputChangelogModes = inputChangelogModes;
         this.outputChangelogMode = outputChangelogMode;
+        this.inputUpsertKeys = inputUpsertKeys != null ? inputUpsertKeys : 
List.of();
     }
 
     public @Nullable String getUid() {
@@ -202,7 +213,12 @@ public class StreamExecProcessTableFunction extends 
ExecNodeBase<RowData>
         final RexCall udfCall = 
StreamPhysicalProcessTableFunction.toUdfCall(invocation);
         final GeneratedRunnerResult generated =
                 ProcessTableRunnerGenerator.generate(
-                        ctx, udfCall, inputTimeColumns, inputChangelogModes, 
outputChangelogMode);
+                        ctx,
+                        udfCall,
+                        inputTimeColumns,
+                        inputChangelogModes,
+                        outputChangelogMode,
+                        inputUpsertKeys);
         final GeneratedProcessTableRunner generatedRunner = generated.runner();
         final LinkedHashMap<String, StateInfo> stateInfos = 
generated.stateInfos();
 
@@ -310,9 +326,12 @@ public class StreamExecProcessTableFunction extends 
ExecNodeBase<RowData>
 
         final int timeColumn = 
inputTimeColumns.get(tableArgCall.getInputIndex());
 
+        final int inputIndex = tableArgCall.getInputIndex();
+        final List<int[]> upsertKeys =
+                inputIndex < inputUpsertKeys.size() ? 
inputUpsertKeys.get(inputIndex) : List.of();
         return new RuntimeTableSemantics(
                 tableArg.getName(),
-                tableArgCall.getInputIndex(),
+                inputIndex,
                 dataType,
                 tableArgCall.getPartitionKeys(),
                 tableArgCall.getOrderKeys(),
@@ -320,7 +339,8 @@ public class StreamExecProcessTableFunction extends 
ExecNodeBase<RowData>
                 consumedChangelogMode,
                 tableArg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH),
                 tableArg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE),
-                timeColumn);
+                timeColumn,
+                upsertKeys);
     }
 
     private Transformation<RowData> createKeyedTransformation(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
index 5ccecf18e71..a337c250860 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.functions.ProcessTableFunction;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.calcite.RexTableArgCall;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction;
@@ -62,6 +63,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
@@ -165,6 +167,20 @@ public class StreamPhysicalProcessTableFunction extends 
AbstractRelNode
         verifyTimeAttributes(getInputs(), call, inputChangelogModes, 
outputChangelogMode);
         final List<Ord<StaticArgument>> providedInputArgs = 
getProvidedInputArgs(call);
         verifyPassThroughColumnsForUpdates(providedInputArgs, 
outputChangelogMode);
+        final FlinkRelMetadataQuery fmq =
+                
FlinkRelMetadataQuery.reuseOrCreate(getCluster().getMetadataQuery());
+        final List<List<int[]>> inputUpsertKeys =
+                getInputs().stream()
+                        .map(
+                                input -> {
+                                    final Set<ImmutableBitSet> keys = 
fmq.getUpsertKeys(input);
+                                    return keys == null
+                                            ? Collections.<int[]>emptyList()
+                                            : keys.stream()
+                                                    
.map(ImmutableBitSet::toArray)
+                                                    
.collect(Collectors.toList());
+                                })
+                        .collect(Collectors.toList());
         return new StreamExecProcessTableFunction(
                 unwrapTableConfig(this),
                 getInputs().stream().map(i -> 
InputProperty.DEFAULT).collect(Collectors.toList()),
@@ -173,7 +189,8 @@ public class StreamPhysicalProcessTableFunction extends 
AbstractRelNode
                 uid,
                 call,
                 inputChangelogModes,
-                outputChangelogMode);
+                outputChangelogMode,
+                inputUpsertKeys);
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java
index cc301118b29..af99d92e9ed 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java
@@ -18,13 +18,17 @@
 
 package org.apache.flink.table.planner.plan.utils;
 
+import org.apache.flink.table.utils.UpsertKeyUtils;
+
 import org.apache.calcite.util.ImmutableBitSet;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Utility for upsertKey which represented as a Set of {@link
@@ -55,21 +59,8 @@ public class UpsertKeyUtil {
         if (null == upsertKeys || upsertKeys.isEmpty()) {
             return Optional.empty();
         }
-        return upsertKeys.stream()
-                .map(ImmutableBitSet::toArray)
-                .reduce(
-                        (k1, k2) -> {
-                            if (k1.length < k2.length) {
-                                return k1;
-                            }
-                            if (k1.length == k2.length) {
-                                for (int index = 0; index < k1.length; 
index++) {
-                                    if (k1[index] < k2[index]) {
-                                        return k1;
-                                    }
-                                }
-                            }
-                            return k2;
-                        });
+        final List<int[]> asArrays =
+                
upsertKeys.stream().map(ImmutableBitSet::toArray).collect(Collectors.toList());
+        return Optional.of(UpsertKeyUtils.smallestKey(asArrays));
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
index 52df803d5c8..ea9f5ca4c77 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
@@ -65,7 +65,8 @@ object ProcessTableRunnerGenerator {
       udfCall: RexCall,
       inputTimeColumns: java.util.List[Integer],
       inputChangelogModes: java.util.List[ChangelogMode],
-      outputChangelogMode: ChangelogMode): GeneratedRunnerResult = {
+      outputChangelogMode: ChangelogMode,
+      inputUpsertKeys: java.util.List[java.util.List[Array[Int]]]): 
GeneratedRunnerResult = {
     val function: BridgingSqlFunction = 
udfCall.getOperator.asInstanceOf[BridgingSqlFunction]
     val definition: FunctionDefinition = function.getDefinition
     val dataTypeFactory = function.getDataTypeFactory
@@ -77,7 +78,12 @@ object ProcessTableRunnerGenerator {
     // Thus, functions can reconfigure themselves for the exact use case.
     // Including updating their state layout.
     val callContext =
-      function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, 
outputChangelogMode)
+      function.toCallContext(
+        udfCall,
+        inputTimeColumns,
+        inputChangelogModes,
+        outputChangelogMode,
+        inputUpsertKeys)
 
     // Create the final UDF for runtime
     val udf = UserDefinedFunctionHelper.createSpecializedFunction(
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
index ea961c665ca..324327cad4b 100644
--- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
@@ -78,7 +78,8 @@
       "type" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647) NOT 
NULL, `score` BIGINT> NOT NULL"
     },
     "inputChangelogModes" : [ [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ] ],
-    "outputChangelogMode" : [ "INSERT" ]
+    "outputChangelogMode" : [ "INSERT" ],
+    "inputUpsertKeys" : [ [ [ 0 ] ] ]
   }, {
     "id" : 3,
     "type" : "stream-exec-sink_2",
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java
index cabab4c6131..7f0fbf0c3d3 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.functions.TableSemantics;
 import org.apache.flink.table.types.DataType;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -44,6 +45,7 @@ public class RuntimeTableSemantics implements TableSemantics, Serializable {
     private final boolean passColumnsThrough;
     private final boolean hasSetSemantics;
     private final int timeColumn;
+    private final List<int[]> upsertKeyColumns;
 
     private transient ChangelogMode changelogMode;
 
@@ -57,7 +59,8 @@ public class RuntimeTableSemantics implements TableSemantics, Serializable {
             RuntimeChangelogMode consumedChangelogMode,
             boolean passColumnsThrough,
             boolean hasSetSemantics,
-            int timeColumn) {
+            int timeColumn,
+            List<int[]> upsertKeyColumns) {
         this.argName = argName;
         this.inputIndex = inputIndex;
         this.dataType = dataType;
@@ -68,6 +71,7 @@ public class RuntimeTableSemantics implements TableSemantics, Serializable {
         this.passColumnsThrough = passColumnsThrough;
         this.hasSetSemantics = hasSetSemantics;
         this.timeColumn = timeColumn;
+        this.upsertKeyColumns = upsertKeyColumns;
     }
 
     public String getArgName() {
@@ -122,4 +126,9 @@ public class RuntimeTableSemantics implements TableSemantics, Serializable {
     public Optional<ChangelogMode> changelogMode() {
         return Optional.of(getChangelogMode());
     }
+
+    @Override
+    public List<int[]> upsertKeyColumns() {
+        return this.upsertKeyColumns; // the constructor's list instance, handed out without copying
+    }
 }
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java
index be390ab5f55..15258ff24cb 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java
@@ -246,7 +246,8 @@ class ProcessSetTableOperatorInterruptibleTimersTest {
                 RuntimeChangelogMode.serialize(ChangelogMode.insertOnly()),
                 /* passColumnsThrough */ false,
                 /* hasSetSemantics */ true,
-                /* timeColumn */ 1);
+                /* timeColumn */ 1,
+                /* upsertKeyColumns */ List.of());
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java
index fadf21d7dd9..91edd9a059d 100644
--- a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java
+++ b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java
@@ -23,6 +23,8 @@ import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.functions.TableSemantics;
 import org.apache.flink.table.types.DataType;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 
 /** {@link TableSemantics} implementation for {@link ProcessTableFunctionTestHarness}. */
@@ -30,10 +32,17 @@ import java.util.Optional;
 class TestHarnessTableSemantics implements TableSemantics {
     private final DataType dataType;
     private final int[] partitionByColumns;
+    private final List<int[]> upsertKeyColumns;
 
     TestHarnessTableSemantics(DataType dataType, int[] partitionByColumns) {
+        this(dataType, partitionByColumns, Collections.emptyList()); // no upsert key candidates
+    }
+
+    TestHarnessTableSemantics(
+            DataType dataType, int[] partitionByColumns, List<int[]> upsertKeyColumns) {
         this.dataType = dataType;
         this.partitionByColumns = partitionByColumns;
+        this.upsertKeyColumns = upsertKeyColumns; // stored as given; returned by upsertKeyColumns()
     }
 
     @Override
@@ -65,4 +74,9 @@ class TestHarnessTableSemantics implements TableSemantics {
     public Optional<ChangelogMode> changelogMode() {
         return Optional.empty();
     }
+
+    @Override
+    public List<int[]> upsertKeyColumns() {
+        return this.upsertKeyColumns; // empty list unless the three-argument constructor was used
+    }
 }


Reply via email to