gustavodemorais commented on code in PR #27886:
URL: https://github.com/apache/flink/pull/27886#discussion_r3116889691


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java:
##########
@@ -348,6 +354,54 @@ public static List<Ord<StaticArgument>> getProvidedInputArgs(RexCall call) {
                 .collect(Collectors.toList());
     }
 
+    /**
+     * Builds a {@link TraitContext} for resolving conditional traits on a table argument at
+     * planning time.
+     */
+    public static TraitContext buildTraitContext(
+            final RexCall call, final RexTableArgCall tableArgCall) {
+        final List<StaticArgument> declaredArgs = getStaticArguments(call);
+        final List<RexNode> operands = call.getOperands();
+
+        return new TraitContext() {
+            @Override
+            public boolean hasPartitionBy() {
+                return tableArgCall.getPartitionKeys().length > 0;
+            }
+
+            @Override
+            public <T> Optional<T> getScalarArgument(final String name, final Class<T> clazz) {
+                return findScalarLiteral(declaredArgs, operands, name, clazz);
+            }
+        };
+    }
+
+    private static List<StaticArgument> getStaticArguments(final RexCall call) {
+        final BridgingSqlFunction.WithTableFunction function =
+                (BridgingSqlFunction.WithTableFunction) call.getOperator();
+        return function.getTypeInference()
+                .getStaticArguments()
+                .orElseThrow(IllegalStateException::new);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> Optional<T> findScalarLiteral(
+            final List<StaticArgument> declaredArgs,
+            final List<RexNode> operands,
+            final String name,
+            final Class<T> clazz) {
+        for (int i = 0; i < declaredArgs.size(); i++) {
+            if (declaredArgs.get(i).getName().equals(name)) {
+                final RexNode operand = operands.get(i);
+                if (operand.getKind() == SqlKind.DEFAULT || !(operand instanceof RexLiteral)) {
+                    return Optional.empty();
+                }
+                return Optional.ofNullable(((RexLiteral) operand).getValueAs(clazz));

Review Comment:
   That's right. I've extended it to support NULL, DEFAULT, DESCRIPTOR, MAP and literals, using logic similar to what we have in CallContext.
   
   We could theoretically create an OperatorBindingCallContext here and delegate to it to avoid code duplication, but a whole OperatorBindingCallContext is heavy for this purpose alone, and it creates a tight coupling between StreamPhysicalProcessTableFunction 
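
For readers following the thread, the operand handling described in this comment might look roughly like the sketch below. It is not the code from the PR: `Kind`, `Operand`, and `resolve` are hypothetical stand-ins for Calcite's `RexNode`/`SqlKind` and the planner's lookup, and the choice to map NULL and DEFAULT to an empty result is an assumption based on the quoted diff.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class ScalarArgumentSketch {

    // Hypothetical operand kinds; the real planner inspects RexNode/SqlKind instead.
    enum Kind { LITERAL, NULL, DEFAULT, DESCRIPTOR, MAP, OTHER }

    // Hypothetical stand-in for a call operand: its kind plus an already-decoded value.
    static final class Operand {
        final Kind kind;
        final Object value;

        Operand(Kind kind, Object value) {
            this.kind = kind;
            this.value = value;
        }
    }

    // Looks up the operand declared under `name` and returns its value if it is
    // available at planning time and matches the requested class.
    static <T> Optional<T> resolve(
            List<String> declaredNames, List<Operand> operands, String name, Class<T> clazz) {
        final int index = declaredNames.indexOf(name);
        if (index < 0 || index >= operands.size()) {
            return Optional.empty();
        }
        final Operand operand = operands.get(index);
        switch (operand.kind) {
            case NULL:
            case DEFAULT:
            case OTHER:
                // Assumption: no usable value, so the caller sees an empty Optional.
                return Optional.empty();
            default:
                // LITERAL, DESCRIPTOR, MAP: return the value only if the type matches.
                return clazz.isInstance(operand.value)
                        ? Optional.of(clazz.cast(operand.value))
                        : Optional.empty();
        }
    }

    public static void main(String[] args) {
        final List<String> names = List.of("mode", "on_time", "options", "limit");
        final List<Operand> operands =
                List.of(
                        new Operand(Kind.LITERAL, "append"),
                        new Operand(Kind.DESCRIPTOR, List.of("ts")),
                        new Operand(Kind.MAP, Map.of("k", "v")),
                        new Operand(Kind.DEFAULT, null));

        System.out.println(resolve(names, operands, "mode", String.class)); // Optional[append]
        System.out.println(resolve(names, operands, "on_time", List.class)); // Optional[[ts]]
        System.out.println(resolve(names, operands, "options", Map.class)); // Optional[{k=v}]
        System.out.println(resolve(names, operands, "limit", Integer.class)); // Optional.empty
    }
}
```

Keeping the dispatch in one small helper like this avoids constructing a full call context just to read a few planning-time values, which is the trade-off the comment is weighing.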


