gustavodemorais commented on code in PR #27886:
URL: https://github.com/apache/flink/pull/27886#discussion_r3130932989
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java:
##########
@@ -216,6 +224,118 @@ public boolean isDeterministic() {
return resolvedFunction.getDefinition().isDeterministic();
}
+ // --------------------------------------------------------------------------------------------
+ // Conditional trait resolution
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Rewrites {@code call} so that the operator's {@link StaticArgument}s have any conditional
+ * traits (see {@link StaticArgument#withConditionalTrait}) applied against the call site
+ * (PARTITION BY, scalar literals). Downstream consumers can then treat the operator's static
+ * arguments as the effective signature and use plain {@code arg.is(SET_SEMANTIC_TABLE)} checks.
+ *
+ * <p>Called from the two places where a planner-level {@link RexCall} for a PTF is first built
+ * for downstream consumption: {@code FlinkLogicalTableFunctionScan} converter (fresh planning)
+ * and {@code StreamExecProcessTableFunction.@JsonCreator} (compiled-plan restore). A no-op for
+ * non-PTF calls and for PTFs that declare no conditional traits.
+ */
+ public static RexCall resolveCallTraits(RexCall call) {
+ if (!(call.getOperator() instanceof BridgingSqlFunction)) {
+ return call;
+ }
+ final BridgingSqlFunction function = (BridgingSqlFunction) call.getOperator();
+ final List<StaticArgument> declared =
+ function.typeInference.getStaticArguments().orElse(null);
+ if (declared == null || declared.stream().noneMatch(StaticArgument::hasConditionalTraits)) {
+ return call;
+ }
+ final List<RexNode> operands = call.getOperands();
+ final List<StaticArgument> resolved =
+ IntStream.range(0, declared.size())
+ .mapToObj(i -> resolveArg(declared.get(i), declared, operands, i))
+ .collect(Collectors.toList());
+ if (resolved.equals(declared)) {
+ return call;
+ }
+ final BridgingSqlFunction rewritten = function.withStaticArguments(resolved);
+ // Use a fresh RexBuilder from the function's own type factory so this can run from a
+ // Jackson @JsonCreator that has no planner context.
+ return (RexCall)
+ new RexBuilder(function.typeFactory).makeCall(call.getType(), rewritten, operands);
+ }
+
+ private static StaticArgument resolveArg(
+ StaticArgument declaredArg,
+ List<StaticArgument> declared,
+ List<RexNode> operands,
+ int index) {
+ // We only resolve conditional traits for the Table Argument with conditional traits
+ if (!declaredArg.hasConditionalTraits()
+ || !(operands.get(index) instanceof RexTableArgCall)) {
+ return declaredArg;
+ }
+ return declaredArg.applyConditionalTraits(
+ buildTraitContext((RexTableArgCall) operands.get(index), declared, operands));
+ }
+
+ /**
+ * Planner-side adapter to {@link TraitContext}. Sourced from a {@link RexCall} (PARTITION BY
+ * via {@link RexTableArgCall}, scalar literals via the operand list) instead of a {@link
+ * org.apache.flink.table.types.inference.CallContext}, since the planner doesn't carry one. The
+ * validation-time equivalent is {@link TraitContext#of}.
+ */
+ private static TraitContext buildTraitContext(
Review Comment:
My thinking was that having BridgingSqlFunction import things from the physical
layer felt like a bad idea. Code in the early logical layer would be calling
code that already lives in the physical layer, and we might create a weird
dependency.
But it's a valid point that we should try to keep the logic in one place and
reuse it. I've moved "toCallContext" into BridgingSqlFunction.java and we're
reusing it in StreamPhysicalProcessTableFunction.java. I think that makes sense,
and it's also not a lot of code.
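
Roughly, the dependency direction I mean looks like the sketch below. Everything in it is a hypothetical stand-in (`CallInfo`, `LogicalFunction`, `PhysicalNode`, and the simplified `toCallContext` signature are not the real Flink types). It only illustrates that the helper is owned by the earlier layer and that the later layer calls into it, never the other way around.

```java
import java.util.List;

// Illustration only: hypothetical stand-ins, not Flink classes.
final class SharedHelperSketch {

    /** Stand-in for the context object derived from a call site. */
    static final class CallInfo {
        final String functionName;
        final List<String> partitionKeys;

        CallInfo(String functionName, List<String> partitionKeys) {
            this.functionName = functionName;
            this.partitionKeys = partitionKeys;
        }
    }

    /** Stand-in for the early (logical) layer class that owns the helper. */
    static final class LogicalFunction {
        // Defined once here; assumed signature, much simpler than the real one.
        static CallInfo toCallContext(String functionName, List<String> partitionKeys) {
            return new CallInfo(functionName, List.copyOf(partitionKeys));
        }

        // The logical layer uses its own helper.
        static boolean isPartitioned(String functionName, List<String> partitionKeys) {
            return !toCallContext(functionName, partitionKeys).partitionKeys.isEmpty();
        }
    }

    /** Stand-in for the later (physical) layer node that reuses the helper. */
    static final class PhysicalNode {
        // The physical layer depends on the logical layer, not the reverse.
        static String describe(String functionName, List<String> partitionKeys) {
            CallInfo info = LogicalFunction.toCallContext(functionName, partitionKeys);
            return info.functionName + " partitioned by " + info.partitionKeys;
        }
    }

    public static void main(String[] args) {
        System.out.println(PhysicalNode.describe("my_ptf", List.of("user_id")));
    }
}
```

With this layout the early layer never imports anything from the later one, and both paths go through the same conversion.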