This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 055edcc7cb8 [FLINK-39392][table] Support conditional traits for PTFs
055edcc7cb8 is described below
commit 055edcc7cb8abdaa4be33b3baff365571c1da023
Author: Gustavo de Morais <[email protected]>
AuthorDate: Mon Apr 27 14:07:53 2026 +0200
[FLINK-39392][table] Support conditional traits for PTFs
This closes #27886.
---
.../docs/sql/reference/queries/changelog.md | 10 +-
.../functions/BuiltInFunctionDefinitions.java | 31 ++--
.../table/types/inference/BuiltInCondition.java | 74 +++++++++
.../table/types/inference/StaticArgument.java | 134 ++++++++++++++++-
.../table/types/inference/StaticArgumentTrait.java | 24 +++
.../table/types/inference/SystemTypeInference.java | 44 +++++-
.../table/types/inference/TraitCondition.java | 71 +++++++++
.../flink/table/types/inference/TraitContext.java | 81 ++++++++++
flink-table/flink-table-planner/AGENTS.md | 16 ++
.../functions/bridging/BridgingSqlFunction.java | 166 ++++++++++++++++++++-
.../stream/StreamExecProcessTableFunction.java | 6 +-
.../logical/FlinkLogicalTableFunctionScan.java | 10 +-
.../stream/StreamPhysicalProcessTableFunction.java | 23 ---
.../codegen/ProcessTableRunnerGenerator.scala | 8 +-
.../FlinkChangelogModeInferenceProgram.scala | 34 ++---
.../planner/plan/stream/sql/ToChangelogTest.java | 17 +++
.../planner/plan/stream/sql/ToChangelogTest.xml | 20 +++
17 files changed, 687 insertions(+), 82 deletions(-)
diff --git a/docs/content/docs/sql/reference/queries/changelog.md
b/docs/content/docs/sql/reference/queries/changelog.md
index 60773af2b27..494dcc60be8 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -177,7 +177,7 @@ This is useful when you need to materialize changelog
events into a downstream s
```sql
SELECT * FROM TO_CHANGELOG(
- input => TABLE source_table,
+ input => TABLE source_table [PARTITION BY key_col],
[op => DESCRIPTOR(op_column_name),]
[op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
)
@@ -185,10 +185,10 @@ SELECT * FROM TO_CHANGELOG(
### Parameters
-| Parameter | Required | Description |
-|:-------------|:---------|:------------|
-| `input` | Yes | The input table. Accepts insert-only, retract, and
upsert tables. |
-| `op` | No | A `DESCRIPTOR` with a single column name for the
operation code column. Defaults to `op`. |
+| Parameter | Required | Description
|
+|:-------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `input` | Yes | The input table. With `PARTITION BY`, rows with
the same key are co-located and processed by the same operator instance. Without
`PARTITION BY`, each row is processed independently. Accepts insert-only,
retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key
should match or be a subset of the upsert key of the subquery.
|
+| `op` | No | A `DESCRIPTOR` with a single column name for the
operation code column. Defaults to `op`.
|
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation
names to custom output codes. Keys can contain comma-separated names to map
multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When
provided, only mapped operations are forwarded - unmapped events are dropped.
Each change operation may appear at most once across all entries. |
#### Default op_mapping
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index bbe38ca13d5..cd25361923c 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -36,6 +36,7 @@ import
org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.StaticArgument;
import org.apache.flink.table.types.inference.StaticArgumentTrait;
+import org.apache.flink.table.types.inference.TraitCondition;
import org.apache.flink.table.types.inference.TypeStrategies;
import
org.apache.flink.table.types.inference.strategies.ArrayOfStringArgumentTypeStrategy;
import
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies;
@@ -785,22 +786,22 @@ public final class BuiltInFunctionDefinitions {
.name("TO_CHANGELOG")
.kind(PROCESS_TABLE)
.staticArguments(
- // Row semantics (no PARTITION BY). Accepts
updating
- // inputs. The planner inserts ChangelogNormalize
for
- // upsert sources to produce UPDATE_BEFORE and full
- // DELETE rows.
+ // Row semantics (no PARTITION BY).
+ // With PARTITION BY, switches to set
+ // semantics for co-located parallel execution.
StaticArgument.table(
- "input",
- Row.class,
- false,
- EnumSet.of(
- StaticArgumentTrait.TABLE,
-
StaticArgumentTrait.ROW_SEMANTIC_TABLE,
-
StaticArgumentTrait.SUPPORT_UPDATES,
-
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE,
- // Not strictly necessary but
explicitly state that
- // we require full deletes.
-
StaticArgumentTrait.REQUIRE_FULL_DELETE)),
+ "input",
+ Row.class,
+ false,
+ EnumSet.of(
+ StaticArgumentTrait.TABLE,
+
StaticArgumentTrait.ROW_SEMANTIC_TABLE,
+
StaticArgumentTrait.SUPPORT_UPDATES,
+
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE,
+
StaticArgumentTrait.REQUIRE_FULL_DELETE))
+ .withConditionalTrait(
+
StaticArgumentTrait.SET_SEMANTIC_TABLE,
+ TraitCondition.hasPartitionBy()),
StaticArgument.scalar("op",
DataTypes.DESCRIPTOR(), true),
StaticArgument.scalar(
"op_mapping",
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java
new file mode 100644
index 00000000000..2a06191ad13
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+/**
+ * Internal value-comparable wrapper used by all built-in {@link
TraitCondition} factories. Equality
+ * is keyed by {@code kind + args}; the {@code impl} predicate is reused but
never compared, so two
+ * conditions built from the same factory inputs are equal.
+ *
+ * <p>Lives outside {@link TraitCondition} because Java forbids {@code
private} nested types in
+ * interfaces (they are implicitly {@code public static}); top-level
package-private gives the same
+ * encapsulation.
+ */
+final class BuiltInCondition implements TraitCondition {
+
+ /** Tag identifying which factory produced the condition. */
+ enum Kind {
+ HAS_PARTITION_BY,
+ ARG_IS_EQUAL_TO,
+ NOT
+ }
+
+ private final Kind kind;
+ private final List<Object> args;
+ private final Predicate<TraitContext> impl;
+
+ BuiltInCondition(final Kind kind, final List<Object> args, final
Predicate<TraitContext> impl) {
+ this.kind = kind;
+ this.args = args;
+ this.impl = impl;
+ }
+
+ @Override
+ public boolean test(final TraitContext ctx) {
+ return impl.test(ctx);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof BuiltInCondition)) {
+ return false;
+ }
+ final BuiltInCondition that = (BuiltInCondition) o;
+ return kind == that.kind && args.equals(that.args);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(kind, args);
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
index 3f5c48db8d2..546f30a7f7e 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
@@ -31,10 +31,13 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Describes an argument in a static signature that is not overloaded and does
not support varargs.
@@ -57,6 +60,7 @@ public class StaticArgument {
private final @Nullable Class<?> conversionClass;
private final boolean isOptional;
private final EnumSet<StaticArgumentTrait> traits;
+ private final List<ConditionalTrait> conditionalTraits;
private StaticArgument(
String name,
@@ -64,11 +68,22 @@ public class StaticArgument {
@Nullable Class<?> conversionClass,
boolean isOptional,
EnumSet<StaticArgumentTrait> traits) {
+ this(name, dataType, conversionClass, isOptional, traits, List.of());
+ }
+
+ private StaticArgument(
+ String name,
+ @Nullable DataType dataType,
+ @Nullable Class<?> conversionClass,
+ boolean isOptional,
+ EnumSet<StaticArgumentTrait> traits,
+ List<ConditionalTrait> conditionalTraits) {
this.name = Preconditions.checkNotNull(name, "Name must not be null.");
this.dataType = dataType;
this.conversionClass = conversionClass;
this.isOptional = isOptional;
this.traits = Preconditions.checkNotNull(traits, "Traits must not be
null.");
+ this.conditionalTraits = conditionalTraits;
checkName();
checkTraits(traits);
checkOptionalType();
@@ -196,6 +211,84 @@ public class StaticArgument {
return traits.contains(trait);
}
+ /**
+ * Context-aware trait check. Evaluates conditional trait rules against
the given context to
+ * determine the effective traits.
+ */
+ public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
+ return resolveTraits(ctx).contains(trait);
+ }
+
+ /**
+ * Returns a new {@link StaticArgument} with an additional conditional
trait rule. The trait is
+ * added to the effective trait set when the condition evaluates to {@code
true} at planning
+ * time. Only non-root traits (subtraits of TABLE, SCALAR, or MODEL) are
allowed.
+ *
+ * <p>Multiple conditions for the same trait use OR semantics: the trait
is activated if any of
+ * its conditions is met.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE,
SUPPORT_UPDATES))
+ * .withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy());
+ * }</pre>
+ */
+ public StaticArgument withConditionalTrait(
+ final StaticArgumentTrait trait, final TraitCondition condition) {
+ if (trait.isRoot()) {
+ throw new IllegalArgumentException(
+ "Root traits (SCALAR, TABLE, MODEL) cannot be
conditional.");
+ }
+ final List<ConditionalTrait> accumulated = new
ArrayList<>(this.conditionalTraits);
+ accumulated.add(new ConditionalTrait(condition, trait));
+ return new StaticArgument(name, dataType, conversionClass, isOptional,
traits, accumulated);
+ }
+
+ /** Whether this argument has conditional trait rules. */
+ public boolean hasConditionalTraits() {
+ return !conditionalTraits.isEmpty();
+ }
+
+ /** Whether any conditional trait rule may add the given trait. */
+ public boolean hasConditionalTrait(final StaticArgumentTrait trait) {
+ return conditionalTraits.stream().anyMatch(c -> c.trait == trait);
+ }
+
+ /**
+ * Returns a new {@link StaticArgument} with conditional traits resolved
against the given
+ * context. The returned argument has the effective traits baked in and no
conditional rules.
+ */
+ public StaticArgument applyConditionalTraits(final TraitContext ctx) {
+ if (conditionalTraits.isEmpty()) {
+ return this;
+ }
+ return new StaticArgument(name, dataType, conversionClass, isOptional,
resolveTraits(ctx));
+ }
+
+ /**
+ * Resolves effective traits by evaluating conditional rules against the
context. Returns the
+ * base traits combined with any conditional traits whose conditions are
met.
+ */
+ public EnumSet<StaticArgumentTrait> resolveTraits(final TraitContext ctx) {
+ if (conditionalTraits.isEmpty()) {
+ return traits;
+ }
+ final EnumSet<StaticArgumentTrait> resolved = EnumSet.copyOf(traits);
+ for (final ConditionalTrait conditionalTrait : conditionalTraits) {
+ if (conditionalTrait.condition.test(ctx)) {
+ removeMutuallyExclusiveTraits(resolved,
conditionalTrait.trait);
+ resolved.add(conditionalTrait.trait);
+ }
+ }
+ return resolved;
+ }
+
+ private static void removeMutuallyExclusiveTraits(
+ final EnumSet<StaticArgumentTrait> traits, final
StaticArgumentTrait adding) {
+ traits.removeAll(adding.getIncompatibleWith());
+ }
+
@Override
public String toString() {
final StringBuilder s = new StringBuilder();
@@ -210,11 +303,13 @@ public class StaticArgument {
s.append(dataType);
}
if (!traits.equals(EnumSet.of(StaticArgumentTrait.SCALAR))) {
+ final Stream<String> baseTraitNames =
+ traits.stream().map(Enum::name).map(n -> n.replace('_', '
'));
+ final Stream<String> conditionalTraitNames =
+ conditionalTraits.stream().map(c ->
c.trait.name().replace('_', ' '));
s.append(" ");
s.append(
- traits.stream()
- .map(Enum::name)
- .map(n -> n.replace('_', ' '))
+ Stream.concat(baseTraitNames, conditionalTraitNames)
.collect(Collectors.joining(", ", "{", "}")));
}
return s.toString();
@@ -233,12 +328,13 @@ public class StaticArgument {
&& Objects.equals(name, that.name)
&& Objects.equals(dataType, that.dataType)
&& Objects.equals(conversionClass, that.conversionClass)
- && Objects.equals(traits, that.traits);
+ && Objects.equals(traits, that.traits)
+ && Objects.equals(conditionalTraits, that.conditionalTraits);
}
@Override
public int hashCode() {
- return Objects.hash(name, dataType, conversionClass, isOptional,
traits);
+ return Objects.hash(name, dataType, conversionClass, isOptional,
traits, conditionalTraits);
}
private void checkName() {
@@ -354,4 +450,32 @@ public class StaticArgument {
throw new ValidationException("Model arguments must not be
optional.");
}
}
+
+ /** A trait that is conditionally added based on a {@link TraitCondition}.
*/
+ private static final class ConditionalTrait {
+ private final TraitCondition condition;
+ private final StaticArgumentTrait trait;
+
+ ConditionalTrait(final TraitCondition condition, final
StaticArgumentTrait trait) {
+ this.condition = condition;
+ this.trait = trait;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final ConditionalTrait that = (ConditionalTrait) o;
+ return Objects.equals(condition, that.condition) && trait ==
that.trait;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(condition, trait);
+ }
+ }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
index 647248ada2d..7b0083ed2dc 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.types.inference;
import org.apache.flink.annotation.PublicEvolving;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collectors;
@@ -51,6 +53,8 @@ public enum StaticArgumentTrait {
REQUIRE_UPDATE_BEFORE(SUPPORT_UPDATES),
REQUIRE_FULL_DELETE(SUPPORT_UPDATES);
+ private static final Set<StaticArgumentTrait> ROOTS = EnumSet.of(SCALAR,
TABLE, MODEL);
+
private final Set<StaticArgumentTrait> requirements;
StaticArgumentTrait(StaticArgumentTrait... requirements) {
@@ -60,4 +64,24 @@ public enum StaticArgumentTrait {
public Set<StaticArgumentTrait> getRequirements() {
return requirements;
}
+
+ /** Whether this trait is one of the top-level roots (SCALAR, TABLE,
MODEL). */
+ public boolean isRoot() {
+ return ROOTS.contains(this);
+ }
+
+ /**
+ * Returns the traits that are mutually exclusive with this one. Adding
this trait to a set
+ * implies removing all returned traits. Empty by default.
+ */
+ public Set<StaticArgumentTrait> getIncompatibleWith() {
+ switch (this) {
+ case SET_SEMANTIC_TABLE:
+ return Collections.singleton(ROW_SEMANTIC_TABLE);
+ case ROW_SEMANTIC_TABLE:
+ return Collections.singleton(SET_SEMANTIC_TABLE);
+ default:
+ return Collections.emptySet();
+ }
+ }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
index 92fd79f407b..971831d6ac4 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
@@ -182,6 +182,32 @@ public class SystemTypeInference {
}
}
+ /**
+ * Resolves conditional traits (see {@link
StaticArgument#withConditionalTrait}) on every static
+ * arg using the call's semantics and operands. Called once at the top of
{@link
+ * SystemInputStrategy#inferInputTypes} and {@link
SystemOutputStrategy#inferType}; downstream
+ * helpers receive the resolved list and iterate it directly.
+ */
+ private static List<StaticArgument> resolveStaticArgs(
+ final CallContext callContext, final @Nullable
List<StaticArgument> staticArgs) {
+ if (staticArgs == null) {
+ return null;
+ }
+ return IntStream.range(0, staticArgs.size())
+ .mapToObj(
+ pos -> {
+ final StaticArgument arg = staticArgs.get(pos);
+ if (!arg.hasConditionalTraits()) {
+ return arg;
+ }
+ final TableSemantics semantics =
+
callContext.getTableSemantics(pos).orElse(null);
+ return arg.applyConditionalTraits(
+ TraitContext.of(semantics, callContext,
staticArgs));
+ })
+ .collect(Collectors.toList());
+ }
+
private static void checkMultipleTableArgs(List<StaticArgument>
staticArgs) {
final List<StaticArgument> tableArgs =
staticArgs.stream()
@@ -262,6 +288,10 @@ public class SystemTypeInference {
return origin.inferType(callContext)
.map(
functionDataType -> {
+ // Resolve once so all helpers see the same
effective signature
+ // (PARTITION BY / scalar literals applied to
conditional traits).
+ final List<StaticArgument> resolvedArgs =
+ resolveStaticArgs(callContext,
staticArgs);
final List<Field> fields = new ArrayList<>();
// According to the SQL standard, pass-through
columns should
@@ -273,11 +303,11 @@ public class SystemTypeInference {
// - Flink SESSION windows add pass-through
columns at the beginning
// - Oracle adds pass-through columns for all
ROW semantics args, so
// this whole topic is kind of vendor specific
already
-
fields.addAll(derivePassThroughFields(callContext));
+
fields.addAll(derivePassThroughFields(callContext, resolvedArgs));
fields.addAll(deriveFunctionOutputFields(functionDataType));
if (!disableSystemArgs) {
-
fields.addAll(deriveRowtimeField(callContext));
+
fields.addAll(deriveRowtimeField(callContext, resolvedArgs));
}
final List<Field> uniqueFields =
makeFieldNamesUnique(fields);
@@ -303,7 +333,8 @@ public class SystemTypeInference {
.collect(Collectors.toList());
}
- private List<Field> derivePassThroughFields(CallContext callContext) {
+ private List<Field> derivePassThroughFields(
+ CallContext callContext, List<StaticArgument> staticArgs) {
if (functionKind != FunctionKind.PROCESS_TABLE) {
return List.of();
}
@@ -349,7 +380,8 @@ public class SystemTypeInference {
.collect(Collectors.toList());
}
- private List<Field> deriveRowtimeField(CallContext callContext) {
+ private List<Field> deriveRowtimeField(
+ CallContext callContext, List<StaticArgument> staticArgs) {
if (this.functionKind != FunctionKind.PROCESS_TABLE) {
return List.of();
}
@@ -566,8 +598,10 @@ public class SystemTypeInference {
+ "that is not overloaded and doesn't contain
varargs.");
}
+ // Resolve once so the rest of validation iterates the effective
signature.
+ final List<StaticArgument> resolvedArgs =
resolveStaticArgs(callContext, staticArgs);
try {
- checkTableArgs(staticArgs, callContext);
+ checkTableArgs(resolvedArgs, callContext);
if (!disableSystemArgs) {
checkUidArg(callContext);
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java
new file mode 100644
index 00000000000..93bdc5c8ce4
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.List;
+
+/**
+ * A condition that determines whether a conditional trait on a {@link
StaticArgument} should be
+ * active for a given call.
+ *
+ * <p>Conditions are evaluated at planning time using the {@link TraitContext}
which provides access
+ * to the SQL call's properties (PARTITION BY presence, scalar literal values,
etc.).
+ *
+ * <p>Implementations must implement {@code hashCode} and {@code equals} for
{@link
+ * StaticArgument#equals}/{@link StaticArgument#hashCode} to work correctly.
The built-in factories
+ * below return value-comparable instances; user-supplied lambdas do not -
prefer the factories.
+ *
+ * <pre>{@code
+ * import static org.apache.flink.table.types.inference.TraitCondition.*;
+ *
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE,
SUPPORT_UPDATES))
+ * .withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy());
+ * }</pre>
+ */
+@PublicEvolving
+@FunctionalInterface
+public interface TraitCondition {
+
+ /** Evaluates this condition against the given context. */
+ boolean test(TraitContext ctx);
+
+ /** True when PARTITION BY is provided on the table argument. */
+ static TraitCondition hasPartitionBy() {
+ return new BuiltInCondition(
+ BuiltInCondition.Kind.HAS_PARTITION_BY, List.of(),
TraitContext::hasPartitionBy);
+ }
+
+ /** True when the named scalar argument equals the expected value. */
+ @SuppressWarnings("unchecked")
+ static <T> TraitCondition argIsEqualTo(final String name, final T
expected) {
+ final Class<T> clazz = (Class<T>) expected.getClass();
+ return new BuiltInCondition(
+ BuiltInCondition.Kind.ARG_IS_EQUAL_TO,
+ List.of(name, expected),
+ ctx -> ctx.getScalarArgument(name,
clazz).map(expected::equals).orElse(false));
+ }
+
+ /** Negates the given condition. */
+ static TraitCondition not(final TraitCondition condition) {
+ return new BuiltInCondition(
+ BuiltInCondition.Kind.NOT, List.of(condition), ctx ->
!condition.test(ctx));
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java
new file mode 100644
index 00000000000..e58a4b36dce
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.functions.TableSemantics;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Read-only context provided to {@link TraitCondition} during trait
resolution at planning time.
+ *
+ * <p>Allows conditions to inspect the SQL call (e.g., whether PARTITION BY
was provided, or what
+ * value a scalar argument has) to decide whether a conditional trait should
be active.
+ */
+@PublicEvolving
+public interface TraitContext {
+
+ /** Whether PARTITION BY was provided on this table argument. */
+ boolean hasPartitionBy();
+
+ /**
+ * Reads a scalar argument value by name.
+ *
+ * @return the argument value, or empty if the argument was not provided,
is not a literal, or
+ * cannot be converted to the requested type
+ */
+ <T> Optional<T> getScalarArgument(String name, Class<T> clazz);
+
+ /**
+ * Builds a {@link TraitContext} from validation-time inputs.
+ *
+ * <p>Used by {@code SystemTypeInference} when wrapping a function's
strategies. Planner-side
+ * code that has a {@code RexCall} should use the planner adapter in
{@code BridgingSqlFunction}
+ * instead.
+ */
+ static TraitContext of(
+ @Nullable final TableSemantics semantics,
+ final CallContext callContext,
+ final List<StaticArgument> staticArgs) {
+ return new TraitContext() {
+ @Override
+ public boolean hasPartitionBy() {
+ return semantics != null &&
semantics.partitionByColumns().length > 0;
+ }
+
+ @Override
+ public <T> Optional<T> getScalarArgument(final String name, final
Class<T> clazz) {
+ for (int i = 0; i < staticArgs.size(); i++) {
+ final StaticArgument arg = staticArgs.get(i);
+ if (arg.is(StaticArgumentTrait.SCALAR) &&
arg.getName().equals(name)) {
+ if (!callContext.isArgumentLiteral(i)) {
+ return Optional.empty();
+ }
+ return callContext.getArgumentValue(i, clazz);
+ }
+ }
+ return Optional.empty();
+ }
+ };
+ }
+}
diff --git a/flink-table/flink-table-planner/AGENTS.md
b/flink-table/flink-table-planner/AGENTS.md
index 969c5b36656..8b45353ecbf 100644
--- a/flink-table/flink-table-planner/AGENTS.md
+++ b/flink-table/flink-table-planner/AGENTS.md
@@ -106,6 +106,22 @@ When bumping an ExecNode version, update the
`@ExecNodeMetadata` annotation's `v
New features often introduce `ExecutionConfigOptions` entries (in
`flink-table-api-java`) for runtime tunability (e.g., cache sizes, timeouts,
batch sizes).
+### PTF conditional traits
+
+A *conditional trait* lets a PTF's table-argument traits depend on the call
site instead of being fixed at declaration. Example for `TO_CHANGELOG`: the
`input` argument is row-semantic by default (single stream, no PARTITION BY),
but switches to set-semantic when the user writes `PARTITION BY` so the runtime
can co-locate state per key. One declaration, two effective signatures
depending on the call.
+
+**Declaration.** Built-in functions add conditional rules in
`BuiltInFunctionDefinitions` via `StaticArgument.withConditionalTrait(trait,
condition)`. The condition (a `TraitCondition`) is a small value-comparable
predicate evaluated against a `TraitContext`. Built-in factories live on
`TraitCondition` (`hasPartitionBy()`, `argIsEqualTo(name, value)`, `not(c)`);
under the hood they wrap into the package-private `BuiltInCondition` so
equality cascades correctly through `StaticArgument.equals`.
+
+**Evaluation.** A `TraitCondition` reads two things: whether `PARTITION BY` is
present on this table arg, and the literal value of named scalar args. Both
come through `TraitContext`. There are two factories:
`TraitContext.of(TableSemantics, CallContext, declared)` for the validation
side (called from `SystemTypeInference.resolveStaticArgs`) and a planner-side
adapter inside `BridgingSqlFunction.buildTraitContext` that sources the same
data from a `RexCall` + `RexTableArgCall`. Same logi [...]
+
+**Resolution.** Three call sites bake conditional traits into the operator's
effective signature:
+
+1. **Validation** — `SystemTypeInference.resolveStaticArgs` runs once each
from `inferInputTypes` and `inferType`, i.e. twice per validation pass. The two
runs can't be deduplicated across Calcite hooks because each hook gets a
different `CallContext` instance.
+2. **Planning** — `BridgingSqlFunction.resolveCallTraits` is called from
`FlinkLogicalTableFunctionScan.Converter.convert`. It rewrites the operator on
the `RexCall` so all downstream readers see the resolved view via plain
`function.getTypeInference().getStaticArguments()`.
+3. **Compiled-plan restore** — `BridgingSqlFunction.resolveCallTraits` is
called again from `StreamExecProcessTableFunction.@JsonCreator`, because the
JSON path skips the logical converter. Without this hook, restore would
silently produce wrong results for any conditional-trait PTF.
+
+The payoff: downstream rules, exec nodes, codegen, and changelog inference all
use ordinary `staticArg.is(SET_SEMANTIC_TABLE)` checks. No consumer needs to
know that conditional traits exist. Why three sites and not one? Because the
three resolution points sit in different lifecycles that can't share state.
+
## Testing Patterns
Choose test types based on what you're changing:
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
index 1cad3c7ec86..059fcd6184d 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.functions.bridging;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.ContextResolvedFunction;
import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionIdentifier;
@@ -30,17 +31,25 @@ import org.apache.flink.table.planner.calcite.FlinkContext;
import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.RexFactory;
+import org.apache.flink.table.planner.calcite.RexTableArgCall;
+import
org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.StaticArgument;
import org.apache.flink.table.types.inference.StaticArgumentTrait;
import org.apache.flink.table.types.inference.SystemTypeInference;
+import org.apache.flink.table.types.inference.TraitContext;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCallBinding;
+import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
@@ -50,8 +59,11 @@ import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.tools.RelBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;
+import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static
org.apache.flink.table.planner.functions.bridging.BridgingUtils.createName;
import static
org.apache.flink.table.planner.functions.bridging.BridgingUtils.createSqlFunctionCategory;
@@ -216,6 +228,152 @@ public class BridgingSqlFunction extends SqlFunction {
return resolvedFunction.getDefinition().isDeterministic();
}
+ //
--------------------------------------------------------------------------------------------
+ // Conditional trait resolution
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Rewrites {@code call} so that the operator's {@link StaticArgument}s
have any conditional
+ * traits (see {@link StaticArgument#withConditionalTrait}) applied
against the call site
+ * (PARTITION BY, scalar literals). Downstream consumers can then treat
the operator's static
+ * arguments as the effective signature and use plain {@code
arg.is(SET_SEMANTIC_TABLE)} checks.
+ *
+ * <p>Called from the two places where a planner-level {@link RexCall} for
a PTF is first built
+ * for downstream consumption: {@code FlinkLogicalTableFunctionScan}
converter (fresh planning)
+ * and {@code StreamExecProcessTableFunction.@JsonCreator} (compiled-plan
restore). A no-op for
+ * non-PTF calls and for PTFs that declare no conditional traits.
+ */
+ public static RexCall resolveCallTraits(RexCall call) {
+ if (!(call.getOperator() instanceof BridgingSqlFunction)) {
+ return call;
+ }
+ final BridgingSqlFunction function = (BridgingSqlFunction)
call.getOperator();
+ final List<StaticArgument> declared =
+ function.typeInference.getStaticArguments().orElse(null);
+ if (declared == null ||
declared.stream().noneMatch(StaticArgument::hasConditionalTraits)) {
+ return call;
+ }
+ final List<RexNode> operands = call.getOperands();
+ final CallContext callContext = function.toCallContext(call);
+ final List<StaticArgument> resolved =
+ IntStream.range(0, declared.size())
+ .mapToObj(
+ i ->
+ resolveArg(
+ declared.get(i),
+ declared,
+ operands,
+ i,
+ callContext))
+ .collect(Collectors.toList());
+ if (resolved.equals(declared)) {
+ return call;
+ }
+ final BridgingSqlFunction rewritten =
function.withStaticArguments(resolved);
+ // Use a fresh RexBuilder from the function's own type factory so this
can run from a
+ // Jackson @JsonCreator that has no planner context.
+ return (RexCall)
+ new RexBuilder(function.typeFactory).makeCall(call.getType(),
rewritten, operands);
+ }
+
+ private static StaticArgument resolveArg(
+ StaticArgument declaredArg,
+ List<StaticArgument> declared,
+ List<RexNode> operands,
+ int index,
+ CallContext callContext) {
+ // Only an argument that declares conditional traits and is passed as a
table operand is resolved; any other argument is returned as declared
+ if (!declaredArg.hasConditionalTraits()
+ || !(operands.get(index) instanceof RexTableArgCall)) {
+ return declaredArg;
+ }
+ return declaredArg.applyConditionalTraits(
+ buildTraitContext((RexTableArgCall) operands.get(index),
declared, callContext));
+ }
+
+ /**
+ * Planner-side adapter to {@link TraitContext}. Sourced from a {@link
RexCall}: PARTITION BY
+ * via the {@link RexTableArgCall} operand, scalar literals via the {@link
CallContext} wrapper.
+ * Equivalent to {@link TraitContext#of} but takes its inputs from a
planner-side call instead
+ * of validation-side {@link
org.apache.flink.table.functions.TableSemantics}.
+ */
+ private static TraitContext buildTraitContext(
+ RexTableArgCall tableArgCall, List<StaticArgument> declared,
CallContext callContext) {
+ return new TraitContext() {
+ @Override
+ public boolean hasPartitionBy() {
+ return tableArgCall.getPartitionKeys().length > 0;
+ }
+
+ @Override
+ public <T> Optional<T> getScalarArgument(String name, Class<T>
clazz) {
+ for (int i = 0; i < declared.size(); i++) {
+ final StaticArgument arg = declared.get(i);
+ if (arg.is(StaticArgumentTrait.SCALAR) &&
arg.getName().equals(name)) {
+ return callContext.getArgumentValue(i, clazz);
+ }
+ }
+ return Optional.empty();
+ }
+ };
+ }
+
+ /**
+ * Builds a {@link CallContext} from the given {@link RexCall} for this
function. Wraps the call
+ * in an {@link OperatorBindingCallContext} so consumers (trait
resolution, codegen, etc.) read
+ * scalar arguments through the same coercion path as validation.
+ */
+ public CallContext toCallContext(RexCall call) {
+ return toCallContext(call, null, null, null);
+ }
+
+ /**
+ * Variant of {@link #toCallContext(RexCall)} that additionally exposes
the call's input time
+ * columns and changelog modes - needed by the streaming codegen path so
PTFs can specialize
+ * themselves to the exact call.
+ */
+ public CallContext toCallContext(
+ RexCall call,
+ @Nullable List<Integer> inputTimeColumns,
+ @Nullable List<ChangelogMode> inputChangelogModes,
+ @Nullable ChangelogMode outputChangelogMode) {
+ return new OperatorBindingCallContext(
+ dataTypeFactory,
+ getDefinition(),
+ RexCallBinding.create(typeFactory, call,
Collections.emptyList()),
+ call.getType(),
+ inputTimeColumns,
+ inputChangelogModes,
+ outputChangelogMode);
+ }
+
+ /**
+ * Returns a copy of this function whose {@link TypeInference} reports the
given static
+ * arguments. The wrapped input/output strategies are reused unchanged -
they ran at validation
+ * time and aren't invoked again afterwards.
+ */
+ private BridgingSqlFunction withStaticArguments(List<StaticArgument>
staticArguments) {
+ final TypeInference rewritten =
+ TypeInference.newBuilder()
+ .staticArguments(staticArguments)
+
.inputTypeStrategy(typeInference.getInputTypeStrategy())
+
.stateTypeStrategies(typeInference.getStateTypeStrategies())
+
.outputTypeStrategy(typeInference.getOutputTypeStrategy())
+
.disableSystemArguments(typeInference.disableSystemArguments())
+ .build();
+ if (this instanceof WithTableFunction) {
+ return new WithTableFunction(
+ dataTypeFactory,
+ typeFactory,
+ rexFactory,
+ getKind(),
+ resolvedFunction,
+ rewritten);
+ }
+ return new BridgingSqlFunction(
+ dataTypeFactory, typeFactory, rexFactory, getKind(),
resolvedFunction, rewritten);
+ }
+
//
--------------------------------------------------------------------------------------------
// Table function extension
//
--------------------------------------------------------------------------------------------
@@ -273,10 +431,12 @@ public class BridgingSqlFunction extends SqlFunction {
}
final StaticArgument arg = args.get(ordinal);
final TableCharacteristic.Semantics semantics;
- if (arg.is(StaticArgumentTrait.ROW_SEMANTIC_TABLE)) {
- semantics = TableCharacteristic.Semantics.ROW;
- } else if (arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)) {
+ // Report SET semantics if the trait is set or is a conditional trait, so
PARTITION BY is allowed
+ if (arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)
+ ||
arg.hasConditionalTrait(StaticArgumentTrait.SET_SEMANTIC_TABLE)) {
semantics = TableCharacteristic.Semantics.SET;
+ } else if (arg.is(StaticArgumentTrait.ROW_SEMANTIC_TABLE)) {
+ semantics = TableCharacteristic.Semantics.ROW;
} else {
return null;
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
index 12a20833068..3973329af74 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
@@ -38,6 +38,7 @@ import
org.apache.flink.table.planner.codegen.HashCodeGenerator;
import org.apache.flink.table.planner.codegen.ProcessTableRunnerGenerator;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
@@ -157,7 +158,10 @@ public class StreamExecProcessTableFunction extends
ExecNodeBase<RowData>
@JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) ChangelogMode
outputChangelogMode) {
super(id, context, persistedConfig, inputProperties, outputType,
description);
this.uid = uid;
- this.invocation = (RexCall) invocation;
+ // Mirror the FlinkLogicalTableFunctionScan converter for the
compiled-plan restore path:
+ // bake StaticArgument#withConditionalTrait rules into the operator's
static args so
+ // downstream code can use plain arg.is(SET_SEMANTIC_TABLE) checks.
+ this.invocation = BridgingSqlFunction.resolveCallTraits((RexCall)
invocation);
this.inputChangelogModes = inputChangelogModes;
this.outputChangelogMode = outputChangelogMode;
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java
index d478b3afcf0..17473ab19a7 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.functions.TemporalTableFunction;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
import org.apache.flink.table.planner.utils.ShortcutUtils;
@@ -115,7 +116,14 @@ public class FlinkLogicalTableFunctionScan extends
TableFunctionScan implements
functionScan.getInputs().stream()
.map(input -> RelOptRule.convert(input,
FlinkConventions.LOGICAL()))
.collect(Collectors.toList());
- final RexCall rexCall = (RexCall) functionScan.getCall();
+
+ // Resolve any StaticArgument#withConditionalTrait rules on the
operator against this
+ // call site (PARTITION BY, scalar literals). After this rewrite,
downstream code sees a
+ // BridgingSqlFunction whose getStaticArguments() reports the
effective signature, so
+ // simple staticArg.is(SET_SEMANTIC_TABLE) checks suffice.
+ final RexCall rexCall =
+ BridgingSqlFunction.resolveCallTraits((RexCall)
functionScan.getCall());
+
return new FlinkLogicalTableFunctionScan(
functionScan.getCluster(),
traitSet,
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
index 754fe1b3284..56a2bf1f6fc 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
@@ -22,13 +22,11 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.catalog.ContextResolvedFunction;
import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.RexTableArgCall;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
-import
org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction;
@@ -36,7 +34,6 @@ import
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFuncti
import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.ShortcutUtils;
-import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.StaticArgument;
import org.apache.flink.table.types.inference.StaticArgumentTrait;
import org.apache.flink.table.types.inference.SystemTypeInference;
@@ -56,7 +53,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexCallBinding;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
@@ -66,7 +62,6 @@ import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@@ -512,22 +507,4 @@ public class StreamPhysicalProcessTableFunction extends
AbstractRelNode
}
return ImmutableSet.copyOf(partitionColumnsPerArg);
}
-
- public static CallContext toCallContext(
- RexCall udfCall,
- List<Integer> inputTimeColumns,
- List<ChangelogMode> inputChangelogModes,
- @Nullable ChangelogMode outputChangelogMode) {
- final BridgingSqlFunction function =
ShortcutUtils.unwrapBridgingSqlFunction(udfCall);
- assert function != null;
- final FunctionDefinition definition =
ShortcutUtils.unwrapFunctionDefinition(udfCall);
- return new OperatorBindingCallContext(
- function.getDataTypeFactory(),
- definition,
- RexCallBinding.create(function.getTypeFactory(), udfCall,
Collections.emptyList()),
- udfCall.getType(),
- inputTimeColumns,
- inputChangelogModes,
- outputChangelogMode);
- }
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
index 4f1d8d65d41..52df803d5c8 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
@@ -34,7 +34,6 @@ import
org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil
import
org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil.{verifyFunctionAwareOutputType,
DefaultExpressionEvaluatorFactory}
import org.apache.flink.table.planner.delegation.PlannerBase
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
-import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalProcessTableFunction
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
import org.apache.flink.table.runtime.dataview.DataViewUtils
import org.apache.flink.table.runtime.dataview.StateListView.KeyedStateListView
@@ -77,11 +76,8 @@ object ProcessTableRunnerGenerator {
// For specialized functions, this call context is able to provide the
final changelog modes.
// Thus, functions can reconfigure themselves for the exact use case.
// Including updating their state layout.
- val callContext = StreamPhysicalProcessTableFunction.toCallContext(
- udfCall,
- inputTimeColumns,
- inputChangelogModes,
- outputChangelogMode)
+ val callContext =
+ function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes,
outputChangelogMode)
// Create the final UDF for runtime
val udf = UserDefinedFunctionHelper.createSpecializedFunction(
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index b6767e68ac6..6839cc1c28c 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.functions.{BuiltInFunctionDefinition,
ChangelogFunction}
import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext
import org.apache.flink.table.planner.calcite.{FlinkTypeFactory,
RexTableArgCall}
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
import org.apache.flink.table.planner.plan.`trait`._
import
org.apache.flink.table.planner.plan.`trait`.DeleteKindTrait.{deleteOnKeyOrNone,
fullDeleteOrNone, DELETE_BY_KEY}
import
org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterOrNone,
onlyAfterOrNone, BEFORE_AND_AFTER, ONLY_UPDATE_AFTER}
@@ -852,10 +853,10 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
!modifyKindSet.isInsertOnly && tableArg.is(
StaticArgumentTrait.SUPPORT_UPDATES)
) {
- if (isPtfUpsert(tableArg, tableArgCall, child)) {
- UpdateKindTrait.ONLY_UPDATE_AFTER
- } else {
+ if (ptfRequiresUpdateBefore(tableArg, tableArgCall,
child)) {
UpdateKindTrait.BEFORE_AND_AFTER
+ } else {
+ UpdateKindTrait.ONLY_UPDATE_AFTER
}
} else {
UpdateKindTrait.NONE
@@ -1272,7 +1273,7 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
extractPtfTableArgComponents(process, child, inputArg)
if (
tableArg.is(StaticArgumentTrait.SUPPORT_UPDATES)
- && isPtfUpsert(tableArg, tableArgCall, child)
+ && !ptfRequiresUpdateBefore(tableArg, tableArgCall, child)
&& !tableArg.is(StaticArgumentTrait.REQUIRE_FULL_DELETE)
) {
this
@@ -1640,21 +1641,20 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
modeBuilder.build()
}
- private def isPtfUpsert(
+ /**
+ * Whether the PTF requires UPDATE_BEFORE from its input. Returns false only
if the partition keys
+ * are non-empty and contained in the input's upsert keys (co-located), and
the argument doesn't require UPDATE_BEFORE.
+ */
+ private def ptfRequiresUpdateBefore(
tableArg: StaticArgument,
tableArgCall: RexTableArgCall,
input: StreamPhysicalRel): Boolean = {
val partitionKeys = ImmutableBitSet.of(tableArgCall.getPartitionKeys: _*)
val fmq =
FlinkRelMetadataQuery.reuseOrCreate(input.getCluster.getMetadataQuery)
val upsertKeys = fmq.getUpsertKeys(input)
- if (
- upsertKeys == null || partitionKeys.isEmpty ||
!upsertKeys.contains(partitionKeys)
- || tableArg.is(StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)
- ) {
- false
- } else {
- true
- }
+ upsertKeys == null || partitionKeys.isEmpty ||
+ !upsertKeys.contains(partitionKeys) ||
+ tableArg.is(StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)
}
private def extractPtfTableArgComponents(
@@ -1674,11 +1674,9 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
outputChangelogMode: ChangelogMode): ChangelogContext = {
val udfCall = StreamPhysicalProcessTableFunction.toUdfCall(process.getCall)
val inputTimeColumns =
StreamPhysicalProcessTableFunction.toInputTimeColumns(process.getCall)
- val callContext = StreamPhysicalProcessTableFunction.toCallContext(
- udfCall,
- inputTimeColumns,
- inputChangelogModes,
- outputChangelogMode)
+ val function = udfCall.getOperator.asInstanceOf[BridgingSqlFunction]
+ val callContext =
+ function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes,
outputChangelogMode)
// Expose a simplified context to let users focus on important
characteristics.
// If necessary, we can expose the full CallContext in the future.
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
index ee9576b2e75..e98e28fc53a 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
@@ -87,4 +87,21 @@ public class ToChangelogTest extends TableTestBase {
util.verifyRelPlan(
"SELECT * FROM TO_CHANGELOG(input => TABLE
insert_only_source)", CHANGELOG_MODE);
}
+
+ @Test
+ void testSetSemanticsWithPartitionBy() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE retract_source ("
+ + " id INT,"
+ + " name STRING,"
+ + " PRIMARY KEY (id) NOT ENFORCED"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'changelog-mode' = 'I,UB,UA,D'"
+ + ")");
+ util.verifyRelPlan(
+ "SELECT * FROM TO_CHANGELOG(input => TABLE retract_source
PARTITION BY id)",
+ CHANGELOG_MODE);
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
index 91c73153bee..77133f4fe41 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
@@ -51,6 +51,26 @@ LogicalProject(op=[$0], id=[$1], name=[$2])
<![CDATA[
ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(),
DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name],
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647)
name)], changelogMode=[I])
+- TableSourceScan(table=[[default_catalog, default_database,
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSetSemanticsWithPartitionBy">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source
PARTITION BY id)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], op=[$1], id0=[$2], name=[$3])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION
BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())],
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, INTEGER id0,
VARCHAR(2147483647) name)])
+ +- LogicalProject(id=[$0], name=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
retract_source]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0),
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG],
select=[id,op,id0,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647)
op, INTEGER id0, VARCHAR(2147483647) name)], changelogMode=[I])
++- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database,
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
]]>
</Resource>
</TestCase>