twalthr commented on code in PR #27886:
URL: https://github.com/apache/flink/pull/27886#discussion_r3117358110


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -182,6 +182,32 @@ private static void checkReservedArgs(List<StaticArgument> staticArgs) {
         }
     }
 
+    static TraitContext buildTraitContext(
+            @Nullable final TableSemantics semantics,
+            final CallContext callContext,
+            final List<StaticArgument> staticArgs) {
+        return new TraitContext() {
+            @Override
+            public boolean hasPartitionBy() {
+                return semantics != null && semantics.partitionByColumns().length > 0;
+            }
+
+            @Override
+            public <T> Optional<T> getScalarArgument(final String name, final Class<T> clazz) {
+                for (int i = 0; i < staticArgs.size(); i++) {
+                    final StaticArgument arg = staticArgs.get(i);
+                    if (arg.is(StaticArgumentTrait.SCALAR) && arg.getName().equals(name)) {
+                        if (!callContext.isArgumentLiteral(i)) {

Review Comment:
   Just to double check: do we also need a null check here via `callContext.isNullLiteral`? Or is that already covered by `getArgumentValue`?
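
To make the question concrete, here is a minimal sketch of what an explicit NULL-literal guard could look like. It does not use Flink classes: `LiteralLookup`, `ArrayLookup`, and `ScalarArgumentReader` are hypothetical stand-ins invented for this illustration, and the method names on the stand-in are assumptions. Whether the real `getArgumentValue` already returns an empty result for NULL literals is exactly the open point, so the sketch only shows the defensive variant.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the three CallContext-style lookups used below.
// It is not Flink's interface; method names are assumptions for illustration.
interface LiteralLookup {
    boolean isArgumentLiteral(int pos);

    boolean isArgumentNull(int pos);

    <T> Optional<T> getArgumentValue(int pos, Class<T> clazz);
}

// Array-backed test double: every argument is a literal, null means NULL literal.
final class ArrayLookup implements LiteralLookup {
    private final Object[] values;

    ArrayLookup(Object... values) {
        this.values = values;
    }

    @Override
    public boolean isArgumentLiteral(int pos) {
        return true;
    }

    @Override
    public boolean isArgumentNull(int pos) {
        return values[pos] == null;
    }

    @Override
    public <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
        final Object value = values[pos];
        return clazz.isInstance(value) ? Optional.of(clazz.cast(value)) : Optional.empty();
    }
}

final class ScalarArgumentReader {
    // Returns the literal value of the named argument, or empty if the argument
    // is missing, not a literal, or a NULL literal.
    static <T> Optional<T> read(
            LiteralLookup ctx, List<String> argNames, String name, Class<T> clazz) {
        for (int i = 0; i < argNames.size(); i++) {
            if (!argNames.get(i).equals(name)) {
                continue;
            }
            // Explicit guard: never ask for the value of a non-literal or NULL literal.
            if (!ctx.isArgumentLiteral(i) || ctx.isArgumentNull(i)) {
                return Optional.empty();
            }
            return ctx.getArgumentValue(i, clazz);
        }
        return Optional.empty();
    }
}
```

If the value lookup already treats NULL literals as absent, the extra check is redundant but harmless; if it does not, the guard keeps the condition from seeing a null value.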



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -354,4 +458,32 @@ private void checkModelNotOptional() {
             throw new ValidationException("Model arguments must not be optional.");
         }
     }
+
+    /** A trait that is conditionally added based on a {@link TraitCondition}. */
+    private static final class ConditionalTrait implements Serializable {

Review Comment:
   ```suggestion
       private static final class ConditionalTrait {
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -154,18 +154,18 @@ This is useful when you need to materialize changelog events into a downstream s
 
 ```sql
 SELECT * FROM TO_CHANGELOG(
-  input => TABLE source_table,
+  input => TABLE source_table [PARTITION BY key_col],
   [op => DESCRIPTOR(op_column_name),]
   [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
 )
 ```
 
 ### Parameters
 
-| Parameter    | Required | Description |
-|:-------------|:---------|:------------|
-| `input`      | Yes      | The input table. Accepts insert-only, retract, and upsert tables. |
-| `op`         | No       | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. |
+| Parameter    | Required | Description |
+|:-------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `input`      | Yes      | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, providing `PARTITION BY` is recommended for better performance. |

Review Comment:
   ```suggestion
   | `input`      | Yes      | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, a provided `PARTITION BY` must match the upsert key of the subquery. |
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -311,17 +337,21 @@ private List<Field> derivePassThroughFields(CallContext callContext) {
             return IntStream.range(0, staticArgs.size())
                     .mapToObj(
                             pos -> {
-                                final StaticArgument arg = staticArgs.get(pos);
-                                if (arg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH)) {
+                                final TableSemantics semantics =
+                                        callContext.getTableSemantics(pos).orElse(null);
+                                final TraitContext traitCtx =
+                                        buildTraitContext(semantics, callContext, staticArgs);
+                                final StaticArgument resolvedArg =
+                                        staticArgs.get(pos).applyConditionalTraits(traitCtx);
+                                if (resolvedArg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH)) {
                                     return DataType.getFields(argDataTypes.get(pos)).stream();
                                 }
-                                if (!arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)) {
+                                if (semantics == null) {
+                                    return Stream.<Field>empty();
+                                }

Review Comment:
   nit:
   ```suggestion
                                   
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java:
##########
@@ -273,10 +273,13 @@ public SqlReturnTypeInference getRowTypeInference() {
             }
             final StaticArgument arg = args.get(ordinal);
             final TableCharacteristic.Semantics semantics;
-            if (arg.is(StaticArgumentTrait.ROW_SEMANTIC_TABLE)) {
-                semantics = TableCharacteristic.Semantics.ROW;
-            } else if (arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)) {
+            // Report SET if it may apply - which allows the use of Partition BY

Review Comment:
   ```suggestion
               // Report SET semantics if it may apply - which allows the use of PARTITION BY
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * A condition that determines whether a conditional trait on a {@link StaticArgument} should be
+ * active for a given call.
+ *
+ * <p>Conditions are evaluated at planning time using the {@link TraitContext} which provides access
+ * to the SQL call's properties (PARTITION BY presence, scalar literal values, etc.).
+ *
+ * <p>Use the static factory methods for common conditions:
+ *
+ * <pre>{@code
+ * import static org.apache.flink.table.types.inference.TraitCondition.*;
+ *
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+ *         .withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy());
+ * }</pre>
+ */
+@PublicEvolving
+@FunctionalInterface
+public interface TraitCondition extends Serializable {

Review Comment:
   ```suggestion
   public interface TraitCondition {
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java:
##########
@@ -379,6 +383,118 @@ public static List<Ord<StaticArgument>> getProvidedInputArgs(RexCall call) {
                 .collect(Collectors.toList());
     }
 
+    /**
+     * Builds a {@link TraitContext} for resolving conditional traits on a table argument at
+     * planning time.
+     */
+    public static TraitContext buildTraitContext(
+            final RexCall call, final RexTableArgCall tableArgCall) {
+        final List<StaticArgument> declaredArgs = getStaticArguments(call);
+        final List<RexNode> operands = call.getOperands();
+
+        return new TraitContext() {
+            @Override
+            public boolean hasPartitionBy() {
+                return tableArgCall.getPartitionKeys().length > 0;
+            }
+
+            @Override
+            public <T> Optional<T> getScalarArgument(final String name, final Class<T> clazz) {
+                return findScalarLiteral(declaredArgs, operands, name, clazz);
+            }
+        };
+    }
+
+    /** Checks if any table argument resolves to SET_SEMANTIC_TABLE after applying conditions. */
+    private static boolean hasResolvedSetSemantics(
+            final List<StaticArgument> staticArgs,
+            final List<RexNode> operands,
+            final RexCall rexCall) {
+        for (int i = 0; i < staticArgs.size(); i++) {
+            final StaticArgument arg = staticArgs.get(i);
+            if (!arg.is(StaticArgumentTrait.TABLE)) {
+                continue;
+            }
+
+            final RexTableArgCall tableArgCall = (RexTableArgCall) operands.get(i);
+
+            final StaticArgument resolvedArg =
+                    arg.applyConditionalTraits(buildTraitContext(rexCall, tableArgCall));
+            if (resolvedArg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static List<StaticArgument> getStaticArguments(final RexCall call) {
+        final BridgingSqlFunction.WithTableFunction function =
+                (BridgingSqlFunction.WithTableFunction) call.getOperator();
+        return function.getTypeInference()
+                .getStaticArguments()
+                .orElseThrow(IllegalStateException::new);
+    }
+
+    /**
+     * Extracts a scalar argument value by name. Handles NULL, DEFAULT, DESCRIPTOR, MAP, and
+     * standard literal values (Boolean, String, Integer, etc.) following the same rules as {@link
+     * CallContext#getArgumentValue}. Does not support time types (Instant, Duration, LocalDate)
+     * which require the full CallContext bridge.
+     */
+    @SuppressWarnings("unchecked")
+    private static <T> Optional<T> findScalarLiteral(

Review Comment:
   Reuse `StreamPhysicalProcessTableFunction.toCallContext`



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java:
##########
@@ -305,17 +308,20 @@ private RuntimeTableSemantics createRuntimeTableSemantics(
         }
 
         final int timeColumn = inputTimeColumns.get(tableArgCall.getInputIndex());
+        final StaticArgument resolvedArg =
+                tableArg.applyConditionalTraits(

Review Comment:
   Can we apply them earlier? Ideally, conditional traits are resolved as early as possible. `org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalProcessTableFunctionRule#convert` could be a good location.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * A condition that determines whether a conditional trait on a {@link StaticArgument} should be
+ * active for a given call.
+ *
+ * <p>Conditions are evaluated at planning time using the {@link TraitContext} which provides access
+ * to the SQL call's properties (PARTITION BY presence, scalar literal values, etc.).
+ *
+ * <p>Use the static factory methods for common conditions:
+ *
+ * <pre>{@code
+ * import static org.apache.flink.table.types.inference.TraitCondition.*;
+ *
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+ *         .withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy());
+ * }</pre>

Review Comment:
   Mention that implementations need to implement `equals`/`hashCode`; otherwise `StaticArgument.equals`/`hashCode` won't work.
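
The concern can be illustrated with a small sketch, under the assumption that an argument declaration compares its conditions as part of its own `equals`/`hashCode`. None of the types below are from the PR: `CallInfo`, `Condition`, `HasPartitionBy`, and `DeclaredArgument` are hypothetical stand-ins. A lambda inherits identity-based `Object.equals`, so two declarations that are logically the same compare as different, while a small final class with value-based equality behaves as expected.

```java
import java.util.Objects;

// Hypothetical stand-in for the context a condition is evaluated against.
interface CallInfo {
    boolean hasPartitionBy();
}

// Hypothetical stand-in for a functional condition interface.
@FunctionalInterface
interface Condition {
    boolean test(CallInfo info);
}

// Value-based condition: all instances are interchangeable, so they are equal.
final class HasPartitionBy implements Condition {
    @Override
    public boolean test(CallInfo info) {
        return info.hasPartitionBy();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof HasPartitionBy;
    }

    @Override
    public int hashCode() {
        return HasPartitionBy.class.hashCode();
    }
}

// Holder whose equality delegates to the condition it carries.
final class DeclaredArgument {
    private final String name;
    private final Condition condition;

    DeclaredArgument(String name, Condition condition) {
        this.name = Objects.requireNonNull(name);
        this.condition = Objects.requireNonNull(condition);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DeclaredArgument)) {
            return false;
        }
        final DeclaredArgument that = (DeclaredArgument) o;
        return name.equals(that.name) && condition.equals(that.condition);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, condition);
    }
}
```

Two `DeclaredArgument` instances built with `new HasPartitionBy()` compare as equal, whereas two built from separately written lambdas do not, which is why the Javadoc should point this out to implementers.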



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.