gustavodemorais commented on code in PR #27886:
URL: https://github.com/apache/flink/pull/27886#discussion_r3116884131
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##########
@@ -783,14 +784,16 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.kind(PROCESS_TABLE)
.staticArguments(
StaticArgument.table(
- "input",
- Row.class,
- false,
- EnumSet.of(
- StaticArgumentTrait.TABLE,
- StaticArgumentTrait.SET_SEMANTIC_TABLE,
- StaticArgumentTrait.SUPPORT_UPDATES,
- StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)),
+ "input",
+ Row.class,
+ false,
+ EnumSet.of(
+ StaticArgumentTrait.TABLE,
Review Comment:
Done
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -196,6 +240,68 @@ public boolean is(StaticArgumentTrait trait) {
return traits.contains(trait);
}
+ /**
+ * Context-aware trait check. Evaluates conditional trait rules against the given context to
+ * determine the effective traits.
+ */
+ public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+ return resolveTraits(ctx).contains(trait);
+ }
+
+ /**
+ * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
+ * added to the effective trait set when the condition evaluates to {@code true} at planning
+ * time.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+ * .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
+ * .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE);
+ * }</pre>
+ */
+ public StaticArgument addTraitWhen(
Review Comment:
Done
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -196,6 +240,68 @@ public boolean is(StaticArgumentTrait trait) {
return traits.contains(trait);
}
+ /**
+ * Context-aware trait check. Evaluates conditional trait rules against the given context to
+ * determine the effective traits.
+ */
+ public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+ return resolveTraits(ctx).contains(trait);
+ }
+
+ /**
+ * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
+ * added to the effective trait set when the condition evaluates to {@code true} at planning
+ * time.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+ * .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
+ * .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE);
+ * }</pre>
+ */
+ public StaticArgument addTraitWhen(
Review Comment:
Done
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -57,18 +61,58 @@ public class StaticArgument {
private final @Nullable Class<?> conversionClass;
private final boolean isOptional;
private final EnumSet<StaticArgumentTrait> traits;
+ private final List<ConditionalTrait> conditionalTraits;
+
+ /** A trait that is conditionally added based on a {@link TraitCondition}. */
+ private static final class ConditionalTrait implements Serializable {
Review Comment:
Done
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -196,6 +240,68 @@ public boolean is(StaticArgumentTrait trait) {
return traits.contains(trait);
}
+ /**
+ * Context-aware trait check. Evaluates conditional trait rules against the given context to
+ * determine the effective traits.
+ */
+ public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+ return resolveTraits(ctx).contains(trait);
+ }
+
+ /**
+ * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
+ * added to the effective trait set when the condition evaluates to {@code true} at planning
+ * time.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+ * .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
+ * .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE);
+ * }</pre>
+ */
+ public StaticArgument addTraitWhen(
+ final TraitCondition condition, final StaticArgumentTrait trait) {
+ final List<ConditionalTrait> newList = new ArrayList<>(this.conditionalTraits);
+ newList.add(new ConditionalTrait(condition, trait));
+ return new StaticArgument(name, dataType, conversionClass, isOptional, traits, newList);
+ }
+
+ /** Whether this argument has conditional trait rules. */
+ public boolean hasConditionalTraits() {
+ return !conditionalTraits.isEmpty();
+ }
+
+ /** Whether any conditional trait rule may add the given trait. */
+ public boolean hasConditionalTrait(final StaticArgumentTrait trait) {
+ return conditionalTraits.stream().anyMatch(ct -> ct.trait == trait);
+ }
+
+ /**
+ * Resolves effective traits by evaluating conditional rules against the context. Returns the
+ * base traits combined with any conditional traits whose conditions are met.
+ */
+ public EnumSet<StaticArgumentTrait> resolveTraits(final TraitContext ctx) {
+ if (conditionalTraits.isEmpty()) {
+ return traits;
+ }
+ final EnumSet<StaticArgumentTrait> resolved = EnumSet.copyOf(traits);
+ for (final ConditionalTrait ct : conditionalTraits) {
+ if (ct.condition.test(ctx)) {
+ // ROW_SEMANTIC_TABLE and SET_SEMANTIC_TABLE are mutually exclusive.
+ // Adding one removes the other.
+ if (ct.trait == StaticArgumentTrait.SET_SEMANTIC_TABLE) {
+ resolved.remove(StaticArgumentTrait.ROW_SEMANTIC_TABLE);
+ } else if (ct.trait == StaticArgumentTrait.ROW_SEMANTIC_TABLE) {
+ resolved.remove(StaticArgumentTrait.SET_SEMANTIC_TABLE);
+ }
+ resolved.add(ct.trait);
+ }
+ }
+ return resolved;
+ }
+
@Override
public String toString() {
Review Comment:
Done
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -196,6 +240,68 @@ public boolean is(StaticArgumentTrait trait) {
return traits.contains(trait);
}
+ /**
+ * Context-aware trait check. Evaluates conditional trait rules against the given context to
+ * determine the effective traits.
+ */
+ public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+ return resolveTraits(ctx).contains(trait);
+ }
+
+ /**
+ * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
+ * added to the effective trait set when the condition evaluates to {@code true} at planning
+ * time.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+ * .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
+ * .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE);
+ * }</pre>
+ */
+ public StaticArgument addTraitWhen(
+ final TraitCondition condition, final StaticArgumentTrait trait) {
+ final List<ConditionalTrait> newList = new ArrayList<>(this.conditionalTraits);
Review Comment:
Done
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -181,6 +181,28 @@ private static void checkReservedArgs(List<StaticArgument> staticArgs) {
}
}
+ static TraitContext buildTraitContext(
+ final TableSemantics semantics,
+ final CallContext callContext,
+ final List<StaticArgument> staticArgs) {
+ return new TraitContext() {
+ @Override
+ public boolean hasPartitionBy() {
+ return semantics.partitionByColumns().length > 0;
Review Comment:
Done
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -181,6 +181,28 @@ private static void checkReservedArgs(List<StaticArgument> staticArgs) {
}
}
+ static TraitContext buildTraitContext(
+ final TableSemantics semantics,
+ final CallContext callContext,
+ final List<StaticArgument> staticArgs) {
+ return new TraitContext() {
+ @Override
+ public boolean hasPartitionBy() {
+ return semantics.partitionByColumns().length > 0;
+ }
+
+ @Override
+ public <T> Optional<T> getScalarArgument(final String name, final Class<T> clazz) {
+ for (int i = 0; i < staticArgs.size(); i++) {
+ if (staticArgs.get(i).getName().equals(name)) {
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]