fhueske commented on code in PR #28199:
URL: https://github.com/apache/flink/pull/28199#discussion_r3272273190
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java:
##########
@@ -68,4 +70,30 @@ static TraitCondition not(final TraitCondition condition) {
return new BuiltInCondition(
BuiltInCondition.Kind.NOT, List.of(condition), ctx ->
!condition.test(ctx));
}
+
+ /**
+ * True when the named {@code MAP<STRING, STRING>} scalar argument has a
key that, after
+ * splitting on comma and trimming each part, equals {@code key}. Returns
true when the argument
+ * is omitted, on the assumption that an absent argument means the
function falls back to a
+ * default that includes all keys.
+ */
+ @SuppressWarnings("rawtypes")
+ static TraitCondition mapArgIncludesKey(final String argName, final String
key) {
+ return new BuiltInCondition(
+ BuiltInCondition.Kind.MAP_ARG_INCLUDES_KEY,
+ List.of(argName, key),
+ ctx ->
+ ctx.getScalarArgument(argName, Map.class)
+ .map(map -> mapKeysContain(map, key))
+ .orElse(true));
+ }
Review Comment:
Would it make sense to define this more generically, like
```suggestion
static <X> TraitCondition argMatches(final String argName, final Class<X>
argClass, final Predicate<X> predicate) {
return new BuiltInCondition(
BuiltInCondition.Kind.ARG_MATCHES,
List.of(argName, argClass, predicate),
ctx ->
ctx.getScalarArgument(argName, argClass)
.stream().anyMatch(predicate));
}
```
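A minimal sketch of how the specific condition could be layered on top of the generic one. `Ctx`, `Condition`, `MapCtx`, and `keysContain` are hypothetical stand-ins written for this illustration, not the real Flink types, and the map contents are made-up sample data. One behavioral difference is worth noting: as written in the suggestion, `argMatches` evaluates to false when the argument is omitted (an empty `Optional` yields an empty stream), whereas `mapArgIncludesKey` returns true in that case, so the "absent means default" rule has to be added explicitly.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical stand-ins for illustration only; not the Flink TraitCondition API.
final class ArgMatchesSketch {

    /** Resolves scalar arguments by name, similar in spirit to the context in the diff. */
    interface Ctx {
        <T> Optional<T> getScalarArgument(String name, Class<T> clazz);
    }

    interface Condition {
        boolean test(Ctx ctx);
    }

    /** Simple context backed by a map of argument name to value. */
    static final class MapCtx implements Ctx {
        private final Map<String, Object> args;

        MapCtx(Map<String, Object> args) {
            this.args = args;
        }

        @Override
        public <T> Optional<T> getScalarArgument(String name, Class<T> clazz) {
            return Optional.ofNullable(args.get(name)).filter(clazz::isInstance).map(clazz::cast);
        }
    }

    /** Generic form from the suggestion: false when the argument is absent. */
    static <X> Condition argMatches(String argName, Class<X> argClass, Predicate<X> predicate) {
        return ctx -> ctx.getScalarArgument(argName, argClass).stream().anyMatch(predicate);
    }

    /** True if any map key, split on commas and trimmed, equals the given key. */
    static boolean keysContain(Map<?, ?> map, String key) {
        for (Object k : map.keySet()) {
            for (String part : String.valueOf(k).split(",")) {
                if (part.trim().equals(key)) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Specific form rebuilt on the generic one, keeping "absent argument means true". */
    @SuppressWarnings("rawtypes")
    static Condition mapArgIncludesKey(String argName, String key) {
        Condition present = argMatches(argName, Map.class, map -> keysContain(map, key));
        Condition absent = ctx -> ctx.getScalarArgument(argName, Map.class).isEmpty();
        return ctx -> absent.test(ctx) || present.test(ctx);
    }

    public static void main(String[] args) {
        Ctx withArg = new MapCtx(Map.of("op_mapping", Map.of("INSERT, UPDATE_AFTER", "UPSERT")));
        Ctx noArg = new MapCtx(Map.of());
        System.out.println(mapArgIncludesKey("op_mapping", "UPDATE_AFTER").test(withArg));
        System.out.println(mapArgIncludesKey("op_mapping", "DELETE").test(noArg));
    }
}
```

If the generic variant were adopted, each call site would need to decide how an omitted argument is treated, either through a separate "absent" condition as above or through an extra default parameter.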
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -184,6 +184,37 @@ public class ToChangelogTestPrograms {
.runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input
=> TABLE t)")
.build();
+ public static final TableTestProgram UPSERT_PARTITION_BY =
Review Comment:
Does it make sense to have a test program without full deletes?
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -369,6 +381,13 @@ When `PARTITION BY` is provided, **the output schema
changes**. The partition ke
Prefer row semantics, when possible. `PARTITION BY` is only necessary when
downstream operators are keyed on that column and you want to co-locate rows
for the same key in the same parallel operator instance.
+#### Avoiding ChangelogNormalize for upsert sources
+
+When the input is an upsert source (emits `UPDATE_AFTER` but no
`UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by
default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads.
This operator is stateful and can be expensive. When `PARTITION BY` is
provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit
the corresponding kinds:
Review Comment:
```suggestion
When a query includes an upsert source input (emits `UPDATE_AFTER` but no
`UPDATE_BEFORE`), the planner typically inserts a `ChangelogNormalize` operator
to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This
operator is stateful and can be expensive.
By wrapping the upsert source input in a properly configured `TO_CHANGELOG`
function, we can avoid the `ChangelogNormalize` operator. For this, the
function's table input requires a `PARTITION BY` and an `op_mapping` that does
not emit the corresponding kinds:
```
I would rephrase this a bit and make it clear that this is in the context of the
`TO_CHANGELOG` function.
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -369,6 +381,13 @@ When `PARTITION BY` is provided, **the output schema
changes**. The partition ke
Prefer row semantics, when possible. `PARTITION BY` is only necessary when
downstream operators are keyed on that column and you want to co-locate rows
for the same key in the same parallel operator instance.
+#### Avoiding ChangelogNormalize for upsert sources
+
+When the input is an upsert source (emits `UPDATE_AFTER` but no
`UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by
default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads.
This operator is stateful and can be expensive. When `PARTITION BY` is
provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit
the corresponding kinds:
Review Comment:
I wonder if this argument isn't a bit misleading. We "promise" that users
can avoid `ChangelogNormalize` by using `TO_CHANGELOG` but don't explain what
that really means.
They would need to "manually" handle all changes because Flink only sees an
append-only stream.
So it is certainly not a silver bullet and is only applicable in the few cases
that allow for manual change handling (which most users won't be able to do,
IMO).
So isn't this more of an escape hatch for expert users?
If so, it should be clearly positioned as such, IMO.
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -369,6 +381,13 @@ When `PARTITION BY` is provided, **the output schema
changes**. The partition ke
Prefer row semantics, when possible. `PARTITION BY` is only necessary when
downstream operators are keyed on that column and you want to co-locate rows
for the same key in the same parallel operator instance.
+#### Avoiding ChangelogNormalize for upsert sources
+
+When the input is an upsert source (emits `UPDATE_AFTER` but no
`UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by
default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads.
This operator is stateful and can be expensive. When `PARTITION BY` is
provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit
the corresponding kinds:
+
+* Omit `UPDATE_BEFORE` from `op_mapping` to skip `UPDATE_BEFORE`
materialization.
+* If the source emits partial `DELETE` events (only the keys flow through,
common with Flink's `upsert-kafka` connector or other key-compacted topics),
it's necessary to omit `DELETE` from `op_mapping` to skip the full-`DELETE`
materialization step that also happens in `ChangelogNormalize`.
Review Comment:
Isn't it a bit unintuitive to support partial deletes by not declaring a
`DELETE` key in the map?
The function would still emit deletes (partial, not full) although they
aren't configured.
Or am I misunderstanding something?
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java:
##########
@@ -108,6 +118,35 @@ private static Map<RowKind, String> buildOpMap(@Nullable
final Map<String, Strin
return result;
}
+ /**
+ * Rejects mappings that reference change operations the input changelog
cannot produce. Without
+ * this check the extra entries are dead code: the corresponding rows
never arrive, so the user
+ * gets silently incorrect plans (for upsert input also a wasted {@code
ChangelogNormalize}).
+ *
+ * <p>Lives here rather than in the input type strategy because {@link
+ * TableSemantics#changelogMode()} returns empty during type inference and
is only populated at
+ * specialization time, which is when this constructor runs.
+ */
+ private static void validateAgainstInputChangelogMode(
+ final Map<RowKind, String> mapping, final TableSemantics
tableSemantics) {
+ final ChangelogMode inputMode =
tableSemantics.changelogMode().orElse(null);
+ if (inputMode == null) {
+ return;
+ }
+ final Set<RowKind> unsupported =
+ mapping.keySet().stream()
+ .filter(kind -> !inputMode.contains(kind))
+ .collect(Collectors.toCollection(TreeSet::new));
Review Comment:
With `anyMatch` we couldn't list all ops that aren't present in the input,
but a `List<RowKind>` would suffice as well (`keySet` ensures no duplicates).
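A minimal sketch of the `List`-based variant, assuming a local `RowKind` enum as a stand-in for Flink's `org.apache.flink.types.RowKind` and a plain `Set<RowKind>` in place of `ChangelogMode` (both are simplifications for illustration). The one observable difference from the `TreeSet` version is ordering: the list follows the iteration order of the mapping, so the reported kinds come out sorted only if the mapping itself iterates in a sorted order.

```java
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for illustration only; not the Flink ToChangelogFunction code.
final class UnsupportedKindsSketch {

    /** Local copy of the four row kinds, declared in the same order as Flink's RowKind. */
    enum RowKind {
        INSERT,
        UPDATE_BEFORE,
        UPDATE_AFTER,
        DELETE
    }

    /** Collects every mapped kind the input cannot produce, in mapping iteration order. */
    static List<RowKind> unsupportedKinds(Map<RowKind, String> mapping, Set<RowKind> inputKinds) {
        return mapping.keySet().stream()
                .filter(kind -> !inputKinds.contains(kind))
                .collect(Collectors.toList());
    }

    /** The anyMatch form only answers yes or no, so it cannot name the offending kinds. */
    static boolean hasUnsupportedKind(Map<RowKind, String> mapping, Set<RowKind> inputKinds) {
        return mapping.keySet().stream().anyMatch(kind -> !inputKinds.contains(kind));
    }

    public static void main(String[] args) {
        Map<RowKind, String> mapping = new LinkedHashMap<>();
        mapping.put(RowKind.DELETE, "D");
        mapping.put(RowKind.INSERT, "I");
        mapping.put(RowKind.UPDATE_BEFORE, "UB");
        Set<RowKind> upsertWithoutDeletes = EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER);
        System.out.println(unsupportedKinds(mapping, upsertWithoutDeletes));
    }
}
```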