twalthr commented on code in PR #27886:
URL: https://github.com/apache/flink/pull/27886#discussion_r3057248729
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -54,7 +54,7 @@ SELECT * FROM TO_CHANGELOG(
| Parameter | Required | Description |
|:-------------|:---------|:------------|
-| `input` | Yes | The input table. Must include `PARTITION BY` for parallel execution. Accepts insert-only, retract, and upsert tables. |
+| `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located for parallel execution. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, providing `PARTITION BY` is recommended for better performance. |
Review Comment:
```suggestion
| `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located for parallel execution (set semantics). Without `PARTITION BY`, each row is processed independently (row semantics). Accepts insert-only, retract, and upsert tables. |
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -57,18 +61,58 @@ public class StaticArgument {
private final @Nullable Class<?> conversionClass;
private final boolean isOptional;
private final EnumSet<StaticArgumentTrait> traits;
+ private final List<ConditionalTrait> conditionalTraits;
+
+ /** A trait that is conditionally added based on a {@link TraitCondition}. */
+ private static final class ConditionalTrait implements Serializable {
Review Comment:
nit: move the nested classes to the bottom of the file
```suggestion
private static final class ConditionalTrait {
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -99,6 +99,15 @@ SELECT * FROM TO_CHANGELOG(
-- +I[id:2, op:'DELETE', name:'Bob', cnt:1]
```
+#### Without PARTITION BY
+
+```sql
Review Comment:
Let's remove all PARTITION BY examples for now. A default TO_CHANGELOG example should always be without PARTITION BY: here PARTITION BY does not provide any benefit but rather adds exchange overhead.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##########
@@ -783,14 +784,16 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.kind(PROCESS_TABLE)
.staticArguments(
StaticArgument.table(
- "input",
- Row.class,
- false,
- EnumSet.of(
- StaticArgumentTrait.TABLE,
- StaticArgumentTrait.SET_SEMANTIC_TABLE,
- StaticArgumentTrait.SUPPORT_UPDATES,
- StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)),
+ "input",
+ Row.class,
+ false,
+ EnumSet.of(
+ StaticArgumentTrait.TABLE,
Review Comment:
Just to make things explicit, I would add `StaticArgumentTrait.ROW_SEMANTIC_TABLE` here as well.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -196,6 +240,68 @@ public boolean is(StaticArgumentTrait trait) {
return traits.contains(trait);
}
+ /**
+ * Context-aware trait check. Evaluates conditional trait rules against the given context to
+ * determine the effective traits.
+ */
+ public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+ return resolveTraits(ctx).contains(trait);
+ }
+
+ /**
+ * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
+ * added to the effective trait set when the condition evaluates to {@code true} at planning
+ * time.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+ * .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
+ * .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE);
+ * }</pre>
+ */
+ public StaticArgument addTraitWhen(
Review Comment:
```suggestion
public StaticArgument withConditionalTrait(
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -196,6 +240,68 @@ public boolean is(StaticArgumentTrait trait) {
return traits.contains(trait);
}
+ /**
+ * Context-aware trait check. Evaluates conditional trait rules against the given context to
+ * determine the effective traits.
+ */
+ public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+ return resolveTraits(ctx).contains(trait);
+ }
+
+ /**
+ * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
+ * added to the effective trait set when the condition evaluates to {@code true} at planning
+ * time.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+ * .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
+ * .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE);
+ * }</pre>
+ */
+ public StaticArgument addTraitWhen(
+ final TraitCondition condition, final StaticArgumentTrait trait) {
+ final List<ConditionalTrait> newList = new ArrayList<>(this.conditionalTraits);
Review Comment:
We should limit the conditional traits. For example, a scalar should not suddenly become a table: root changes are not allowed. This limits the logic to tables for now, because `TABLE` is currently the only trait with subtraits.
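A minimal sketch of such a restriction, using simplified stand-in types rather than Flink's real `StaticArgument`/`StaticArgumentTrait` (the `Trait` enum, the `ConditionalTraitRules` class, and its `validateConditionalTrait` method are hypothetical). Root traits (`SCALAR`, `TABLE`) are rejected as conditional traits, and conditional traits are only accepted on arguments that are already tables:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for a subset of Flink's StaticArgumentTrait.
enum Trait { SCALAR, TABLE, ROW_SEMANTIC_TABLE, SET_SEMANTIC_TABLE, SUPPORT_UPDATES, REQUIRE_UPDATE_BEFORE }

final class ConditionalTraitRules {
    // Root traits decide what kind of argument this is; they must never change per call.
    private static final Set<Trait> ROOT_TRAITS = EnumSet.of(Trait.SCALAR, Trait.TABLE);

    private ConditionalTraitRules() {}

    // Hypothetical check that would run when a conditional trait is registered.
    static void validateConditionalTrait(Set<Trait> baseTraits, Trait conditional) {
        if (ROOT_TRAITS.contains(conditional)) {
            throw new IllegalArgumentException(
                    "Root trait " + conditional + " cannot be added conditionally.");
        }
        // TABLE is currently the only root trait with sub-traits, so only tables qualify.
        if (!baseTraits.contains(Trait.TABLE)) {
            throw new IllegalArgumentException(
                    "Conditional traits are only supported for table arguments.");
        }
    }
}
```

Validating at registration time (rather than during resolution) would surface a misconfigured function definition immediately instead of at planning time for a specific query.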
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -196,6 +240,68 @@ public boolean is(StaticArgumentTrait trait) {
return traits.contains(trait);
}
+ /**
+ * Context-aware trait check. Evaluates conditional trait rules against the given context to
+ * determine the effective traits.
+ */
+ public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+ return resolveTraits(ctx).contains(trait);
+ }
+
+ /**
+ * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
+ * added to the effective trait set when the condition evaluates to {@code true} at planning
+ * time.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+ * .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
+ * .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE);
+ * }</pre>
+ */
+ public StaticArgument addTraitWhen(
+ final TraitCondition condition, final StaticArgumentTrait trait) {
+ final List<ConditionalTrait> newList = new ArrayList<>(this.conditionalTraits);
+ newList.add(new ConditionalTrait(condition, trait));
+ return new StaticArgument(name, dataType, conversionClass, isOptional, traits, newList);
+ }
+
+ /** Whether this argument has conditional trait rules. */
+ public boolean hasConditionalTraits() {
+ return !conditionalTraits.isEmpty();
+ }
+
+ /** Whether any conditional trait rule may add the given trait. */
+ public boolean hasConditionalTrait(final StaticArgumentTrait trait) {
+ return conditionalTraits.stream().anyMatch(ct -> ct.trait == trait);
+ }
+
+ /**
+ * Resolves effective traits by evaluating conditional rules against the context. Returns the
+ * base traits combined with any conditional traits whose conditions are met.
+ */
+ public EnumSet<StaticArgumentTrait> resolveTraits(final TraitContext ctx) {
+ if (conditionalTraits.isEmpty()) {
+ return traits;
+ }
+ final EnumSet<StaticArgumentTrait> resolved = EnumSet.copyOf(traits);
+ for (final ConditionalTrait ct : conditionalTraits) {
+ if (ct.condition.test(ctx)) {
+ // ROW_SEMANTIC_TABLE and SET_SEMANTIC_TABLE are mutually exclusive.
+ // Adding one removes the other.
+ if (ct.trait == StaticArgumentTrait.SET_SEMANTIC_TABLE) {
+ resolved.remove(StaticArgumentTrait.ROW_SEMANTIC_TABLE);
+ } else if (ct.trait == StaticArgumentTrait.ROW_SEMANTIC_TABLE) {
+ resolved.remove(StaticArgumentTrait.SET_SEMANTIC_TABLE);
+ }
+ resolved.add(ct.trait);
+ }
+ }
+ return resolved;
+ }
+
@Override
public String toString() {
Review Comment:
Update `toString` as well, with the syntax `myUntypedTable {TABLE BY ROW, TABLE BY SET}`. We add all potential conditional traits to the list, but without any special syntax; the `toString` format is complex enough.
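A rough sketch of the proposed `toString` format, again with hypothetical stand-ins (`Trait`, `ArgumentPrinter.describe`). The `TABLE BY ROW`/`TABLE BY SET` labels follow the syntax in the comment above; rendering every other trait by its enum name and omitting the bare `TABLE` root once a semantics label is present are assumptions:

```java
import java.util.EnumSet;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical stand-in for a subset of Flink's StaticArgumentTrait.
enum Trait { SCALAR, TABLE, ROW_SEMANTIC_TABLE, SET_SEMANTIC_TABLE, SUPPORT_UPDATES, REQUIRE_UPDATE_BEFORE }

final class ArgumentPrinter {
    private ArgumentPrinter() {}

    // Labels following the syntax proposed in the review; other traits use their enum name.
    static String label(Trait trait) {
        switch (trait) {
            case ROW_SEMANTIC_TABLE:
                return "TABLE BY ROW";
            case SET_SEMANTIC_TABLE:
                return "TABLE BY SET";
            default:
                return trait.name();
        }
    }

    // Lists base traits plus every trait that *may* be added conditionally,
    // without marking which ones are conditional.
    static String describe(String name, EnumSet<Trait> base, List<Trait> conditional) {
        EnumSet<Trait> all = EnumSet.copyOf(base);
        all.addAll(conditional);
        boolean hasSemantics =
                all.contains(Trait.ROW_SEMANTIC_TABLE) || all.contains(Trait.SET_SEMANTIC_TABLE);
        StringJoiner joiner = new StringJoiner(", ", name + " {", "}");
        for (Trait trait : all) { // EnumSet iterates in declaration order
            if (trait == Trait.TABLE && hasSemantics) {
                continue; // the root is implied by "TABLE BY ..."
            }
            joiner.add(label(trait));
        }
        return joiner.toString();
    }
}
```

For example, `describe("myUntypedTable", EnumSet.of(TABLE, ROW_SEMANTIC_TABLE), List.of(SET_SEMANTIC_TABLE))` yields `myUntypedTable {TABLE BY ROW, TABLE BY SET}`.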
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -656,12 +684,13 @@ private static void checkRowSemantics(StaticArgument staticArg, TableSemantics s
}
}
- private static void checkSetSemantics(StaticArgument staticArg, TableSemantics semantics) {
- if (!staticArg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)) {
+ private static void checkSetSemantics(
+ StaticArgument staticArg, TableSemantics semantics, TraitContext traitCtx) {
Review Comment:
How about we copy `StaticArgument` with the effective traits? This would simplify this code, and you wouldn't have to pass `TraitContext` everywhere. We can provide `StaticArgument.applyConditionalTraits(ctx): StaticArgument`.
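A minimal sketch of that idea, with simplified stand-ins instead of Flink's classes (`Trait`, `TraitContext`, `TraitCondition`, and `Arg` are hypothetical and model only what trait resolution needs). It uses the `withConditionalTrait(trait, condition)` form suggested elsewhere in this review, keeps the row/set mutual exclusion from `resolveTraits`, and returns a copy without rules, so later checks can use the plain `is(trait)`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;

// Hypothetical stand-ins; only the members needed for trait resolution are modeled.
enum Trait { SCALAR, TABLE, ROW_SEMANTIC_TABLE, SET_SEMANTIC_TABLE, SUPPORT_UPDATES }

interface TraitContext { boolean hasPartitionBy(); }

interface TraitCondition { boolean test(TraitContext ctx); }

final class Arg {
    private static final class ConditionalTrait {
        final Trait trait;
        final TraitCondition condition;

        ConditionalTrait(Trait trait, TraitCondition condition) {
            this.trait = trait;
            this.condition = condition;
        }
    }

    private final String name;
    private final EnumSet<Trait> traits;
    private final List<ConditionalTrait> conditionalTraits;

    Arg(String name, EnumSet<Trait> traits) {
        this(name, traits, Collections.emptyList());
    }

    private Arg(String name, EnumSet<Trait> traits, List<ConditionalTrait> conditionalTraits) {
        this.name = name;
        this.traits = EnumSet.copyOf(traits);
        this.conditionalTraits = conditionalTraits;
    }

    // Immutable "wither": returns a copy with one more conditional rule.
    Arg withConditionalTrait(Trait trait, TraitCondition condition) {
        List<ConditionalTrait> copy = new ArrayList<>(conditionalTraits);
        copy.add(new ConditionalTrait(trait, condition));
        return new Arg(name, traits, copy);
    }

    // Resolves all rules once per call and returns an argument with plain, effective traits.
    Arg applyConditionalTraits(TraitContext ctx) {
        if (conditionalTraits.isEmpty()) {
            return this;
        }
        EnumSet<Trait> effective = EnumSet.copyOf(traits);
        for (ConditionalTrait ct : conditionalTraits) {
            if (!ct.condition.test(ctx)) {
                continue;
            }
            // Row and set semantics are mutually exclusive: adding one removes the other.
            if (ct.trait == Trait.SET_SEMANTIC_TABLE) {
                effective.remove(Trait.ROW_SEMANTIC_TABLE);
            } else if (ct.trait == Trait.ROW_SEMANTIC_TABLE) {
                effective.remove(Trait.SET_SEMANTIC_TABLE);
            }
            effective.add(ct.trait);
        }
        return new Arg(name, effective, Collections.emptyList());
    }

    boolean is(Trait trait) {
        return traits.contains(trait);
    }
}
```

With this shape, callers such as `checkSetSemantics` would resolve the argument once up front and never need a `TraitContext` parameter.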
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -181,6 +181,28 @@ private static void checkReservedArgs(List<StaticArgument> staticArgs) {
}
}
+ static TraitContext buildTraitContext(
+ final TableSemantics semantics,
+ final CallContext callContext,
+ final List<StaticArgument> staticArgs) {
+ return new TraitContext() {
+ @Override
+ public boolean hasPartitionBy() {
+ return semantics.partitionByColumns().length > 0;
+ }
+
+ @Override
+ public <T> Optional<T> getScalarArgument(final String name, final Class<T> clazz) {
+ for (int i = 0; i < staticArgs.size(); i++) {
+ if (staticArgs.get(i).getName().equals(name)) {
Review Comment:
Check that the argument is a scalar first, and then that it is a literal (`isArgumentLiteral`).
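A sketch of that lookup order, using simplified stand-ins: `Arg` and `ScalarLookup` are hypothetical, and the `CallContext` interface below is a reduced stand-in that only borrows the method names `isArgumentLiteral(int)` and `getArgumentValue(int, Class)` from Flink's `CallContext`. Non-scalar arguments and non-literal values yield `Optional.empty()`:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for StaticArgument: only the name and whether it is a scalar.
final class Arg {
    final String name;
    final boolean scalar;

    Arg(String name, boolean scalar) {
        this.name = name;
        this.scalar = scalar;
    }
}

// Reduced stand-in for Flink's CallContext; only two methods are modeled.
interface CallContext {
    boolean isArgumentLiteral(int pos);

    <T> Optional<T> getArgumentValue(int pos, Class<T> clazz);
}

final class ScalarLookup {
    private ScalarLookup() {}

    // Returns the value of the named argument only if it is a scalar AND a literal.
    static <T> Optional<T> getScalarArgument(
            List<Arg> staticArgs, CallContext callContext, String name, Class<T> clazz) {
        for (int pos = 0; pos < staticArgs.size(); pos++) {
            Arg arg = staticArgs.get(pos);
            if (!arg.name.equals(name)) {
                continue;
            }
            // 1) Must be a scalar argument: table arguments have no literal value.
            if (!arg.scalar) {
                return Optional.empty();
            }
            // 2) Must be a literal; otherwise the value is unknown during planning.
            if (!callContext.isArgumentLiteral(pos)) {
                return Optional.empty();
            }
            return callContext.getArgumentValue(pos, clazz);
        }
        return Optional.empty();
    }
}
```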
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -181,6 +181,28 @@ private static void checkReservedArgs(List<StaticArgument> staticArgs) {
}
}
+ static TraitContext buildTraitContext(
+ final TableSemantics semantics,
+ final CallContext callContext,
+ final List<StaticArgument> staticArgs) {
+ return new TraitContext() {
+ @Override
+ public boolean hasPartitionBy() {
+ return semantics.partitionByColumns().length > 0;
Review Comment:
The trait context should be available to all arguments, so `TableSemantics` should be nullable.
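A minimal sketch with stand-in interfaces (`TableSemantics` and `TraitContext` here are reduced, hypothetical versions of the Flink types; `TraitContexts.buildTraitContext` is illustrative only). A `null` semantics simply means there is no PARTITION BY, so scalar arguments can share the same kind of context:

```java
// Reduced, hypothetical stand-ins for the Flink interfaces used by buildTraitContext.
interface TableSemantics {
    int[] partitionByColumns();
}

interface TraitContext {
    boolean hasPartitionBy();
}

final class TraitContexts {
    private TraitContexts() {}

    /**
     * @param semantics table semantics of the argument, or null for non-table arguments
     *     (a real implementation would mark this parameter {@code @Nullable})
     */
    static TraitContext buildTraitContext(TableSemantics semantics) {
        // No table semantics means there cannot be a PARTITION BY clause.
        return () -> semantics != null && semantics.partitionByColumns().length > 0;
    }
}
```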
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -314,13 +336,16 @@ private List<Field> derivePassThroughFields(CallContext callContext) {
if (arg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH)) {
return DataType.getFields(argDataTypes.get(pos)).stream();
}
- if (!arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)) {
+ final TableSemantics semantics =
+ callContext.getTableSemantics(pos).orElse(null);
+ if (semantics == null) {
+ return Stream.<Field>empty();
+ }
+ final TraitContext traitCtx =
Review Comment:
Build the traits first, before the `arg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH)` check.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * A condition that determines whether a conditional trait on a {@link StaticArgument} should be
+ * active for a given call.
+ *
+ * <p>Conditions are evaluated at planning time using the {@link TraitContext} which provides access
+ * to the SQL call's properties (PARTITION BY presence, scalar argument values, etc.).
Review Comment:
```suggestion
* to the SQL call's properties (PARTITION BY presence, scalar literal values, etc.).
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * A condition that determines whether a conditional trait on a {@link StaticArgument} should be
+ * active for a given call.
+ *
+ * <p>Conditions are evaluated at planning time using the {@link TraitContext} which provides access
+ * to the SQL call's properties (PARTITION BY presence, scalar argument values, etc.).
+ *
+ * <p>Use the static factory methods for common conditions:
+ *
+ * <pre>{@code
+ * import static org.apache.flink.table.types.inference.TraitCondition.*;
+ *
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+ * .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
+ * .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE)
+ * .addTraitWhen(argIsTrue("produces_full_deletes"), REQUIRE_UPDATE_BEFORE);
+ * }</pre>
+ */
+@PublicEvolving
+@FunctionalInterface
+public interface TraitCondition extends Serializable {
+
+ /** Evaluates this condition against the given context. */
+ boolean test(TraitContext ctx);
+
+ /** True when PARTITION BY is provided on the table argument. */
+ static TraitCondition hasPartitionBy() {
+ return TraitContext::hasPartitionBy;
+ }
+
+ /** True when the named boolean argument is provided and its value is {@code true}. */
+ static TraitCondition argIsTrue(final String name) {
Review Comment:
Generalize `argIsTrue` and `argIsFalse` to:
```
static <T> TraitCondition argIsEqualTo(String name, T obj) {
    return ctx -> ctx.getScalarArgument(name, obj.getClass()).map(obj::equals).orElse(false);
}
```
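A slightly expanded, compilable version of that factory, with stand-in interfaces (`TraitContext` is reduced to its scalar lookup; both types here are simplified, hypothetical versions of the PR's interfaces). It compares with `equals` rather than `==`, since boxed values such as `Integer` or `String` are not guaranteed to be the same object; a missing or non-literal argument evaluates to `false`:

```java
import java.util.Objects;
import java.util.Optional;

// Reduced, hypothetical stand-in for TraitContext: only the scalar lookup is modeled.
interface TraitContext {
    <T> Optional<T> getScalarArgument(String name, Class<T> clazz);
}

@FunctionalInterface
interface TraitCondition {
    boolean test(TraitContext ctx);

    // Generalizes argIsTrue/argIsFalse: true when the named literal is present and equals `expected`.
    static <T> TraitCondition argIsEqualTo(String name, T expected) {
        Objects.requireNonNull(expected, "expected");
        return ctx -> ctx.getScalarArgument(name, expected.getClass())
                .map(expected::equals)
                .orElse(false);
    }
}
```

Under this sketch, `argIsTrue("produces_full_deletes")` would become `argIsEqualTo("produces_full_deletes", true)`.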
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java:
##########
@@ -276,15 +279,17 @@ private RuntimeTableSemantics createRuntimeTableSemantics(
}
final int timeColumn =
inputTimeColumns.get(tableArgCall.getInputIndex());
+ final org.apache.flink.table.types.inference.TraitContext traitCtx =
Review Comment:
Pay attention to fully qualified class names; it seems Claude loves to do this.
```suggestion
final TraitContext traitCtx =
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java:
##########
@@ -221,7 +223,11 @@ protected RelDataType deriveRowType() {
if (uidRexNode.getKind() == SqlKind.DEFAULT) {
// Optional for constant or row semantics functions
if (staticArgs.stream()
- .noneMatch(arg -> arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE))) {
+ .noneMatch(
+ arg ->
+ arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)
+ || arg.hasConditionalTrait(
Review Comment:
We should do this check on the actual resulting (effective) traits.
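A sketch of the check on effective traits, assuming each argument has already been resolved for the current call (for example via the `applyConditionalTraits` idea from the earlier comment). `EffectiveArg` and `UidCheck` are hypothetical. A conditional `SET_SEMANTIC_TABLE` that did not fire (no PARTITION BY) then keeps the uid optional, whereas a `hasConditionalTrait` check would make it mandatory regardless:

```java
import java.util.EnumSet;
import java.util.List;

// Hypothetical stand-in for a subset of StaticArgumentTrait.
enum Trait { TABLE, ROW_SEMANTIC_TABLE, SET_SEMANTIC_TABLE }

// Hypothetical: an argument whose conditional traits were already resolved for this call.
final class EffectiveArg {
    private final EnumSet<Trait> traits;

    EffectiveArg(EnumSet<Trait> traits) {
        this.traits = EnumSet.copyOf(traits);
    }

    boolean is(Trait trait) {
        return traits.contains(trait);
    }
}

final class UidCheck {
    private UidCheck() {}

    // The uid may be omitted only if no argument effectively has set semantics in this call.
    static boolean uidIsOptional(List<EffectiveArg> effectiveArgs) {
        return effectiveArgs.stream().noneMatch(arg -> arg.is(Trait.SET_SEMANTIC_TABLE));
    }
}
```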
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java:
##########
@@ -348,6 +354,54 @@ public static List<Ord<StaticArgument>> getProvidedInputArgs(RexCall call) {
.collect(Collectors.toList());
}
+ /**
+ * Builds a {@link TraitContext} for resolving conditional traits on a table argument at
+ * planning time.
+ */
+ public static TraitContext buildTraitContext(
+ final RexCall call, final RexTableArgCall tableArgCall) {
+ final List<StaticArgument> declaredArgs = getStaticArguments(call);
+ final List<RexNode> operands = call.getOperands();
+
+ return new TraitContext() {
+ @Override
+ public boolean hasPartitionBy() {
+ return tableArgCall.getPartitionKeys().length > 0;
+ }
+
+ @Override
+ public <T> Optional<T> getScalarArgument(final String name, final Class<T> clazz) {
+ return findScalarLiteral(declaredArgs, operands, name, clazz);
+ }
+ };
+ }
+
+ private static List<StaticArgument> getStaticArguments(final RexCall call) {
+ final BridgingSqlFunction.WithTableFunction function =
+ (BridgingSqlFunction.WithTableFunction) call.getOperator();
+ return function.getTypeInference()
+ .getStaticArguments()
+ .orElseThrow(IllegalStateException::new);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> Optional<T> findScalarLiteral(
+ final List<StaticArgument> declaredArgs,
+ final List<RexNode> operands,
+ final String name,
+ final Class<T> clazz) {
+ for (int i = 0; i < declaredArgs.size(); i++) {
+ if (declaredArgs.get(i).getName().equals(name)) {
+ final RexNode operand = operands.get(i);
+ if (operand.getKind() == SqlKind.DEFAULT || !(operand instanceof RexLiteral)) {
+ return Optional.empty();
+ }
+ return Optional.ofNullable(((RexLiteral) operand).getValueAs(clazz));
Review Comment:
This is too simple. It should follow the same rules as `CallContext` does; otherwise it won't be possible, e.g., to get `Instant.class` or other literals.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java:
##########
@@ -276,15 +279,17 @@ private RuntimeTableSemantics createRuntimeTableSemantics(
}
final int timeColumn =
inputTimeColumns.get(tableArgCall.getInputIndex());
+ final org.apache.flink.table.types.inference.TraitContext traitCtx =
Review Comment:
Same comment as above: resolve the static argument as early as possible so that `TraitContext` is not reconstructed multiple times.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalProcessTableFunctionRule.java:
##########
@@ -127,9 +138,17 @@ private static RelNode applyDistributionOnInput(
}
private static FlinkRelDistribution deriveDistribution(
- RexTableArgCall tableOperand, TableCharacteristic tableCharacteristic) {
- if (tableCharacteristic.semantics == Semantics.SET) {
- final int[] partitionKeys = tableOperand.getPartitionKeys();
+ RexTableArgCall tableOperand,
+ TableCharacteristic tableCharacteristic,
+ StaticArgument staticArg) {
+ final int[] partitionKeys = tableOperand.getPartitionKeys();
+ final boolean hasPartitionBy = partitionKeys.length > 0;
+ final boolean reportedAsSet = tableCharacteristic.semantics == Semantics.SET;
+ final boolean setIsConditional =
+ staticArg.hasConditionalTrait(StaticArgumentTrait.SET_SEMANTIC_TABLE);
Review Comment:
Too fragile. Determine the effective `StaticArgument` first and then execute this logic.
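A simplified sketch of `deriveDistribution` once the effective traits are known. `Distribution` is a hypothetical stand-in for `FlinkRelDistribution`, and the mapping (row semantics: no particular distribution; set semantics: hash on the partition keys, or a singleton when there are none) is an assumption about the intended behavior, not the planner's actual code:

```java
import java.util.EnumSet;

// Hypothetical stand-in for a subset of StaticArgumentTrait.
enum Trait { TABLE, ROW_SEMANTIC_TABLE, SET_SEMANTIC_TABLE }

// Hypothetical, much simplified stand-in for FlinkRelDistribution.
final class Distribution {
    enum Kind { ANY, SINGLETON, HASH }

    final Kind kind;
    final int[] keys;

    private Distribution(Kind kind, int[] keys) {
        this.kind = kind;
        this.keys = keys;
    }

    static Distribution any() {
        return new Distribution(Kind.ANY, new int[0]);
    }

    static Distribution singleton() {
        return new Distribution(Kind.SINGLETON, new int[0]);
    }

    static Distribution hash(int[] keys) {
        return new Distribution(Kind.HASH, keys.clone());
    }
}

final class DistributionDeriver {
    private DistributionDeriver() {}

    // effectiveTraits must already contain the resolved conditional traits,
    // so no separate "reported as set" or "set is conditional" flags are needed.
    static Distribution deriveDistribution(EnumSet<Trait> effectiveTraits, int[] partitionKeys) {
        if (!effectiveTraits.contains(Trait.SET_SEMANTIC_TABLE)) {
            // Row semantics: each row is processed independently, no exchange required.
            return Distribution.any();
        }
        return partitionKeys.length == 0
                ? Distribution.singleton()
                : Distribution.hash(partitionKeys);
    }
}
```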
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java:
##########
@@ -196,6 +240,68 @@ public boolean is(StaticArgumentTrait trait) {
return traits.contains(trait);
}
+ /**
+ * Context-aware trait check. Evaluates conditional trait rules against the given context to
+ * determine the effective traits.
+ */
+ public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+ return resolveTraits(ctx).contains(trait);
+ }
+
+ /**
+ * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
+ * added to the effective trait set when the condition evaluates to {@code true} at planning
+ * time.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+ * .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
+ * .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE);
+ * }</pre>
+ */
+ public StaticArgument addTraitWhen(
Review Comment:
I would swap the parameter order then: `withConditionalTrait(trait,
condition)`
--
This is an automated message from the Apache Git Service.