gustavodemorais commented on code in PR #27886:
URL: https://github.com/apache/flink/pull/27886#discussion_r3118023779


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java:
##########
@@ -379,6 +383,118 @@ public static List<Ord<StaticArgument>> 
getProvidedInputArgs(RexCall call) {
                 .collect(Collectors.toList());
     }
 
+    /**
+     * Builds a {@link TraitContext} for resolving conditional traits on a 
table argument at
+     * planning time.
+     */
+    public static TraitContext buildTraitContext(
+            final RexCall call, final RexTableArgCall tableArgCall) {
+        final List<StaticArgument> declaredArgs = getStaticArguments(call);
+        final List<RexNode> operands = call.getOperands();
+
+        return new TraitContext() {
+            @Override
+            public boolean hasPartitionBy() {
+                return tableArgCall.getPartitionKeys().length > 0;
+            }
+
+            @Override
+            public <T> Optional<T> getScalarArgument(final String name, final 
Class<T> clazz) {
+                return findScalarLiteral(declaredArgs, operands, name, clazz);
+            }
+        };
+    }
+
+    /** Checks if any table argument resolves to SET_SEMANTIC_TABLE after 
applying conditions. */
+    private static boolean hasResolvedSetSemantics(
+            final List<StaticArgument> staticArgs,
+            final List<RexNode> operands,
+            final RexCall rexCall) {
+        for (int i = 0; i < staticArgs.size(); i++) {
+            final StaticArgument arg = staticArgs.get(i);
+            if (!arg.is(StaticArgumentTrait.TABLE)) {
+                continue;
+            }
+
+            final RexTableArgCall tableArgCall = (RexTableArgCall) 
operands.get(i);
+
+            final StaticArgument resolvedArg =
+                    arg.applyConditionalTraits(buildTraitContext(rexCall, 
tableArgCall));
+            if (resolvedArg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static List<StaticArgument> getStaticArguments(final RexCall call) {
+        final BridgingSqlFunction.WithTableFunction function =
+                (BridgingSqlFunction.WithTableFunction) call.getOperator();
+        return function.getTypeInference()
+                .getStaticArguments()
+                .orElseThrow(IllegalStateException::new);
+    }
+
+    /**
+     * Extracts a scalar argument value by name. Handles NULL, DEFAULT, 
DESCRIPTOR, MAP, and
+     * standard literal values (Boolean, String, Integer, etc.) following the 
same rules as {@link
+     * CallContext#getArgumentValue}. Does not support time types (Instant, 
Duration, LocalDate)
+     * which require the full CallContext bridge.
+     */
+    @SuppressWarnings("unchecked")
+    private static <T> Optional<T> findScalarLiteral(

Review Comment:
   Thanks for the pointer. I've created an overload that supports feting the 
scalar literals but doesn't require the additional changelog information which 
we don't use and do not require. Using it now
   
   ```
     public static CallContext toCallContext(RexCall udfCall) {                 
                                                                                
                                                                                
       
         return toCallContext(udfCall, null, null, null);                  
     }  
    ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to