This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 055edcc7cb8 [FLINK-39392][table] Support conditional traits for PTFs
055edcc7cb8 is described below

commit 055edcc7cb8abdaa4be33b3baff365571c1da023
Author: Gustavo de Morais <[email protected]>
AuthorDate: Mon Apr 27 14:07:53 2026 +0200

    [FLINK-39392][table] Support conditional traits for PTFs
    
    This closes #27886.
---
 .../docs/sql/reference/queries/changelog.md        |  10 +-
 .../functions/BuiltInFunctionDefinitions.java      |  31 ++--
 .../table/types/inference/BuiltInCondition.java    |  74 +++++++++
 .../table/types/inference/StaticArgument.java      | 134 ++++++++++++++++-
 .../table/types/inference/StaticArgumentTrait.java |  24 +++
 .../table/types/inference/SystemTypeInference.java |  44 +++++-
 .../table/types/inference/TraitCondition.java      |  71 +++++++++
 .../flink/table/types/inference/TraitContext.java  |  81 ++++++++++
 flink-table/flink-table-planner/AGENTS.md          |  16 ++
 .../functions/bridging/BridgingSqlFunction.java    | 166 ++++++++++++++++++++-
 .../stream/StreamExecProcessTableFunction.java     |   6 +-
 .../logical/FlinkLogicalTableFunctionScan.java     |  10 +-
 .../stream/StreamPhysicalProcessTableFunction.java |  23 ---
 .../codegen/ProcessTableRunnerGenerator.scala      |   8 +-
 .../FlinkChangelogModeInferenceProgram.scala       |  34 ++---
 .../planner/plan/stream/sql/ToChangelogTest.java   |  17 +++
 .../planner/plan/stream/sql/ToChangelogTest.xml    |  20 +++
 17 files changed, 687 insertions(+), 82 deletions(-)

diff --git a/docs/content/docs/sql/reference/queries/changelog.md 
b/docs/content/docs/sql/reference/queries/changelog.md
index 60773af2b27..494dcc60be8 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -177,7 +177,7 @@ This is useful when you need to materialize changelog 
events into a downstream s
 
 ```sql
 SELECT * FROM TO_CHANGELOG(
-  input => TABLE source_table,
+  input => TABLE source_table [PARTITION BY key_col],
   [op => DESCRIPTOR(op_column_name),]
   [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
 )
@@ -185,10 +185,10 @@ SELECT * FROM TO_CHANGELOG(
 
 ### Parameters
 
-| Parameter    | Required | Description |
-|:-------------|:---------|:------------|
-| `input`      | Yes      | The input table. Accepts insert-only, retract, and 
upsert tables. |
-| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. |
+| Parameter    | Required | Description                                        
                                                                                
                                                                                
                                                                                
                                                      |
+|:-------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `input`      | Yes      | The input table. With `PARTITION BY`, rows with 
the same key are co-located and run in the same operator instance. Without 
`PARTITION BY`, each row is processed independently. Accepts insert-only, 
retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key 
should match or be a subset of the upsert key of the subquery.                  
                    |
+| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`.                                        
                                                                                
                                                                                
                                                        |
 | `op_mapping` | No       | A `MAP<STRING, STRING>` mapping change operation 
names to custom output codes. Keys can contain comma-separated names to map 
multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When 
provided, only mapped operations are forwarded - unmapped events are dropped. 
Each change operation may appear at most once across all entries. |
 
 #### Default op_mapping
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index bbe38ca13d5..cd25361923c 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.table.types.inference.ConstantArgumentCount;
 import org.apache.flink.table.types.inference.InputTypeStrategies;
 import org.apache.flink.table.types.inference.StaticArgument;
 import org.apache.flink.table.types.inference.StaticArgumentTrait;
+import org.apache.flink.table.types.inference.TraitCondition;
 import org.apache.flink.table.types.inference.TypeStrategies;
 import 
org.apache.flink.table.types.inference.strategies.ArrayOfStringArgumentTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies;
@@ -785,22 +786,22 @@ public final class BuiltInFunctionDefinitions {
                     .name("TO_CHANGELOG")
                     .kind(PROCESS_TABLE)
                     .staticArguments(
-                            // Row semantics (no PARTITION BY). Accepts 
updating
-                            // inputs. The planner inserts ChangelogNormalize 
for
-                            // upsert sources to produce UPDATE_BEFORE and full
-                            // DELETE rows.
+                            // Row semantics (no PARTITION BY).
+                            // With PARTITION BY, switches to set
+                            // semantics for co-located parallel execution.
                             StaticArgument.table(
-                                    "input",
-                                    Row.class,
-                                    false,
-                                    EnumSet.of(
-                                            StaticArgumentTrait.TABLE,
-                                            
StaticArgumentTrait.ROW_SEMANTIC_TABLE,
-                                            
StaticArgumentTrait.SUPPORT_UPDATES,
-                                            
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE,
-                                            // Not strictly necessary but 
explicitly state that
-                                            // we require full deletes.
-                                            
StaticArgumentTrait.REQUIRE_FULL_DELETE)),
+                                            "input",
+                                            Row.class,
+                                            false,
+                                            EnumSet.of(
+                                                    StaticArgumentTrait.TABLE,
+                                                    
StaticArgumentTrait.ROW_SEMANTIC_TABLE,
+                                                    
StaticArgumentTrait.SUPPORT_UPDATES,
+                                                    
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE,
+                                                    
StaticArgumentTrait.REQUIRE_FULL_DELETE))
+                                    .withConditionalTrait(
+                                            
StaticArgumentTrait.SET_SEMANTIC_TABLE,
+                                            TraitCondition.hasPartitionBy()),
                             StaticArgument.scalar("op", 
DataTypes.DESCRIPTOR(), true),
                             StaticArgument.scalar(
                                     "op_mapping",
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java
new file mode 100644
index 00000000000..2a06191ad13
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+/**
+ * Internal value-comparable wrapper used by all built-in {@link 
TraitCondition} factories. Equality
+ * is keyed by {@code kind + args}; the {@code impl} predicate is reused but 
never compared, so two
+ * conditions built from the same factory inputs are equal.
+ *
+ * <p>Lives outside {@link TraitCondition} because Java forbids {@code 
private} nested types in
+ * interfaces (they are implicitly {@code public static}); top-level 
package-private gives the same
+ * encapsulation.
+ */
+final class BuiltInCondition implements TraitCondition {
+
+    /** Tag identifying which factory produced the condition. */
+    enum Kind {
+        HAS_PARTITION_BY,
+        ARG_IS_EQUAL_TO,
+        NOT
+    }
+
+    private final Kind kind;
+    private final List<Object> args;
+    private final Predicate<TraitContext> impl;
+
+    BuiltInCondition(final Kind kind, final List<Object> args, final 
Predicate<TraitContext> impl) {
+        this.kind = kind;
+        this.args = args;
+        this.impl = impl;
+    }
+
+    @Override
+    public boolean test(final TraitContext ctx) {
+        return impl.test(ctx);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof BuiltInCondition)) {
+            return false;
+        }
+        final BuiltInCondition that = (BuiltInCondition) o;
+        return kind == that.kind && args.equals(that.args);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(kind, args);
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
index 3f5c48db8d2..546f30a7f7e 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
@@ -31,10 +31,13 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Describes an argument in a static signature that is not overloaded and does 
not support varargs.
@@ -57,6 +60,7 @@ public class StaticArgument {
     private final @Nullable Class<?> conversionClass;
     private final boolean isOptional;
     private final EnumSet<StaticArgumentTrait> traits;
+    private final List<ConditionalTrait> conditionalTraits;
 
     private StaticArgument(
             String name,
@@ -64,11 +68,22 @@ public class StaticArgument {
             @Nullable Class<?> conversionClass,
             boolean isOptional,
             EnumSet<StaticArgumentTrait> traits) {
+        this(name, dataType, conversionClass, isOptional, traits, List.of());
+    }
+
+    private StaticArgument(
+            String name,
+            @Nullable DataType dataType,
+            @Nullable Class<?> conversionClass,
+            boolean isOptional,
+            EnumSet<StaticArgumentTrait> traits,
+            List<ConditionalTrait> conditionalTraits) {
         this.name = Preconditions.checkNotNull(name, "Name must not be null.");
         this.dataType = dataType;
         this.conversionClass = conversionClass;
         this.isOptional = isOptional;
         this.traits = Preconditions.checkNotNull(traits, "Traits must not be 
null.");
+        this.conditionalTraits = conditionalTraits;
         checkName();
         checkTraits(traits);
         checkOptionalType();
@@ -196,6 +211,84 @@ public class StaticArgument {
         return traits.contains(trait);
     }
 
+    /**
+     * Context-aware trait check. Evaluates conditional trait rules against 
the given context to
+     * determine the effective traits.
+     */
+    public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+        return resolveTraits(ctx).contains(trait);
+    }
+
+    /**
+     * Returns a new {@link StaticArgument} with an additional conditional 
trait rule. The trait is
+     * added to the effective trait set when the condition evaluates to {@code 
true} at planning
+     * time. Only non-root traits (subtraits of TABLE, SCALAR, or MODEL) are 
allowed.
+     *
+     * <p>Multiple conditions for the same trait use OR semantics: the trait 
is activated if any of
+     * its conditions is met.
+     *
+     * <p>Example:
+     *
+     * <pre>{@code
+     * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, 
SUPPORT_UPDATES))
+     *         .withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy());
+     * }</pre>
+     */
+    public StaticArgument withConditionalTrait(
+            final StaticArgumentTrait trait, final TraitCondition condition) {
+        if (trait.isRoot()) {
+            throw new IllegalArgumentException(
+                    "Root traits (SCALAR, TABLE, MODEL) cannot be 
conditional.");
+        }
+        final List<ConditionalTrait> accumulated = new 
ArrayList<>(this.conditionalTraits);
+        accumulated.add(new ConditionalTrait(condition, trait));
+        return new StaticArgument(name, dataType, conversionClass, isOptional, 
traits, accumulated);
+    }
+
+    /** Whether this argument has conditional trait rules. */
+    public boolean hasConditionalTraits() {
+        return !conditionalTraits.isEmpty();
+    }
+
+    /** Whether any conditional trait rule may add the given trait. */
+    public boolean hasConditionalTrait(final StaticArgumentTrait trait) {
+        return conditionalTraits.stream().anyMatch(c -> c.trait == trait);
+    }
+
+    /**
+     * Returns a new {@link StaticArgument} with conditional traits resolved 
against the given
+     * context. The returned argument has the effective traits baked in and no 
conditional rules.
+     */
+    public StaticArgument applyConditionalTraits(final TraitContext ctx) {
+        if (conditionalTraits.isEmpty()) {
+            return this;
+        }
+        return new StaticArgument(name, dataType, conversionClass, isOptional, 
resolveTraits(ctx));
+    }
+
+    /**
+     * Resolves effective traits by evaluating conditional rules against the 
context. Returns the
+     * base traits combined with any conditional traits whose conditions are 
met.
+     */
+    public EnumSet<StaticArgumentTrait> resolveTraits(final TraitContext ctx) {
+        if (conditionalTraits.isEmpty()) {
+            return traits;
+        }
+        final EnumSet<StaticArgumentTrait> resolved = EnumSet.copyOf(traits);
+        for (final ConditionalTrait conditionalTrait : conditionalTraits) {
+            if (conditionalTrait.condition.test(ctx)) {
+                removeMutuallyExclusiveTraits(resolved, 
conditionalTrait.trait);
+                resolved.add(conditionalTrait.trait);
+            }
+        }
+        return resolved;
+    }
+
+    private static void removeMutuallyExclusiveTraits(
+            final EnumSet<StaticArgumentTrait> traits, final 
StaticArgumentTrait adding) {
+        traits.removeAll(adding.getIncompatibleWith());
+    }
+
     @Override
     public String toString() {
         final StringBuilder s = new StringBuilder();
@@ -210,11 +303,13 @@ public class StaticArgument {
             s.append(dataType);
         }
         if (!traits.equals(EnumSet.of(StaticArgumentTrait.SCALAR))) {
+            final Stream<String> baseTraitNames =
+                    traits.stream().map(Enum::name).map(n -> n.replace('_', ' 
'));
+            final Stream<String> conditionalTraitNames =
+                    conditionalTraits.stream().map(c -> 
c.trait.name().replace('_', ' '));
             s.append(" ");
             s.append(
-                    traits.stream()
-                            .map(Enum::name)
-                            .map(n -> n.replace('_', ' '))
+                    Stream.concat(baseTraitNames, conditionalTraitNames)
                             .collect(Collectors.joining(", ", "{", "}")));
         }
         return s.toString();
@@ -233,12 +328,13 @@ public class StaticArgument {
                 && Objects.equals(name, that.name)
                 && Objects.equals(dataType, that.dataType)
                 && Objects.equals(conversionClass, that.conversionClass)
-                && Objects.equals(traits, that.traits);
+                && Objects.equals(traits, that.traits)
+                && Objects.equals(conditionalTraits, that.conditionalTraits);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, dataType, conversionClass, isOptional, 
traits);
+        return Objects.hash(name, dataType, conversionClass, isOptional, 
traits, conditionalTraits);
     }
 
     private void checkName() {
@@ -354,4 +450,32 @@ public class StaticArgument {
             throw new ValidationException("Model arguments must not be 
optional.");
         }
     }
+
+    /** A trait that is conditionally added based on a {@link TraitCondition}. 
*/
+    private static final class ConditionalTrait {
+        private final TraitCondition condition;
+        private final StaticArgumentTrait trait;
+
+        ConditionalTrait(final TraitCondition condition, final 
StaticArgumentTrait trait) {
+            this.condition = condition;
+            this.trait = trait;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final ConditionalTrait that = (ConditionalTrait) o;
+            return Objects.equals(condition, that.condition) && trait == 
that.trait;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(condition, trait);
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
index 647248ada2d..7b0083ed2dc 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.types.inference;
 import org.apache.flink.annotation.PublicEvolving;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -51,6 +53,8 @@ public enum StaticArgumentTrait {
     REQUIRE_UPDATE_BEFORE(SUPPORT_UPDATES),
     REQUIRE_FULL_DELETE(SUPPORT_UPDATES);
 
+    private static final Set<StaticArgumentTrait> ROOTS = EnumSet.of(SCALAR, 
TABLE, MODEL);
+
     private final Set<StaticArgumentTrait> requirements;
 
     StaticArgumentTrait(StaticArgumentTrait... requirements) {
@@ -60,4 +64,24 @@ public enum StaticArgumentTrait {
     public Set<StaticArgumentTrait> getRequirements() {
         return requirements;
     }
+
+    /** Whether this trait is one of the top-level roots (SCALAR, TABLE, 
MODEL). */
+    public boolean isRoot() {
+        return ROOTS.contains(this);
+    }
+
+    /**
+     * Returns the traits that are mutually exclusive with this one. Adding 
this trait to a set
+     * implies removing all returned traits. Empty by default.
+     */
+    public Set<StaticArgumentTrait> getIncompatibleWith() {
+        switch (this) {
+            case SET_SEMANTIC_TABLE:
+                return Collections.singleton(ROW_SEMANTIC_TABLE);
+            case ROW_SEMANTIC_TABLE:
+                return Collections.singleton(SET_SEMANTIC_TABLE);
+            default:
+                return Collections.emptySet();
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
index 92fd79f407b..971831d6ac4 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
@@ -182,6 +182,32 @@ public class SystemTypeInference {
         }
     }
 
+    /**
+     * Resolves conditional traits (see {@link 
StaticArgument#withConditionalTrait}) on every static
+     * arg using the call's semantics and operands. Called once at the top of 
{@link
+     * SystemInputStrategy#inferInputTypes} and {@link 
SystemOutputStrategy#inferType}; downstream
+     * helpers receive the resolved list and iterate it directly.
+     */
+    private static List<StaticArgument> resolveStaticArgs(
+            final CallContext callContext, final @Nullable 
List<StaticArgument> staticArgs) {
+        if (staticArgs == null) {
+            return null;
+        }
+        return IntStream.range(0, staticArgs.size())
+                .mapToObj(
+                        pos -> {
+                            final StaticArgument arg = staticArgs.get(pos);
+                            if (!arg.hasConditionalTraits()) {
+                                return arg;
+                            }
+                            final TableSemantics semantics =
+                                    
callContext.getTableSemantics(pos).orElse(null);
+                            return arg.applyConditionalTraits(
+                                    TraitContext.of(semantics, callContext, 
staticArgs));
+                        })
+                .collect(Collectors.toList());
+    }
+
     private static void checkMultipleTableArgs(List<StaticArgument> 
staticArgs) {
         final List<StaticArgument> tableArgs =
                 staticArgs.stream()
@@ -262,6 +288,10 @@ public class SystemTypeInference {
             return origin.inferType(callContext)
                     .map(
                             functionDataType -> {
+                                // Resolve once so all helpers see the same 
effective signature
+                                // (PARTITION BY / scalar literals applied to 
conditional traits).
+                                final List<StaticArgument> resolvedArgs =
+                                        resolveStaticArgs(callContext, 
staticArgs);
                                 final List<Field> fields = new ArrayList<>();
 
                                 // According to the SQL standard, pass-through 
columns should
@@ -273,11 +303,11 @@ public class SystemTypeInference {
                                 // - Flink SESSION windows add pass-through 
columns at the beginning
                                 // - Oracle adds pass-through columns for all 
ROW semantics args, so
                                 // this whole topic is kind of vendor specific 
already
-                                
fields.addAll(derivePassThroughFields(callContext));
+                                
fields.addAll(derivePassThroughFields(callContext, resolvedArgs));
                                 
fields.addAll(deriveFunctionOutputFields(functionDataType));
 
                                 if (!disableSystemArgs) {
-                                    
fields.addAll(deriveRowtimeField(callContext));
+                                    
fields.addAll(deriveRowtimeField(callContext, resolvedArgs));
                                 }
 
                                 final List<Field> uniqueFields = 
makeFieldNamesUnique(fields);
@@ -303,7 +333,8 @@ public class SystemTypeInference {
                     .collect(Collectors.toList());
         }
 
-        private List<Field> derivePassThroughFields(CallContext callContext) {
+        private List<Field> derivePassThroughFields(
+                CallContext callContext, List<StaticArgument> staticArgs) {
             if (functionKind != FunctionKind.PROCESS_TABLE) {
                 return List.of();
             }
@@ -349,7 +380,8 @@ public class SystemTypeInference {
                     .collect(Collectors.toList());
         }
 
-        private List<Field> deriveRowtimeField(CallContext callContext) {
+        private List<Field> deriveRowtimeField(
+                CallContext callContext, List<StaticArgument> staticArgs) {
             if (this.functionKind != FunctionKind.PROCESS_TABLE) {
                 return List.of();
             }
@@ -566,8 +598,10 @@ public class SystemTypeInference {
                                 + "that is not overloaded and doesn't contain 
varargs.");
             }
 
+            // Resolve once so the rest of validation iterates the effective 
signature.
+            final List<StaticArgument> resolvedArgs = 
resolveStaticArgs(callContext, staticArgs);
             try {
-                checkTableArgs(staticArgs, callContext);
+                checkTableArgs(resolvedArgs, callContext);
                 if (!disableSystemArgs) {
                     checkUidArg(callContext);
                 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java
new file mode 100644
index 00000000000..93bdc5c8ce4
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.List;
+
+/**
+ * A condition that determines whether a conditional trait on a {@link 
StaticArgument} should be
+ * active for a given call.
+ *
+ * <p>Conditions are evaluated at planning time using the {@link TraitContext} 
which provides access
+ * to the SQL call's properties (PARTITION BY presence, scalar literal values, 
etc.).
+ *
+ * <p>Implementations must implement {@code hashCode} and {@code equals} for 
{@link
+ * StaticArgument#equals}/{@link StaticArgument#hashCode} to work correctly. 
The built-in factories
+ * below return value-comparable instances; user-supplied lambdas do not - 
prefer the factories.
+ *
+ * <pre>{@code
+ * import static org.apache.flink.table.types.inference.TraitCondition.*;
+ *
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, 
SUPPORT_UPDATES))
+ *         .withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy());
+ * }</pre>
+ */
+@PublicEvolving
+@FunctionalInterface
+public interface TraitCondition {
+
+    /** Evaluates this condition against the given context. */
+    boolean test(TraitContext ctx);
+
+    /** True when PARTITION BY is provided on the table argument. */
+    static TraitCondition hasPartitionBy() {
+        return new BuiltInCondition(
+                BuiltInCondition.Kind.HAS_PARTITION_BY, List.of(), 
TraitContext::hasPartitionBy);
+    }
+
+    /** True when the named scalar argument equals the expected value. */
+    @SuppressWarnings("unchecked")
+    static <T> TraitCondition argIsEqualTo(final String name, final T 
expected) {
+        final Class<T> clazz = (Class<T>) expected.getClass();
+        return new BuiltInCondition(
+                BuiltInCondition.Kind.ARG_IS_EQUAL_TO,
+                List.of(name, expected),
+                ctx -> ctx.getScalarArgument(name, 
clazz).map(expected::equals).orElse(false));
+    }
+
+    /** Negates the given condition. */
+    static TraitCondition not(final TraitCondition condition) {
+        return new BuiltInCondition(
+                BuiltInCondition.Kind.NOT, List.of(condition), ctx -> 
!condition.test(ctx));
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java
new file mode 100644
index 00000000000..e58a4b36dce
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.functions.TableSemantics;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Read-only context provided to {@link TraitCondition} during trait 
resolution at planning time.
+ *
+ * <p>Allows conditions to inspect the SQL call (e.g., whether PARTITION BY 
was provided, or what
+ * value a scalar argument has) to decide whether a conditional trait should 
be active.
+ */
+@PublicEvolving
+public interface TraitContext {
+
+    /** Whether PARTITION BY was provided on this table argument. */
+    boolean hasPartitionBy();
+
+    /**
+     * Reads a scalar argument value by name.
+     *
+     * @return the argument value, or empty if the argument was not provided, 
is not a literal, or
+     *     cannot be converted to the requested type
+     */
+    <T> Optional<T> getScalarArgument(String name, Class<T> clazz);
+
+    /**
+     * Builds a {@link TraitContext} from validation-time inputs.
+     *
+     * <p>Used by {@code SystemTypeInference} when wrapping a function's 
strategies. Planner-side
+     * code that has a {@code RexCall} should use the planner adapter in 
{@code BridgingSqlFunction}
+     * instead.
+     */
+    static TraitContext of(
+            @Nullable final TableSemantics semantics,
+            final CallContext callContext,
+            final List<StaticArgument> staticArgs) {
+        return new TraitContext() {
+            @Override
+            public boolean hasPartitionBy() {
+                return semantics != null && 
semantics.partitionByColumns().length > 0;
+            }
+
+            @Override
+            public <T> Optional<T> getScalarArgument(final String name, final 
Class<T> clazz) {
+                for (int i = 0; i < staticArgs.size(); i++) {
+                    final StaticArgument arg = staticArgs.get(i);
+                    if (arg.is(StaticArgumentTrait.SCALAR) && 
arg.getName().equals(name)) {
+                        if (!callContext.isArgumentLiteral(i)) {
+                            return Optional.empty();
+                        }
+                        return callContext.getArgumentValue(i, clazz);
+                    }
+                }
+                return Optional.empty();
+            }
+        };
+    }
+}
diff --git a/flink-table/flink-table-planner/AGENTS.md 
b/flink-table/flink-table-planner/AGENTS.md
index 969c5b36656..8b45353ecbf 100644
--- a/flink-table/flink-table-planner/AGENTS.md
+++ b/flink-table/flink-table-planner/AGENTS.md
@@ -106,6 +106,22 @@ When bumping an ExecNode version, update the 
`@ExecNodeMetadata` annotation's `v
 
 New features often introduce `ExecutionConfigOptions` entries (in 
`flink-table-api-java`) for runtime tunability (e.g., cache sizes, timeouts, 
batch sizes).
 
+### PTF conditional traits
+
+A *conditional trait* lets a PTF's table-argument traits depend on the call 
site instead of being fixed at declaration. Example for `TO_CHANGELOG`: the 
`input` argument is row-semantic by default (single stream, no PARTITION BY), 
but switches to set-semantic when the user writes `PARTITION BY` so the runtime 
can co-locate state per key. One declaration, two effective signatures 
depending on the call.
+
+**Declaration.** Built-in functions add conditional rules in 
`BuiltInFunctionDefinitions` via `StaticArgument.withConditionalTrait(trait, 
condition)`. The condition (a `TraitCondition`) is a small value-comparable 
predicate evaluated against a `TraitContext`. Built-in factories live on 
`TraitCondition` (`hasPartitionBy()`, `argIsEqualTo(name, value)`, `not(c)`); 
under the hood they wrap into the package-private `BuiltInCondition` so 
equality cascades correctly through `StaticArgument.equals`.
+
+**Evaluation.** A `TraitCondition` reads two things: whether `PARTITION BY` is 
present on this table arg, and the literal value of named scalar args. Both 
come through `TraitContext`. There are two factories: 
`TraitContext.of(TableSemantics, CallContext, declared)` for the validation 
side (called from `SystemTypeInference.resolveStaticArgs`) and a planner-side 
adapter inside `BridgingSqlFunction.buildTraitContext` that sources the same 
data from a `RexCall` + `RexTableArgCall`. Same logi [...]
+
+**Resolution.** Three call sites bake conditional traits into the operator's 
effective signature:
+
+1. **Validation** — `SystemTypeInference.resolveStaticArgs` runs once each 
from `inferInputTypes` and `inferType`. Twice per validation pass; can't dedupe 
across Calcite hooks because each gets a different `CallContext` instance.
+2. **Planning** — `BridgingSqlFunction.resolveCallTraits` is called from 
`FlinkLogicalTableFunctionScan.Converter.convert`. It rewrites the operator on 
the `RexCall` so all downstream readers see the resolved view via plain 
`function.getTypeInference().getStaticArguments()`.
+3. **Compiled-plan restore** — `BridgingSqlFunction.resolveCallTraits` is 
called again from `StreamExecProcessTableFunction.@JsonCreator`, because the 
JSON path skips the logical converter. Without this hook, restore would 
silently produce wrong results for any conditional-trait PTF.
+
+The payoff: downstream rules, exec nodes, codegen, and changelog inference all 
use ordinary `staticArg.is(SET_SEMANTIC_TABLE)` checks. No consumer needs to 
know that conditional traits exist. Why three sites and not one. The three 
resolution points exist because they sit in different lifecycles that can't 
share state.
+
 ## Testing Patterns
 
 Choose test types based on what you're changing:
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
index 1cad3c7ec86..059fcd6184d 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.functions.bridging;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.ContextResolvedFunction;
 import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionIdentifier;
@@ -30,17 +31,25 @@ import org.apache.flink.table.planner.calcite.FlinkContext;
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.calcite.RexFactory;
+import org.apache.flink.table.planner.calcite.RexTableArgCall;
+import 
org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.StaticArgument;
 import org.apache.flink.table.types.inference.StaticArgumentTrait;
 import org.apache.flink.table.types.inference.SystemTypeInference;
+import org.apache.flink.table.types.inference.TraitContext;
 import org.apache.flink.table.types.inference.TypeInference;
 
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCallBinding;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
@@ -50,8 +59,11 @@ import org.apache.calcite.sql.type.SqlReturnTypeInference;
 import org.apache.calcite.tools.RelBuilder;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static 
org.apache.flink.table.planner.functions.bridging.BridgingUtils.createName;
 import static 
org.apache.flink.table.planner.functions.bridging.BridgingUtils.createSqlFunctionCategory;
@@ -216,6 +228,152 @@ public class BridgingSqlFunction extends SqlFunction {
         return resolvedFunction.getDefinition().isDeterministic();
     }
 
+    // 
--------------------------------------------------------------------------------------------
+    // Conditional trait resolution
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Rewrites {@code call} so that the operator's {@link StaticArgument}s 
have any conditional
+     * traits (see {@link StaticArgument#withConditionalTrait}) applied 
against the call site
+     * (PARTITION BY, scalar literals). Downstream consumers can then treat 
the operator's static
+     * arguments as the effective signature and use plain {@code 
arg.is(SET_SEMANTIC_TABLE)} checks.
+     *
+     * <p>Called from the two places where a planner-level {@link RexCall} for 
a PTF is first built
+     * for downstream consumption: {@code FlinkLogicalTableFunctionScan} 
converter (fresh planning)
+     * and {@code StreamExecProcessTableFunction.@JsonCreator} (compiled-plan 
restore). A no-op for
+     * non-PTF calls and for PTFs that declare no conditional traits.
+     */
+    public static RexCall resolveCallTraits(RexCall call) {
+        if (!(call.getOperator() instanceof BridgingSqlFunction)) {
+            return call;
+        }
+        final BridgingSqlFunction function = (BridgingSqlFunction) 
call.getOperator();
+        final List<StaticArgument> declared =
+                function.typeInference.getStaticArguments().orElse(null);
+        if (declared == null || 
declared.stream().noneMatch(StaticArgument::hasConditionalTraits)) {
+            return call;
+        }
+        final List<RexNode> operands = call.getOperands();
+        final CallContext callContext = function.toCallContext(call);
+        final List<StaticArgument> resolved =
+                IntStream.range(0, declared.size())
+                        .mapToObj(
+                                i ->
+                                        resolveArg(
+                                                declared.get(i),
+                                                declared,
+                                                operands,
+                                                i,
+                                                callContext))
+                        .collect(Collectors.toList());
+        if (resolved.equals(declared)) {
+            return call;
+        }
+        final BridgingSqlFunction rewritten = 
function.withStaticArguments(resolved);
+        // Use a fresh RexBuilder from the function's own type factory so this 
can run from a
+        // Jackson @JsonCreator that has no planner context.
+        return (RexCall)
+                new RexBuilder(function.typeFactory).makeCall(call.getType(), 
rewritten, operands);
+    }
+
+    private static StaticArgument resolveArg(
+            StaticArgument declaredArg,
+            List<StaticArgument> declared,
+            List<RexNode> operands,
+            int index,
+            CallContext callContext) {
+        // We only resolve conditional traits for the Table Argument with 
conditional traits
+        if (!declaredArg.hasConditionalTraits()
+                || !(operands.get(index) instanceof RexTableArgCall)) {
+            return declaredArg;
+        }
+        return declaredArg.applyConditionalTraits(
+                buildTraitContext((RexTableArgCall) operands.get(index), 
declared, callContext));
+    }
+
+    /**
+     * Planner-side adapter to {@link TraitContext}. Sourced from a {@link 
RexCall}: PARTITION BY
+     * via the {@link RexTableArgCall} operand, scalar literals via the {@link 
CallContext} wrapper.
+     * Equivalent to {@link TraitContext#of} but takes its inputs from a 
planner-side call instead
+     * of validation-side {@link 
org.apache.flink.table.functions.TableSemantics}.
+     */
+    private static TraitContext buildTraitContext(
+            RexTableArgCall tableArgCall, List<StaticArgument> declared, 
CallContext callContext) {
+        return new TraitContext() {
+            @Override
+            public boolean hasPartitionBy() {
+                return tableArgCall.getPartitionKeys().length > 0;
+            }
+
+            @Override
+            public <T> Optional<T> getScalarArgument(String name, Class<T> 
clazz) {
+                for (int i = 0; i < declared.size(); i++) {
+                    final StaticArgument arg = declared.get(i);
+                    if (arg.is(StaticArgumentTrait.SCALAR) && 
arg.getName().equals(name)) {
+                        return callContext.getArgumentValue(i, clazz);
+                    }
+                }
+                return Optional.empty();
+            }
+        };
+    }
+
+    /**
+     * Builds a {@link CallContext} from the given {@link RexCall} for this 
function. Wraps the call
+     * in an {@link OperatorBindingCallContext} so consumers (trait 
resolution, codegen, etc.) read
+     * scalar arguments through the same coercion path as validation.
+     */
+    public CallContext toCallContext(RexCall call) {
+        return toCallContext(call, null, null, null);
+    }
+
+    /**
+     * Variant of {@link #toCallContext(RexCall)} that additionally exposes 
the call's input time
+     * columns and changelog modes - needed by the streaming codegen path so 
PTFs can specialize
+     * themselves to the exact call.
+     */
+    public CallContext toCallContext(
+            RexCall call,
+            @Nullable List<Integer> inputTimeColumns,
+            @Nullable List<ChangelogMode> inputChangelogModes,
+            @Nullable ChangelogMode outputChangelogMode) {
+        return new OperatorBindingCallContext(
+                dataTypeFactory,
+                getDefinition(),
+                RexCallBinding.create(typeFactory, call, 
Collections.emptyList()),
+                call.getType(),
+                inputTimeColumns,
+                inputChangelogModes,
+                outputChangelogMode);
+    }
+
+    /**
+     * Returns a copy of this function whose {@link TypeInference} reports the 
given static
+     * arguments. The wrapped input/output strategies are reused unchanged - 
they ran at validation
+     * time and aren't invoked again afterwards.
+     */
+    private BridgingSqlFunction withStaticArguments(List<StaticArgument> 
staticArguments) {
+        final TypeInference rewritten =
+                TypeInference.newBuilder()
+                        .staticArguments(staticArguments)
+                        
.inputTypeStrategy(typeInference.getInputTypeStrategy())
+                        
.stateTypeStrategies(typeInference.getStateTypeStrategies())
+                        
.outputTypeStrategy(typeInference.getOutputTypeStrategy())
+                        
.disableSystemArguments(typeInference.disableSystemArguments())
+                        .build();
+        if (this instanceof WithTableFunction) {
+            return new WithTableFunction(
+                    dataTypeFactory,
+                    typeFactory,
+                    rexFactory,
+                    getKind(),
+                    resolvedFunction,
+                    rewritten);
+        }
+        return new BridgingSqlFunction(
+                dataTypeFactory, typeFactory, rexFactory, getKind(), 
resolvedFunction, rewritten);
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Table function extension
     // 
--------------------------------------------------------------------------------------------
@@ -273,10 +431,12 @@ public class BridgingSqlFunction extends SqlFunction {
             }
             final StaticArgument arg = args.get(ordinal);
             final TableCharacteristic.Semantics semantics;
-            if (arg.is(StaticArgumentTrait.ROW_SEMANTIC_TABLE)) {
-                semantics = TableCharacteristic.Semantics.ROW;
-            } else if (arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)) {
+            // Report SET semantics if it may apply - which allows the use of 
PARTITION BY
+            if (arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)
+                    || 
arg.hasConditionalTrait(StaticArgumentTrait.SET_SEMANTIC_TABLE)) {
                 semantics = TableCharacteristic.Semantics.SET;
+            } else if (arg.is(StaticArgumentTrait.ROW_SEMANTIC_TABLE)) {
+                semantics = TableCharacteristic.Semantics.ROW;
             } else {
                 return null;
             }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
index 12a20833068..3973329af74 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.table.planner.codegen.HashCodeGenerator;
 import org.apache.flink.table.planner.codegen.ProcessTableRunnerGenerator;
 import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
@@ -157,7 +158,10 @@ public class StreamExecProcessTableFunction extends 
ExecNodeBase<RowData>
             @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) ChangelogMode 
outputChangelogMode) {
         super(id, context, persistedConfig, inputProperties, outputType, 
description);
         this.uid = uid;
-        this.invocation = (RexCall) invocation;
+        // Mirror the FlinkLogicalTableFunctionScan converter for the 
compiled-plan restore path:
+        // bake StaticArgument#withConditionalTrait rules into the operator's 
static args so
+        // downstream code can use plain arg.is(SET_SEMANTIC_TABLE) checks.
+        this.invocation = BridgingSqlFunction.resolveCallTraits((RexCall) 
invocation);
         this.inputChangelogModes = inputChangelogModes;
         this.outputChangelogMode = outputChangelogMode;
     }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java
index d478b3afcf0..17473ab19a7 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.functions.TemporalTableFunction;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
 
@@ -115,7 +116,14 @@ public class FlinkLogicalTableFunctionScan extends 
TableFunctionScan implements
                     functionScan.getInputs().stream()
                             .map(input -> RelOptRule.convert(input, 
FlinkConventions.LOGICAL()))
                             .collect(Collectors.toList());
-            final RexCall rexCall = (RexCall) functionScan.getCall();
+
+            // Resolve any StaticArgument#withConditionalTrait rules on the 
operator against this
+            // call site (PARTITION BY, scalar literals). After this rewrite, 
downstream code sees a
+            // BridgingSqlFunction whose getStaticArguments() reports the 
effective signature, so
+            // simple staticArg.is(SET_SEMANTIC_TABLE) checks suffice.
+            final RexCall rexCall =
+                    BridgingSqlFunction.resolveCallTraits((RexCall) 
functionScan.getCall());
+
             return new FlinkLogicalTableFunctionScan(
                     functionScan.getCluster(),
                     traitSet,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
index 754fe1b3284..56a2bf1f6fc 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
@@ -22,13 +22,11 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.catalog.ContextResolvedFunction;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.functions.ProcessTableFunction;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.calcite.RexTableArgCall;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
-import 
org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction;
@@ -36,7 +34,6 @@ import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFuncti
 import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
-import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.StaticArgument;
 import org.apache.flink.table.types.inference.StaticArgumentTrait;
 import org.apache.flink.table.types.inference.SystemTypeInference;
@@ -56,7 +53,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexCallBinding;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
@@ -66,7 +62,6 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
@@ -512,22 +507,4 @@ public class StreamPhysicalProcessTableFunction extends 
AbstractRelNode
         }
         return ImmutableSet.copyOf(partitionColumnsPerArg);
     }
-
-    public static CallContext toCallContext(
-            RexCall udfCall,
-            List<Integer> inputTimeColumns,
-            List<ChangelogMode> inputChangelogModes,
-            @Nullable ChangelogMode outputChangelogMode) {
-        final BridgingSqlFunction function = 
ShortcutUtils.unwrapBridgingSqlFunction(udfCall);
-        assert function != null;
-        final FunctionDefinition definition = 
ShortcutUtils.unwrapFunctionDefinition(udfCall);
-        return new OperatorBindingCallContext(
-                function.getDataTypeFactory(),
-                definition,
-                RexCallBinding.create(function.getTypeFactory(), udfCall, 
Collections.emptyList()),
-                udfCall.getType(),
-                inputTimeColumns,
-                inputChangelogModes,
-                outputChangelogMode);
-    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
index 4f1d8d65d41..52df803d5c8 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
@@ -34,7 +34,6 @@ import 
org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil
 import 
org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil.{verifyFunctionAwareOutputType,
 DefaultExpressionEvaluatorFactory}
 import org.apache.flink.table.planner.delegation.PlannerBase
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
-import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalProcessTableFunction
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
 import org.apache.flink.table.runtime.dataview.DataViewUtils
 import org.apache.flink.table.runtime.dataview.StateListView.KeyedStateListView
@@ -77,11 +76,8 @@ object ProcessTableRunnerGenerator {
     // For specialized functions, this call context is able to provide the 
final changelog modes.
     // Thus, functions can reconfigure themselves for the exact use case.
     // Including updating their state layout.
-    val callContext = StreamPhysicalProcessTableFunction.toCallContext(
-      udfCall,
-      inputTimeColumns,
-      inputChangelogModes,
-      outputChangelogMode)
+    val callContext =
+      function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, 
outputChangelogMode)
 
     // Create the final UDF for runtime
     val udf = UserDefinedFunctionHelper.createSpecializedFunction(
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index b6767e68ac6..6839cc1c28c 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.connector.ChangelogMode
 import org.apache.flink.table.functions.{BuiltInFunctionDefinition, 
ChangelogFunction}
 import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext
 import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, 
RexTableArgCall}
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 import org.apache.flink.table.planner.plan.`trait`._
 import 
org.apache.flink.table.planner.plan.`trait`.DeleteKindTrait.{deleteOnKeyOrNone, 
fullDeleteOrNone, DELETE_BY_KEY}
 import 
org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterOrNone, 
onlyAfterOrNone, BEFORE_AND_AFTER, ONLY_UPDATE_AFTER}
@@ -852,10 +853,10 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
                       !modifyKindSet.isInsertOnly && tableArg.is(
                         StaticArgumentTrait.SUPPORT_UPDATES)
                     ) {
-                      if (isPtfUpsert(tableArg, tableArgCall, child)) {
-                        UpdateKindTrait.ONLY_UPDATE_AFTER
-                      } else {
+                      if (ptfRequiresUpdateBefore(tableArg, tableArgCall, 
child)) {
                         UpdateKindTrait.BEFORE_AND_AFTER
+                      } else {
+                        UpdateKindTrait.ONLY_UPDATE_AFTER
                       }
                     } else {
                       UpdateKindTrait.NONE
@@ -1272,7 +1273,7 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
                     extractPtfTableArgComponents(process, child, inputArg)
                   if (
                     tableArg.is(StaticArgumentTrait.SUPPORT_UPDATES)
-                    && isPtfUpsert(tableArg, tableArgCall, child)
+                    && !ptfRequiresUpdateBefore(tableArg, tableArgCall, child)
                     && !tableArg.is(StaticArgumentTrait.REQUIRE_FULL_DELETE)
                   ) {
                     this
@@ -1640,21 +1641,20 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
     modeBuilder.build()
   }
 
-  private def isPtfUpsert(
+  /**
+   * Whether the PTF requires UPDATE_BEFORE from its input. Returns true 
unless partition keys cover
+   * the upsert keys (co-located) and the argument doesn't explicitly require 
UPDATE_BEFORE.
+   */
+  private def ptfRequiresUpdateBefore(
       tableArg: StaticArgument,
       tableArgCall: RexTableArgCall,
       input: StreamPhysicalRel): Boolean = {
     val partitionKeys = ImmutableBitSet.of(tableArgCall.getPartitionKeys: _*)
     val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(input.getCluster.getMetadataQuery)
     val upsertKeys = fmq.getUpsertKeys(input)
-    if (
-      upsertKeys == null || partitionKeys.isEmpty || 
!upsertKeys.contains(partitionKeys)
-      || tableArg.is(StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)
-    ) {
-      false
-    } else {
-      true
-    }
+    upsertKeys == null || partitionKeys.isEmpty ||
+    !upsertKeys.contains(partitionKeys) ||
+    tableArg.is(StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)
   }
 
   private def extractPtfTableArgComponents(
@@ -1674,11 +1674,9 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
       outputChangelogMode: ChangelogMode): ChangelogContext = {
     val udfCall = StreamPhysicalProcessTableFunction.toUdfCall(process.getCall)
     val inputTimeColumns = 
StreamPhysicalProcessTableFunction.toInputTimeColumns(process.getCall)
-    val callContext = StreamPhysicalProcessTableFunction.toCallContext(
-      udfCall,
-      inputTimeColumns,
-      inputChangelogModes,
-      outputChangelogMode)
+    val function = udfCall.getOperator.asInstanceOf[BridgingSqlFunction]
+    val callContext =
+      function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, 
outputChangelogMode)
 
     // Expose a simplified context to let users focus on important 
characteristics.
     // If necessary, we can expose the full CallContext in the future.
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
index ee9576b2e75..e98e28fc53a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
@@ -87,4 +87,21 @@ public class ToChangelogTest extends TableTestBase {
         util.verifyRelPlan(
                 "SELECT * FROM TO_CHANGELOG(input => TABLE 
insert_only_source)", CHANGELOG_MODE);
     }
+
+    @Test
+    void testSetSemanticsWithPartitionBy() {
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE retract_source ("
+                                + "  id INT,"
+                                + "  name STRING,"
+                                + "  PRIMARY KEY (id) NOT ENFORCED"
+                                + ") WITH ("
+                                + "  'connector' = 'values',"
+                                + "  'changelog-mode' = 'I,UB,UA,D'"
+                                + ")");
+        util.verifyRelPlan(
+                "SELECT * FROM TO_CHANGELOG(input => TABLE retract_source 
PARTITION BY id)",
+                CHANGELOG_MODE);
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
index 91c73153bee..77133f4fe41 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
@@ -51,6 +51,26 @@ LogicalProject(op=[$0], id=[$1], name=[$2])
       <![CDATA[
 ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), 
DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)], changelogMode=[I])
 +- TableSourceScan(table=[[default_catalog, default_database, 
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSetSemanticsWithPartitionBy">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source 
PARTITION BY id)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], op=[$1], id0=[$2], name=[$3])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION 
BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], 
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, INTEGER id0, 
VARCHAR(2147483647) name)])
+   +- LogicalProject(id=[$0], name=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
retract_source]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], 
select=[id,op,id0,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) 
op, INTEGER id0, VARCHAR(2147483647) name)], changelogMode=[I])
++- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+   +- TableSourceScan(table=[[default_catalog, default_database, 
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
 ]]>
     </Resource>
   </TestCase>

Reply via email to