twalthr commented on code in PR #27886:
URL: https://github.com/apache/flink/pull/27886#discussion_r3057248729


##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -54,7 +54,7 @@ SELECT * FROM TO_CHANGELOG(
 
 | Parameter    | Required | Description |
 |:-------------|:---------|:------------|
-| `input`      | Yes      | The input table. Must include `PARTITION BY` for parallel execution. Accepts insert-only, retract, and upsert tables. |
+| `input`      | Yes      | The input table. With `PARTITION BY`, rows with the same key are co-located for parallel execution. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, providing `PARTITION BY` is recommended for better performance. |

Review Comment:
   ```suggestion
   | `input`      | Yes      | The input table. With `PARTITION BY`, rows with the same key are co-located for parallel execution (set semantics). Without `PARTITION BY`, each row is processed independently (row semantics). Accepts insert-only, retract, and upsert tables. |
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -57,18 +61,58 @@ public class StaticArgument {
     private final @Nullable Class<?> conversionClass;
     private final boolean isOptional;
     private final EnumSet<StaticArgumentTrait> traits;
+    private final List<ConditionalTrait> conditionalTraits;
+
+    /** A trait that is conditionally added based on a {@link TraitCondition}. */
+    private static final class ConditionalTrait implements Serializable {

Review Comment:
   nit: move nested classes to the bottom of the file
   ```suggestion
       private static final class ConditionalTrait {
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -99,6 +99,15 @@ SELECT * FROM TO_CHANGELOG(
 -- +I[id:2, op:'DELETE',       name:'Bob',   cnt:1]
 ```
 
+#### Without PARTITION BY
+
+```sql

Review Comment:
   Let's remove all PARTITION BY examples for now. A default TO_CHANGELOG example should always be without PARTITION BY: such examples provide no benefit and only add exchange overhead.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##########
@@ -783,14 +784,16 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                     .kind(PROCESS_TABLE)
                     .staticArguments(
                             StaticArgument.table(
-                                    "input",
-                                    Row.class,
-                                    false,
-                                    EnumSet.of(
-                                            StaticArgumentTrait.TABLE,
-                                            StaticArgumentTrait.SET_SEMANTIC_TABLE,
-                                            StaticArgumentTrait.SUPPORT_UPDATES,
-                                            StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)),
+                                            "input",
+                                            Row.class,
+                                            false,
+                                            EnumSet.of(
+                                                    StaticArgumentTrait.TABLE,

Review Comment:
   Just to make things explicit, I would add `StaticArgumentTrait.ROW_SEMANTIC_TABLE` here as well.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -196,6 +240,68 @@ public boolean is(StaticArgumentTrait trait) {
         return traits.contains(trait);
     }
 
+    /**
+     * Context-aware trait check. Evaluates conditional trait rules against the given context to
+     * determine the effective traits.
+     */
+    public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+        return resolveTraits(ctx).contains(trait);
+    }
+
+    /**
+     * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
+     * added to the effective trait set when the condition evaluates to {@code true} at planning
+     * time.
+     *
+     * <p>Example:
+     *
+     * <pre>{@code
+     * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+     *         .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
+     *         .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE);
+     * }</pre>
+     */
+    public StaticArgument addTraitWhen(

Review Comment:
   ```suggestion
       public StaticArgument withConditionalTrait(
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -196,6 +240,68 @@ public boolean is(StaticArgumentTrait trait) {
         return traits.contains(trait);
     }
 
+    /**
+     * Context-aware trait check. Evaluates conditional trait rules against the given context to
+     * determine the effective traits.
+     */
+    public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+        return resolveTraits(ctx).contains(trait);
+    }
+
+    /**
+     * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
+     * added to the effective trait set when the condition evaluates to {@code true} at planning
+     * time.
+     *
+     * <p>Example:
+     *
+     * <pre>{@code
+     * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+     *         .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
+     *         .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE);
+     * }</pre>
+     */
+    public StaticArgument addTraitWhen(
+            final TraitCondition condition, final StaticArgumentTrait trait) {
+        final List<ConditionalTrait> newList = new ArrayList<>(this.conditionalTraits);

Review Comment:
   We should limit the conditional traits, e.g., a scalar should not suddenly become a table. Root changes are not allowed. This limits the logic to tables for now, because `TABLE` is the only trait with subtraits.
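   One way to enforce this, sketched under assumptions: the hypothetical `ArgTrait` enum below (a stand-in, not Flink's `StaticArgumentTrait`) records for each subtrait the root it refines, and a conditional trait is accepted only if it refines a root that the argument already declares. Under that rule a scalar can never gain `TABLE` or any table subtrait.

   ```java
   import java.util.EnumSet;

   /** Hypothetical stand-in for a few StaticArgumentTrait values; roots have no parent. */
   enum ArgTrait {
       SCALAR(null),
       TABLE(null),
       ROW_SEMANTIC_TABLE(TABLE),
       SET_SEMANTIC_TABLE(TABLE),
       SUPPORT_UPDATES(TABLE),
       REQUIRE_UPDATE_BEFORE(TABLE);

       /** The root trait this trait refines, or null if it is itself a root. */
       final ArgTrait root;

       ArgTrait(ArgTrait root) {
           this.root = root;
       }
   }

   final class ConditionalTraitRules {
       /** A conditional trait may only refine a root trait the argument already declares. */
       static boolean isAllowed(EnumSet<ArgTrait> declared, ArgTrait conditional) {
           return conditional.root != null && declared.contains(conditional.root);
       }

       /** Fails fast, e.g. when a builder method registers a conditional trait. */
       static void validate(EnumSet<ArgTrait> declared, ArgTrait conditional) {
           if (!isAllowed(declared, conditional)) {
               throw new IllegalArgumentException(
                       "Conditional trait " + conditional + " must refine a declared root trait; "
                               + "root changes are not allowed. Declared: " + declared);
           }
       }
   }
   ```

   A `withConditionalTrait` builder could run such a check eagerly, so that an invalid declaration fails when the function is defined rather than at planning time.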



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -196,6 +240,68 @@ public boolean is(StaticArgumentTrait trait) {
         return traits.contains(trait);
     }
 
+    /**
+     * Context-aware trait check. Evaluates conditional trait rules against the given context to
+     * determine the effective traits.
+     */
+    public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+        return resolveTraits(ctx).contains(trait);
+    }
+
+    /**
+     * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
+     * added to the effective trait set when the condition evaluates to {@code true} at planning
+     * time.
+     *
+     * <p>Example:
+     *
+     * <pre>{@code
+     * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+     *         .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
+     *         .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE);
+     * }</pre>
+     */
+    public StaticArgument addTraitWhen(
+            final TraitCondition condition, final StaticArgumentTrait trait) {
+        final List<ConditionalTrait> newList = new ArrayList<>(this.conditionalTraits);
+        newList.add(new ConditionalTrait(condition, trait));
+        return new StaticArgument(name, dataType, conversionClass, isOptional, traits, newList);
+    }
+
+    /** Whether this argument has conditional trait rules. */
+    public boolean hasConditionalTraits() {
+        return !conditionalTraits.isEmpty();
+    }
+
+    /** Whether any conditional trait rule may add the given trait. */
+    public boolean hasConditionalTrait(final StaticArgumentTrait trait) {
+        return conditionalTraits.stream().anyMatch(ct -> ct.trait == trait);
+    }
+
+    /**
+     * Resolves effective traits by evaluating conditional rules against the context. Returns the
+     * base traits combined with any conditional traits whose conditions are met.
+     */
+    public EnumSet<StaticArgumentTrait> resolveTraits(final TraitContext ctx) {
+        if (conditionalTraits.isEmpty()) {
+            return traits;
+        }
+        final EnumSet<StaticArgumentTrait> resolved = EnumSet.copyOf(traits);
+        for (final ConditionalTrait ct : conditionalTraits) {
+            if (ct.condition.test(ctx)) {
+                // ROW_SEMANTIC_TABLE and SET_SEMANTIC_TABLE are mutually exclusive.
+                // Adding one removes the other.
+                if (ct.trait == StaticArgumentTrait.SET_SEMANTIC_TABLE) {
+                    resolved.remove(StaticArgumentTrait.ROW_SEMANTIC_TABLE);
+                } else if (ct.trait == StaticArgumentTrait.ROW_SEMANTIC_TABLE) {
+                    resolved.remove(StaticArgumentTrait.SET_SEMANTIC_TABLE);
+                }
+                resolved.add(ct.trait);
+            }
+        }
+        return resolved;
+    }
+
     @Override
     public String toString() {

Review Comment:
   Update `toString` as well, with the syntax `myUntypedTable {TABLE BY ROW, TABLE BY SET}`. We add all potential conditional traits to the list, but without any special syntax; the `toString` format is complex enough.
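   A minimal sketch of that format, assuming hypothetical display labels per trait (only `TABLE BY ROW` and `TABLE BY SET` come from the comment above): declared and potential conditional traits are merged into one set without any marker, and the bare `TABLE` is dropped when a semantic variant already implies it.

   ```java
   import java.util.EnumSet;
   import java.util.List;
   import java.util.stream.Collectors;

   /** Hypothetical stand-in traits with display labels. */
   enum DisplayTrait {
       TABLE("TABLE"),
       ROW_SEMANTIC_TABLE("TABLE BY ROW"),
       SET_SEMANTIC_TABLE("TABLE BY SET"),
       SUPPORT_UPDATES("SUPPORT UPDATES");

       final String label;

       DisplayTrait(String label) {
           this.label = label;
       }
   }

   final class ArgumentPrinter {
       /** Renders the name followed by declared traits and all potential conditional traits. */
       static String render(String name, EnumSet<DisplayTrait> declared, List<DisplayTrait> conditional) {
           // Conditional traits join the same set, without any special marker.
           final EnumSet<DisplayTrait> all = EnumSet.copyOf(declared);
           all.addAll(conditional);
           // "TABLE BY ROW" and "TABLE BY SET" already imply TABLE, so drop the bare root.
           if (all.contains(DisplayTrait.ROW_SEMANTIC_TABLE)
                   || all.contains(DisplayTrait.SET_SEMANTIC_TABLE)) {
               all.remove(DisplayTrait.TABLE);
           }
           // EnumSet iterates in declaration order, which keeps the output stable.
           return name + " " + all.stream()
                   .map(t -> t.label)
                   .collect(Collectors.joining(", ", "{", "}"));
       }
   }
   ```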



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -656,12 +684,13 @@ private static void checkRowSemantics(StaticArgument staticArg, TableSemantics s
             }
         }
 
-        private static void checkSetSemantics(StaticArgument staticArg, TableSemantics semantics) {
-            if (!staticArg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)) {
+        private static void checkSetSemantics(
+                StaticArgument staticArg, TableSemantics semantics, TraitContext traitCtx) {

Review Comment:
   How about we copy `StaticArgument` with the effective traits? This would simplify this tool, and you wouldn't have to pass `TraitContext` everywhere. We can provide `StaticArgument.applyConditionalTraits(ctx): StaticArgument`.
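   A sketch of what `applyConditionalTraits(ctx)` could look like, using simplified stand-ins (`SketchArg`, `ResolvedTrait`, and `PartitionCtx` are hypothetical, not Flink classes). Resolution happens once and yields a copy whose plain trait set is final, so downstream checks keep calling a context-free `is(trait)`. The row/set mutual exclusion mirrors `resolveTraits` in the diff, and the builder takes the trait before the condition.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.EnumSet;
   import java.util.List;
   import java.util.function.Predicate;

   /** Hypothetical stand-in for the table-related StaticArgumentTrait values. */
   enum ResolvedTrait { TABLE, ROW_SEMANTIC_TABLE, SET_SEMANTIC_TABLE, SUPPORT_UPDATES }

   /** Hypothetical, trimmed-down stand-in for TraitContext. */
   interface PartitionCtx {
       boolean hasPartitionBy();
   }

   /** Immutable, simplified stand-in for StaticArgument with conditional trait rules. */
   final class SketchArg {
       private static final class Rule {
           final ResolvedTrait trait;
           final Predicate<PartitionCtx> condition;

           Rule(ResolvedTrait trait, Predicate<PartitionCtx> condition) {
               this.trait = trait;
               this.condition = condition;
           }
       }

       private final String name;
       private final EnumSet<ResolvedTrait> traits;
       private final List<Rule> rules;

       SketchArg(String name, EnumSet<ResolvedTrait> traits) {
           this(name, traits, Collections.emptyList());
       }

       private SketchArg(String name, EnumSet<ResolvedTrait> traits, List<Rule> rules) {
           this.name = name;
           this.traits = EnumSet.copyOf(traits);
           this.rules = Collections.unmodifiableList(new ArrayList<>(rules));
       }

       SketchArg withConditionalTrait(ResolvedTrait trait, Predicate<PartitionCtx> condition) {
           final List<Rule> newRules = new ArrayList<>(rules);
           newRules.add(new Rule(trait, condition));
           return new SketchArg(name, traits, newRules);
       }

       /** Returns a copy carrying only the effective traits for ctx and no remaining rules. */
       SketchArg applyConditionalTraits(PartitionCtx ctx) {
           final EnumSet<ResolvedTrait> resolved = EnumSet.copyOf(traits);
           for (Rule rule : rules) {
               if (rule.condition.test(ctx)) {
                   // Row and set semantics are mutually exclusive: adding one drops the other.
                   if (rule.trait == ResolvedTrait.SET_SEMANTIC_TABLE) {
                       resolved.remove(ResolvedTrait.ROW_SEMANTIC_TABLE);
                   } else if (rule.trait == ResolvedTrait.ROW_SEMANTIC_TABLE) {
                       resolved.remove(ResolvedTrait.SET_SEMANTIC_TABLE);
                   }
                   resolved.add(rule.trait);
               }
           }
           return new SketchArg(name, resolved, Collections.emptyList());
       }

       boolean is(ResolvedTrait trait) {
           return traits.contains(trait);
       }

       boolean hasConditionalTraits() {
           return !rules.isEmpty();
       }
   }
   ```

   With this shape, callers such as `checkSetSemantics`, `derivePassThroughFields`, or the distribution rule would receive the resolved argument instead of a `TraitContext`.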



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -181,6 +181,28 @@ private static void checkReservedArgs(List<StaticArgument> staticArgs) {
         }
     }
 
+    static TraitContext buildTraitContext(
+            final TableSemantics semantics,
+            final CallContext callContext,
+            final List<StaticArgument> staticArgs) {
+        return new TraitContext() {
+            @Override
+            public boolean hasPartitionBy() {
+                return semantics.partitionByColumns().length > 0;
+            }
+
+            @Override
+            public <T> Optional<T> getScalarArgument(final String name, final Class<T> clazz) {
+                for (int i = 0; i < staticArgs.size(); i++) {
+                    if (staticArgs.get(i).getName().equals(name)) {

Review Comment:
   Check that the argument is a scalar first, and then that it is a literal.
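   A sketch of that order of checks, with hypothetical stand-ins: `DeclaredArg` only records the name and whether the argument is a scalar, and `LiteralSource` mimics the `isArgumentLiteral(int)` and `getArgumentValue(int, Class)` methods of Flink's `CallContext`. Value conversion is delegated to the context rather than done on the raw literal.

   ```java
   import java.util.List;
   import java.util.Optional;

   /** Hypothetical stand-in modeled on two CallContext methods. */
   interface LiteralSource {
       boolean isArgumentLiteral(int pos);

       <T> Optional<T> getArgumentValue(int pos, Class<T> clazz);
   }

   /** Hypothetical declared argument: a name and whether it is a scalar. */
   final class DeclaredArg {
       final String name;
       final boolean scalar;

       DeclaredArg(String name, boolean scalar) {
           this.name = name;
           this.scalar = scalar;
       }
   }

   final class ScalarLookup {
       static <T> Optional<T> getScalarArgument(
               List<DeclaredArg> declared, LiteralSource call, String name, Class<T> clazz) {
           for (int pos = 0; pos < declared.size(); pos++) {
               final DeclaredArg arg = declared.get(pos);
               if (!arg.name.equals(name)) {
                   continue;
               }
               // 1) Only scalar arguments can be looked up; a table argument never matches.
               if (!arg.scalar) {
                   return Optional.empty();
               }
               // 2) Only literals have a value at planning time; a column reference does not.
               if (!call.isArgumentLiteral(pos)) {
                   return Optional.empty();
               }
               // Let the call context convert the literal to the requested class.
               return call.getArgumentValue(pos, clazz);
           }
           return Optional.empty();
       }
   }
   ```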



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -181,6 +181,28 @@ private static void checkReservedArgs(List<StaticArgument> staticArgs) {
         }
     }
 
+    static TraitContext buildTraitContext(
+            final TableSemantics semantics,
+            final CallContext callContext,
+            final List<StaticArgument> staticArgs) {
+        return new TraitContext() {
+            @Override
+            public boolean hasPartitionBy() {
+                return semantics.partitionByColumns().length > 0;

Review Comment:
   The trait context should be available to all arguments, so `TableSemantics` should be nullable.
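   A minimal sketch, assuming a hypothetical `TableSemanticsView` interface that exposes only `partitionByColumns()`: the context is built for every argument, and a `null` semantics (for a scalar argument, for example) simply means there is no PARTITION BY.

   ```java
   /** Hypothetical stand-in for the part of TableSemantics used here. */
   interface TableSemanticsView {
       int[] partitionByColumns();
   }

   /** Hypothetical, trimmed-down stand-in for TraitContext. */
   interface PartitionAwareCtx {
       boolean hasPartitionBy();
   }

   final class TraitContexts {
       /**
        * Builds a context for any argument. The semantics are null for non-table
        * arguments, in which case there is no PARTITION BY.
        */
       static PartitionAwareCtx forArgument(TableSemanticsView semantics) {
           return () -> semantics != null && semantics.partitionByColumns().length > 0;
       }
   }
   ```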



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -314,13 +336,16 @@ private List<Field> derivePassThroughFields(CallContext callContext) {
                                 if (arg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH)) {
                                     return DataType.getFields(argDataTypes.get(pos)).stream();
                                 }
-                                if (!arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)) {
+                                final TableSemantics semantics =
+                                        callContext.getTableSemantics(pos).orElse(null);
+                                if (semantics == null) {
+                                    return Stream.<Field>empty();
+                                }
+                                final TraitContext traitCtx =

Review Comment:
   Build the effective traits first, before `arg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH)`.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * A condition that determines whether a conditional trait on a {@link StaticArgument} should be
+ * active for a given call.
+ *
+ * <p>Conditions are evaluated at planning time using the {@link TraitContext} which provides access
+ * to the SQL call's properties (PARTITION BY presence, scalar argument values, etc.).

Review Comment:
   ```suggestion
    * to the SQL call's properties (PARTITION BY presence, scalar literal values, etc.).
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * A condition that determines whether a conditional trait on a {@link StaticArgument} should be
+ * active for a given call.
+ *
+ * <p>Conditions are evaluated at planning time using the {@link TraitContext} which provides access
+ * to the SQL call's properties (PARTITION BY presence, scalar argument values, etc.).
+ *
+ * <p>Use the static factory methods for common conditions:
+ *
+ * <pre>{@code
+ * import static org.apache.flink.table.types.inference.TraitCondition.*;
+ *
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+ *         .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
+ *         .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE)
+ *         .addTraitWhen(argIsTrue("produces_full_deletes"), REQUIRE_UPDATE_BEFORE);
+ * }</pre>
+ */
+@PublicEvolving
+@FunctionalInterface
+public interface TraitCondition extends Serializable {
+
+    /** Evaluates this condition against the given context. */
+    boolean test(TraitContext ctx);
+
+    /** True when PARTITION BY is provided on the table argument. */
+    static TraitCondition hasPartitionBy() {
+        return TraitContext::hasPartitionBy;
+    }
+
+    /** True when the named boolean argument is provided and its value is {@code true}. */
+    static TraitCondition argIsTrue(final String name) {

Review Comment:
   Generalize `argIsTrue` and `argIsFalse` to:
   ```
   static <T> TraitCondition argIsEqualTo(String name, T value) {
       return ctx -> ctx.getScalarArgument(name, value.getClass()).map(value::equals).orElse(false);
   }
   ```
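   A runnable sketch of the generalized condition against simplified stand-ins (`ScalarCtx` and `Condition` are hypothetical reductions of `TraitContext` and `TraitCondition`). It compares with `equals` rather than `==`, since reference equality only happens to work for cached boxed values such as `Boolean.TRUE`; `argIsTrue(name)` then becomes `argIsEqualTo(name, true)`.

   ```java
   import java.util.Objects;
   import java.util.Optional;

   /** Hypothetical, trimmed-down stand-in for TraitContext's scalar lookup. */
   interface ScalarCtx {
       <T> Optional<T> getScalarArgument(String name, Class<T> clazz);
   }

   /** Simplified stand-in for TraitCondition. */
   @FunctionalInterface
   interface Condition {
       boolean test(ScalarCtx ctx);

       /** True only if the named argument is a literal whose value equals the expected one. */
       static <T> Condition argIsEqualTo(String name, T expected) {
           Objects.requireNonNull(expected, "expected");
           // equals() instead of ==, so two equal but distinct String or Long instances match.
           return ctx -> ctx.getScalarArgument(name, expected.getClass())
                   .map(expected::equals)
                   .orElse(false);
       }

       static Condition not(Condition condition) {
           return ctx -> !condition.test(ctx);
       }
   }
   ```

   Note that an absent argument makes `argIsEqualTo(name, false)` evaluate to `false` as well, so "not provided" and "provided as false" stay distinguishable.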



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java:
##########
@@ -276,15 +279,17 @@ private RuntimeTableSemantics createRuntimeTableSemantics(
         }
 
         final int timeColumn = inputTimeColumns.get(tableArgCall.getInputIndex());
+        final org.apache.flink.table.types.inference.TraitContext traitCtx =

Review Comment:
   Pay attention to fully qualified class names instead of imports; it seems Claude loves to do this.
   ```suggestion
           final TraitContext traitCtx =
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java:
##########
@@ -221,7 +223,11 @@ protected RelDataType deriveRowType() {
         if (uidRexNode.getKind() == SqlKind.DEFAULT) {
             // Optional for constant or row semantics functions
             if (staticArgs.stream()
-                    .noneMatch(arg -> arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE))) {
+                    .noneMatch(
+                            arg ->
+                                    arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)
+                                            || arg.hasConditionalTrait(

Review Comment:
   We should do this on the actual resulting (effective) traits.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java:
##########
@@ -348,6 +354,54 @@ public static List<Ord<StaticArgument>> getProvidedInputArgs(RexCall call) {
                 .collect(Collectors.toList());
     }
 
+    /**
+     * Builds a {@link TraitContext} for resolving conditional traits on a table argument at
+     * planning time.
+     */
+    public static TraitContext buildTraitContext(
+            final RexCall call, final RexTableArgCall tableArgCall) {
+        final List<StaticArgument> declaredArgs = getStaticArguments(call);
+        final List<RexNode> operands = call.getOperands();
+
+        return new TraitContext() {
+            @Override
+            public boolean hasPartitionBy() {
+                return tableArgCall.getPartitionKeys().length > 0;
+            }
+
+            @Override
+            public <T> Optional<T> getScalarArgument(final String name, final Class<T> clazz) {
+                return findScalarLiteral(declaredArgs, operands, name, clazz);
+            }
+        };
+    }
+
+    private static List<StaticArgument> getStaticArguments(final RexCall call) {
+        final BridgingSqlFunction.WithTableFunction function =
+                (BridgingSqlFunction.WithTableFunction) call.getOperator();
+        return function.getTypeInference()
+                .getStaticArguments()
+                .orElseThrow(IllegalStateException::new);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> Optional<T> findScalarLiteral(
+            final List<StaticArgument> declaredArgs,
+            final List<RexNode> operands,
+            final String name,
+            final Class<T> clazz) {
+        for (int i = 0; i < declaredArgs.size(); i++) {
+            if (declaredArgs.get(i).getName().equals(name)) {
+                final RexNode operand = operands.get(i);
+                if (operand.getKind() == SqlKind.DEFAULT || !(operand instanceof RexLiteral)) {
+                    return Optional.empty();
+                }
+                return Optional.ofNullable(((RexLiteral) operand).getValueAs(clazz));

Review Comment:
   This is too simple; it should follow the same rules as `CallContext` does. Otherwise it won't be possible to get, e.g., `Instant.class` or other literals.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java:
##########
@@ -276,15 +279,17 @@ private RuntimeTableSemantics createRuntimeTableSemantics(
         }
 
         final int timeColumn = inputTimeColumns.get(tableArgCall.getInputIndex());
+        final org.apache.flink.table.types.inference.TraitContext traitCtx =

Review Comment:
   Same comment as above: resolve the static argument as early as possible so that `TraitContext` is not reconstructed multiple times.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalProcessTableFunctionRule.java:
##########
@@ -127,9 +138,17 @@ private static RelNode applyDistributionOnInput(
     }
 
     private static FlinkRelDistribution deriveDistribution(
-            RexTableArgCall tableOperand, TableCharacteristic tableCharacteristic) {
-        if (tableCharacteristic.semantics == Semantics.SET) {
-            final int[] partitionKeys = tableOperand.getPartitionKeys();
+            RexTableArgCall tableOperand,
+            TableCharacteristic tableCharacteristic,
+            StaticArgument staticArg) {
+        final int[] partitionKeys = tableOperand.getPartitionKeys();
+        final boolean hasPartitionBy = partitionKeys.length > 0;
+        final boolean reportedAsSet = tableCharacteristic.semantics == Semantics.SET;
+        final boolean setIsConditional =
+                staticArg.hasConditionalTrait(StaticArgumentTrait.SET_SEMANTIC_TABLE);

Review Comment:
   Too fragile. Determine the effective `StaticArgument` first and then execute this logic.
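   A sketch of the distribution decision taken only on already-resolved traits, with hypothetical `SemTrait` and `Dist` types standing in for `StaticArgumentTrait` and `FlinkRelDistribution`. The mapping is an assumption about the intended behavior, not a copy of the planner rule: row semantics need no exchange, set semantics hash on the partition keys, and set semantics without keys fall back to a single partition.

   ```java
   import java.util.Arrays;
   import java.util.EnumSet;

   /** Hypothetical stand-in for the semantic StaticArgumentTrait values. */
   enum SemTrait { TABLE, ROW_SEMANTIC_TABLE, SET_SEMANTIC_TABLE }

   /** Hypothetical, simplified distribution descriptor (not FlinkRelDistribution). */
   final class Dist {
       final String kind;
       final int[] keys;

       private Dist(String kind, int[] keys) {
           this.kind = kind;
           this.keys = keys;
       }

       static Dist any() { return new Dist("ANY", new int[0]); }

       static Dist singleton() { return new Dist("SINGLETON", new int[0]); }

       static Dist hash(int[] keys) { return new Dist("HASH", keys.clone()); }

       @Override
       public String toString() {
           return kind + Arrays.toString(keys);
       }
   }

   final class DistributionSketch {
       /** effectiveTraits must already have all conditional rules applied. */
       static Dist derive(EnumSet<SemTrait> effectiveTraits, int[] partitionKeys) {
           if (!effectiveTraits.contains(SemTrait.SET_SEMANTIC_TABLE)) {
               // Row semantics: every row is processed independently, no exchange needed.
               return Dist.any();
           }
           // Set semantics: co-locate rows with equal keys; no keys means one global partition.
           return partitionKeys.length == 0 ? Dist.singleton() : Dist.hash(partitionKeys);
       }
   }
   ```

   Because no `hasConditionalTrait` check is involved, this version does not care how a trait became effective; in this sketch, a TO_CHANGELOG call without PARTITION BY that resolves to row semantics gets `ANY` and therefore no exchange.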



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -196,6 +240,68 @@ public boolean is(StaticArgumentTrait trait) {
         return traits.contains(trait);
     }
 
+    /**
+     * Context-aware trait check. Evaluates conditional trait rules against the given context to
+     * determine the effective traits.
+     */
+    public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+        return resolveTraits(ctx).contains(trait);
+    }
+
+    /**
+     * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
+     * added to the effective trait set when the condition evaluates to {@code true} at planning
+     * time.
+     *
+     * <p>Example:
+     *
+     * <pre>{@code
+     * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+     *         .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
+     *         .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE);
+     * }</pre>
+     */
+    public StaticArgument addTraitWhen(

Review Comment:
   I would swap the parameter order then: `withConditionalTrait(trait, condition)`.


