raminqaf commented on code in PR #27886:
URL: https://github.com/apache/flink/pull/27886#discussion_r3122022489


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -196,6 +211,91 @@ public boolean is(StaticArgumentTrait trait) {
         return traits.contains(trait);
     }
 
+    /**
+     * Context-aware trait check. Evaluates conditional trait rules against the given context to
+     * determine the effective traits.
+     */
+    public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+        return resolveTraits(ctx).contains(trait);
+    }
+
+    /**
+     * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
+     * added to the effective trait set when the condition evaluates to {@code true} at planning
+     * time. Only non-root traits (subtraits of TABLE, SCALAR, or MODEL) are allowed.
+     *
+     * <p>Multiple conditions for the same trait use OR semantics: the trait is activated if any of
+     * its conditions is met.
+     *
+     * <p>Example:
+     *
+     * <pre>{@code
+     * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+     *         .withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy());
+     * }</pre>
+     */
+    public StaticArgument withConditionalTrait(
+            final StaticArgumentTrait trait, final TraitCondition condition) {
+        if (trait == StaticArgumentTrait.SCALAR

Review Comment:
   Should we make these checks part of the `ConditionalTrait` class? Suggestion: an `EnumSet` called `IllegalConditionalTraits` plus a method (`isConditionalTrait`) that checks against it.
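
   A minimal sketch of this suggestion, assuming stand-in types: `Trait` is a cut-down placeholder for `StaticArgumentTrait`, and `ConditionalTraitSketch` and `checkConditional` are hypothetical names, not code from the PR.

```java
import java.util.EnumSet;
import java.util.Set;

// Stand-in for Flink's StaticArgumentTrait; only the constants needed here.
enum Trait {
    SCALAR, TABLE, MODEL, ROW_SEMANTIC_TABLE, SET_SEMANTIC_TABLE, SUPPORT_UPDATES
}

// Hypothetical home for the check (the PR's ConditionalTrait class is not shown in full).
final class ConditionalTraitSketch {

    // Root traits that must never be attached conditionally.
    private static final Set<Trait> ILLEGAL_CONDITIONAL_TRAITS =
            EnumSet.of(Trait.SCALAR, Trait.TABLE, Trait.MODEL);

    private ConditionalTraitSketch() {}

    /** Returns true if the trait may be used in a conditional rule. */
    static boolean isConditionalTrait(Trait trait) {
        return !ILLEGAL_CONDITIONAL_TRAITS.contains(trait);
    }

    /** Fails fast with the same message the PR uses for root traits. */
    static void checkConditional(Trait trait) {
        if (!isConditionalTrait(trait)) {
            throw new IllegalArgumentException(
                    "Root traits (SCALAR, TABLE, MODEL) cannot be conditional.");
        }
    }
}
```

   With something like this, `withConditionalTrait` would only need a single call to the check instead of the chained `==` comparisons.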



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A condition that determines whether a conditional trait on a {@link StaticArgument} should be
+ * active for a given call.
+ *
+ * <p>Conditions are evaluated at planning time using the {@link TraitContext} which provides access
+ * to the SQL call's properties (PARTITION BY presence, scalar literal values, etc.).
+ *
+ * <p>Implementations must implement {@code hashCode} and {@code equals} for {@link

Review Comment:
   The Javadoc requires implementations to provide `hashCode` and `equals`, but none of the implementations below (`argIsEqualTo` and `not`) does so.
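
   A lambda inherits identity-based `equals`/`hashCode` from `Object`, so two separately created conditions would generally not compare equal. A minimal sketch of a value-based alternative, assuming stand-in types: `Ctx` and `Condition` are placeholders for `TraitContext` and `TraitCondition`, and the `scalarArg` accessor and `ArgEquals` class are hypothetical.

```java
import java.util.Objects;

// Stand-in for TraitContext; the accessor name is an assumption.
interface Ctx {
    Object scalarArg(String name);
}

// Stand-in for TraitCondition.
interface Condition {
    boolean test(Ctx ctx);
}

// Value-based condition: instances with the same name and expected value are equal.
final class ArgEquals implements Condition {
    private final String name;
    private final Object expected;

    ArgEquals(String name, Object expected) {
        this.name = Objects.requireNonNull(name);
        this.expected = expected;
    }

    @Override
    public boolean test(Ctx ctx) {
        return Objects.equals(expected, ctx.scalarArg(name));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ArgEquals)) {
            return false;
        }
        final ArgEquals that = (ArgEquals) o;
        return name.equals(that.name) && Objects.equals(expected, that.expected);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, expected);
    }
}
```

   The static factories could return such classes instead of lambdas, which would keep `StaticArgument#equals` meaningful for arguments built the same way twice.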



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -196,6 +211,91 @@ public boolean is(StaticArgumentTrait trait) {
         return traits.contains(trait);
     }
 
+    /**
+     * Context-aware trait check. Evaluates conditional trait rules against the given context to
+     * determine the effective traits.
+     */
+    public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+        return resolveTraits(ctx).contains(trait);
+    }
+
+    /**
+     * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
+     * added to the effective trait set when the condition evaluates to {@code true} at planning
+     * time. Only non-root traits (subtraits of TABLE, SCALAR, or MODEL) are allowed.
+     *
+     * <p>Multiple conditions for the same trait use OR semantics: the trait is activated if any of
+     * its conditions is met.
+     *
+     * <p>Example:
+     *
+     * <pre>{@code
+     * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+     *         .withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy());
+     * }</pre>
+     */
+    public StaticArgument withConditionalTrait(
+            final StaticArgumentTrait trait, final TraitCondition condition) {
+        if (trait == StaticArgumentTrait.SCALAR
+                || trait == StaticArgumentTrait.TABLE
+                || trait == StaticArgumentTrait.MODEL) {
+            throw new IllegalArgumentException(
+                    "Root traits (SCALAR, TABLE, MODEL) cannot be conditional.");
+        }
+        final List<ConditionalTrait> accumulated = new ArrayList<>(this.conditionalTraits);
+        accumulated.add(new ConditionalTrait(condition, trait));
+        return new StaticArgument(name, dataType, conversionClass, isOptional, traits, accumulated);
+    }
+
+    /** Whether this argument has conditional trait rules. */
+    public boolean hasConditionalTraits() {
+        return !conditionalTraits.isEmpty();
+    }
+
+    /** Whether any conditional trait rule may add the given trait. */
+    public boolean hasConditionalTrait(final StaticArgumentTrait trait) {
+        return conditionalTraits.stream().anyMatch(c -> c.trait == trait);
+    }
+
+    /**
+     * Returns a new {@link StaticArgument} with conditional traits resolved against the given
+     * context. The returned argument has the effective traits baked in and no conditional rules.
+     */
+    public StaticArgument applyConditionalTraits(final TraitContext ctx) {
+        if (conditionalTraits.isEmpty()) {
+            return this;
+        }
+        return new StaticArgument(name, dataType, conversionClass, isOptional, resolveTraits(ctx));
+    }
+
+    /**
+     * Resolves effective traits by evaluating conditional rules against the context. Returns the
+     * base traits combined with any conditional traits whose conditions are met.
+     */
+    public EnumSet<StaticArgumentTrait> resolveTraits(final TraitContext ctx) {
+        if (conditionalTraits.isEmpty()) {
+            return traits;
+        }
+        final EnumSet<StaticArgumentTrait> resolved = EnumSet.copyOf(traits);
+        for (final ConditionalTrait conditionalTrait : conditionalTraits) {
+            if (conditionalTrait.condition.test(ctx)) {
+                removeMutuallyExclusiveTraits(resolved, conditionalTrait.trait);
+                resolved.add(conditionalTrait.trait);
+            }
+        }
+        return resolved;
+    }
+
+    /** ROW and SET semantics are mutually exclusive - adding one removes the other. */
+    private static void removeMutuallyExclusiveTraits(
+            final EnumSet<StaticArgumentTrait> traits, final StaticArgumentTrait adding) {
+        if (adding == StaticArgumentTrait.SET_SEMANTIC_TABLE) {

Review Comment:
   We can introduce a `getIncompatibleWith()` method and simplify this to
   ```suggestion
   private static void removeMutuallyExclusiveTraits(
             EnumSet<StaticArgumentTrait> traits, StaticArgumentTrait adding) {
         traits.removeAll(adding.getIncompatibleWith());
     }
   ```
   
   or add an `else` branch to fail fast (personally, I'd prefer a `switch` with a `default` here)
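
   A minimal sketch of the `getIncompatibleWith()` idea, assuming stand-in types: `SketchTrait` is a cut-down placeholder for `StaticArgumentTrait`, and `TraitResolution.addExclusive` is a hypothetical helper that combines the removal and the add.

```java
import java.util.EnumSet;
import java.util.Set;

// Stand-in for StaticArgumentTrait; getIncompatibleWith() is the hypothetical addition.
enum SketchTrait {
    TABLE,
    ROW_SEMANTIC_TABLE,
    SET_SEMANTIC_TABLE,
    SUPPORT_UPDATES;

    /** Traits that cannot coexist with this one; empty for traits without conflicts. */
    Set<SketchTrait> getIncompatibleWith() {
        switch (this) {
            case ROW_SEMANTIC_TABLE:
                return EnumSet.of(SET_SEMANTIC_TABLE);
            case SET_SEMANTIC_TABLE:
                return EnumSet.of(ROW_SEMANTIC_TABLE);
            default:
                return EnumSet.noneOf(SketchTrait.class);
        }
    }
}

final class TraitResolution {

    private TraitResolution() {}

    /** Adds a trait after dropping anything it is mutually exclusive with. */
    static void addExclusive(EnumSet<SketchTrait> traits, SketchTrait adding) {
        traits.removeAll(adding.getIncompatibleWith());
        traits.add(adding);
    }
}
```

   Keeping the exclusion rule on the enum means a future mutually exclusive pair only needs a new `case`, and the resolution loop stays unchanged.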
   



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A condition that determines whether a conditional trait on a {@link StaticArgument} should be
+ * active for a given call.
+ *
+ * <p>Conditions are evaluated at planning time using the {@link TraitContext} which provides access
+ * to the SQL call's properties (PARTITION BY presence, scalar literal values, etc.).
+ *
+ * <p>Implementations must implement {@code hashCode} and {@code equals} for {@link
+ * StaticArgument#equals}/{@link StaticArgument#hashCode} to work correctly.
+ *
+ * <p>Use the static factory methods for common conditions:
+ *
+ * <pre>{@code
+ * import static org.apache.flink.table.types.inference.TraitCondition.*;
+ *
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+ *         .withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy());
+ * }</pre>
+ */
+@PublicEvolving
+@FunctionalInterface
+public interface TraitCondition {
+
+    /** Evaluates this condition against the given context. */
+    boolean test(TraitContext ctx);
+
+    /** True when PARTITION BY is provided on the table argument. */
+    static TraitCondition hasPartitionBy() {
+        return TraitContext::hasPartitionBy;
+    }
+
+    /** True when the named scalar argument equals the expected value. */
+    @SuppressWarnings("unchecked")
+    static <T> TraitCondition argIsEqualTo(final String name, final T expected) {

Review Comment:
   Should we ensure type safety by passing a `Class<T> clazz` to the method?
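
   A minimal sketch of a typed variant, in the style of `CallContext#getArgumentValue(int, Class)`. All types here are stand-ins: `ArgContext` and its `getScalarArgument` accessor are assumptions about what `TraitContext` could expose, and `MapContext` exists only to make the example runnable.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Stand-in for TraitContext; the typed accessor is an assumption, not the PR's API.
interface ArgContext {
    <T> Optional<T> getScalarArgument(String name, Class<T> clazz);
}

// Stand-in for TraitCondition with a typed factory.
interface ArgCondition {
    boolean test(ArgContext ctx);

    /** True when the named argument has type T and equals the expected value. */
    static <T> ArgCondition argIsEqualTo(String name, Class<T> clazz, T expected) {
        Objects.requireNonNull(clazz);
        return ctx ->
                ctx.getScalarArgument(name, clazz)
                        .map(actual -> Objects.equals(actual, expected))
                        .orElse(false);
    }
}

// Map-backed context used only for illustration.
final class MapContext implements ArgContext {
    private final Map<String, Object> values;

    MapContext(Map<String, Object> values) {
        this.values = values;
    }

    @Override
    public <T> Optional<T> getScalarArgument(String name, Class<T> clazz) {
        final Object value = values.get(name);
        // A value of the wrong type yields empty instead of a ClassCastException.
        return clazz.isInstance(value) ? Optional.of(clazz.cast(value)) : Optional.empty();
    }
}
```

   With the class token, a type mismatch simply evaluates to `false`, and the `@SuppressWarnings("unchecked")` on the factory would likely no longer be needed.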



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -182,6 +182,32 @@ private static void checkReservedArgs(List<StaticArgument> staticArgs) {
         }
     }
 
+    static TraitContext buildTraitContext(
+            @Nullable final TableSemantics semantics,
+            final CallContext callContext,
+            final List<StaticArgument> staticArgs) {
+        return new TraitContext() {

Review Comment:
   We have two implementations of `TraitContext`: one here and the other in `StreamPhysicalProcessTableFunction`. Any reason for that?
   
   If they need to behave the same, move the implementation into `TraitContext` itself as a static factory (`TraitContext.of(@Nullable TableSemantics semantics, CallContext callContext, List<StaticArgument> staticArgs)`).
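
   A rough sketch of the factory shape, reduced to the one context method visible in this diff (`hasPartitionBy`). The types are stand-ins: `Semantics` is a placeholder for `TableSemantics`, `SketchTraitContext` for `TraitContext`, and the `CallContext` and `staticArgs` parameters are left out because their use is not shown here.

```java
// Stand-in for TableSemantics; only the accessor used below.
interface Semantics {
    int[] partitionByColumns();
}

// Stand-in for TraitContext with the implementation hosted in a static factory.
interface SketchTraitContext {

    boolean hasPartitionBy();

    /** Single shared implementation; semantics may be null for non-table arguments. */
    static SketchTraitContext of(Semantics semanticsOrNull) {
        return () ->
                semanticsOrNull != null && semanticsOrNull.partitionByColumns().length > 0;
    }
}
```

   Both call sites could then obtain the context from the same factory, so the two implementations cannot drift apart.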



