twalthr commented on code in PR #27886: URL: https://github.com/apache/flink/pull/27886#discussion_r3130183663
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.List; +import java.util.Objects; +import java.util.function.Predicate; + +/** + * A condition that determines whether a conditional trait on a {@link StaticArgument} should be + * active for a given call. + * + * <p>Conditions are evaluated at planning time using the {@link TraitContext} which provides access + * to the SQL call's properties (PARTITION BY presence, scalar literal values, etc.). + * + * <p>Implementations must implement {@code hashCode} and {@code equals} for {@link + * StaticArgument#equals}/{@link StaticArgument#hashCode} to work correctly. The built-in factories + * below return value-comparable instances; user-supplied lambdas do not - prefer the factories. 
+ * + * <pre>{@code + * import static org.apache.flink.table.types.inference.TraitCondition.*; + * + * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES)) + * .withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy()); + * }</pre> + */ +@PublicEvolving +@FunctionalInterface +public interface TraitCondition { + + /** Evaluates this condition against the given context. */ + boolean test(TraitContext ctx); + + /** True when PARTITION BY is provided on the table argument. */ + static TraitCondition hasPartitionBy() { + return new BuiltInCondition( + BuiltInCondition.Kind.HAS_PARTITION_BY, List.of(), TraitContext::hasPartitionBy); + } + + /** True when the named scalar argument equals the expected value. */ + @SuppressWarnings("unchecked") + static <T> TraitCondition argIsEqualTo(final String name, final T expected) { + final Class<T> clazz = (Class<T>) expected.getClass(); + return new BuiltInCondition( + BuiltInCondition.Kind.ARG_IS_EQUAL_TO, + List.of(name, expected), + ctx -> ctx.getScalarArgument(name, clazz).map(expected::equals).orElse(false)); + } + + /** Negates the given condition. */ + static TraitCondition not(final TraitCondition condition) { + return new BuiltInCondition( + BuiltInCondition.Kind.NOT, List.of(condition), ctx -> !condition.test(ctx)); + } + + /** + * Internal value-comparable wrapper used by all built-in factories. Equality is keyed by {@code + * kind + args}; the {@code impl} predicate is reused but never compared, so two conditions + * built from the same factory inputs are equal. 
+ */ + final class BuiltInCondition implements TraitCondition { Review Comment: ```suggestion private final class BuiltInCondition implements TraitCondition { ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java: ########## @@ -216,6 +224,118 @@ public boolean isDeterministic() { return resolvedFunction.getDefinition().isDeterministic(); } + // -------------------------------------------------------------------------------------------- + // Conditional trait resolution + // -------------------------------------------------------------------------------------------- + + /** + * Rewrites {@code call} so that the operator's {@link StaticArgument}s have any conditional + * traits (see {@link StaticArgument#withConditionalTrait}) applied against the call site + * (PARTITION BY, scalar literals). Downstream consumers can then treat the operator's static + * arguments as the effective signature and use plain {@code arg.is(SET_SEMANTIC_TABLE)} checks. + * + * <p>Called from the two places where a planner-level {@link RexCall} for a PTF is first built + * for downstream consumption: {@code FlinkLogicalTableFunctionScan} converter (fresh planning) + * and {@code StreamExecProcessTableFunction.@JsonCreator} (compiled-plan restore). A no-op for + * non-PTF calls and for PTFs that declare no conditional traits. 
+ */ + public static RexCall resolveCallTraits(RexCall call) { + if (!(call.getOperator() instanceof BridgingSqlFunction)) { + return call; + } + final BridgingSqlFunction function = (BridgingSqlFunction) call.getOperator(); + final List<StaticArgument> declared = + function.typeInference.getStaticArguments().orElse(null); + if (declared == null || declared.stream().noneMatch(StaticArgument::hasConditionalTraits)) { + return call; + } + final List<RexNode> operands = call.getOperands(); + final List<StaticArgument> resolved = + IntStream.range(0, declared.size()) + .mapToObj(i -> resolveArg(declared.get(i), declared, operands, i)) + .collect(Collectors.toList()); + if (resolved.equals(declared)) { + return call; + } + final BridgingSqlFunction rewritten = function.withStaticArguments(resolved); + // Use a fresh RexBuilder from the function's own type factory so this can run from a + // Jackson @JsonCreator that has no planner context. + return (RexCall) + new RexBuilder(function.typeFactory).makeCall(call.getType(), rewritten, operands); + } + + private static StaticArgument resolveArg( + StaticArgument declaredArg, + List<StaticArgument> declared, + List<RexNode> operands, + int index) { + // We only resolve conditional traits for the Table Argument with conditional traits + if (!declaredArg.hasConditionalTraits() + || !(operands.get(index) instanceof RexTableArgCall)) { + return declaredArg; + } + return declaredArg.applyConditionalTraits( + buildTraitContext((RexTableArgCall) operands.get(index), declared, operands)); + } + + /** + * Planner-side adapter to {@link TraitContext}. Sourced from a {@link RexCall} (PARTITION BY + * via {@link RexTableArgCall}, scalar literals via the operand list) instead of a {@link + * org.apache.flink.table.types.inference.CallContext}, since the planner doesn't carry one. The + * validation-time equivalent is {@link TraitContext#of}. 
+ */ + private static TraitContext buildTraitContext( Review Comment: Was reusing `toCallContext` from StreamPhysicalProcessFunction not an option? Relying directly on `RexLiteral` might cause issues in the future. It is better to reuse the provided wrapper. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
