raminqaf commented on code in PR #28199:
URL: https://github.com/apache/flink/pull/28199#discussion_r3267453988


##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -369,6 +381,13 @@ When `PARTITION BY` is provided, **the output schema changes**. The partition ke
 
 Prefer row semantics, when possible. `PARTITION BY` is only necessary when downstream operators are keyed on that column and you want to co-locate rows for the same key in the same parallel operator instance.
 
+#### Avoiding ChangelogNormalize for upsert sources
+
+When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds:
+
+* Omit `UPDATE_BEFORE` from `op_mapping` to skip `UPDATE_BEFORE` materialization.
+* If the source emits partial `DELETE` events (only the keys flow through, common with Flink's `upsert-kafka` connector or other key-compacted topics), it's necessary to omit `DELETE` from `op_mapping` to skip the full-`DELETE` materialization step that also happens in `ChangelogNormalize`.

Review Comment:
   I think this is way too detailed for the Flink user. If the user wants to partition their data on a specific key, they will do it, and if they don't, they will just avoid using the `PARTITION BY` clause.
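For weighing the point above, the rule in the proposed doc paragraph can be restated as a small decision function. This is a toy model of the documented behavior, not the planner's implementation; `NormalizeRuleSketch`, `needsChangelogNormalize`, and the `partialDeletes` flag are hypothetical names, and the local `RowKind` enum only mirrors the constants of Flink's `org.apache.flink.types.RowKind`.

```java
import java.util.EnumSet;
import java.util.Set;

// Toy model of the rule stated in the doc paragraph; not Flink planner code.
final class NormalizeRuleSketch {

    // Local stand-in mirroring the constants of org.apache.flink.types.RowKind.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Hypothetical helper: returns true if an upsert input would still get a
     * ChangelogNormalize operator according to the documented rule.
     *
     * @param partitionBy    whether the call uses PARTITION BY
     * @param mappedKinds    the change kinds used as keys in op_mapping
     * @param partialDeletes whether the source emits key-only DELETE rows
     */
    static boolean needsChangelogNormalize(
            boolean partitionBy, Set<RowKind> mappedKinds, boolean partialDeletes) {
        if (!partitionBy) {
            return true; // the skip is only described for calls with PARTITION BY
        }
        if (mappedKinds.contains(RowKind.UPDATE_BEFORE)) {
            return true; // UPDATE_BEFORE rows would have to be materialized
        }
        // Key-only DELETE rows have to be completed only if DELETE is mapped.
        return partialDeletes && mappedKinds.contains(RowKind.DELETE);
    }

    public static void main(String[] args) {
        Set<RowKind> noUpdateBefore =
                EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER, RowKind.DELETE);
        Set<RowKind> insertAndUpdateAfter = EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER);

        System.out.println(needsChangelogNormalize(true, noUpdateBefore, false)); // false
        System.out.println(needsChangelogNormalize(true, noUpdateBefore, true)); // true
        System.out.println(needsChangelogNormalize(true, insertAndUpdateAfter, true)); // false
    }
}
```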



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java:
##########
@@ -108,6 +118,35 @@ private static Map<RowKind, String> buildOpMap(@Nullable final Map<String, Strin
         return result;
     }
 
+    /**
+     * Rejects mappings that reference change operations the input changelog cannot produce. Without
+     * this check the extra entries are dead code: the corresponding rows never arrive, so the user
+     * gets silently incorrect plans (for upsert input also a wasted {@code ChangelogNormalize}).
+     *
+     * <p>Lives here rather than in the input type strategy because {@link
+     * TableSemantics#changelogMode()} returns empty during type inference and is only populated at
+     * specialization time, which is when this constructor runs.
+     */
+    private static void validateAgainstInputChangelogMode(
+            final Map<RowKind, String> mapping, final TableSemantics tableSemantics) {
+        final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null);
+        if (inputMode == null) {
+            return;
+        }
+        final Set<RowKind> unsupported =
+                mapping.keySet().stream()
+                        .filter(kind -> !inputMode.contains(kind))
+                        .collect(Collectors.toCollection(TreeSet::new));

Review Comment:
   Why a TreeSet and not a normal HashSet?
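A plausible reason for the `TreeSet` (an assumption; the hunk is cut off before the set is used) is that the unsupported kinds end up in an error message. Java enums compare by declaration order, so a `TreeSet` iterates deterministically, whereas `Enum.hashCode()` is the identity hash and `HashSet` iteration order can differ between JVM runs. The sketch below, using a local stand-in for `RowKind`, shows that an `EnumSet` gives the same ordering and is arguably the more idiomatic collection for enum elements.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

final class SortedKindsSketch {

    // Local stand-in mirroring the constants of org.apache.flink.types.RowKind.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // What the PR does: enums are Comparable by ordinal, so iteration follows declaration order.
    static Set<RowKind> viaTreeSet(List<RowKind> kinds) {
        return kinds.stream().collect(Collectors.toCollection(TreeSet::new));
    }

    // Alternative: EnumSet also iterates in declaration order and is backed by a bit vector.
    static Set<RowKind> viaEnumSet(List<RowKind> kinds) {
        return kinds.stream()
                .collect(Collectors.toCollection(() -> EnumSet.noneOf(RowKind.class)));
    }

    public static void main(String[] args) {
        List<RowKind> input = List.of(RowKind.DELETE, RowKind.INSERT, RowKind.UPDATE_BEFORE);
        System.out.println(viaTreeSet(input)); // [INSERT, UPDATE_BEFORE, DELETE]
        System.out.println(viaEnumSet(input)); // [INSERT, UPDATE_BEFORE, DELETE]
    }
}
```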



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java:
##########
@@ -77,6 +82,11 @@ public ToChangelogFunction(final SpecializedContext context) {
         final Map<String, String> opMapping =
                 callContext.getArgumentValue(2, Map.class).orElse(null);
         this.rawOpMap = buildOpMap(opMapping);
+        if (opMapping != null) {
+            // Only user-supplied mappings are validated. The default mapping covers all kinds by
+            // design and is harmless for insert-only or upsert inputs.
+            validateAgainstInputChangelogMode(this.rawOpMap, tableSemantics);
+        }

Review Comment:
   We can move this before the `buildOpMap` call.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java:
##########
@@ -77,6 +82,11 @@ public ToChangelogFunction(final SpecializedContext context) {
         final Map<String, String> opMapping =
                 callContext.getArgumentValue(2, Map.class).orElse(null);
         this.rawOpMap = buildOpMap(opMapping);
+        if (opMapping != null) {
+            // Only user-supplied mappings are validated. The default mapping covers all kinds by
+            // design and is harmless for insert-only or upsert inputs.
+            validateAgainstInputChangelogMode(this.rawOpMap, tableSemantics);
+        }

Review Comment:
   Could you elaborate on why we need this? Is there a way to add a test for this?
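On the testing question: the check only reads the mapping's keys and the input's changelog mode, so its logic can be exercised in isolation. The sketch below copies the logic from the hunk into a standalone class, with a plain `Set<RowKind>` standing in for `ChangelogMode` and an `Optional` standing in for `TableSemantics#changelogMode()`. The class name, the exception type, and the message text are hypothetical, because the hunk is truncated before the error is raised; an end-to-end test against real insert-only and upsert sources would still need a planner-level setup, which this sketch does not attempt.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Standalone copy of the check's logic so it can be tested without a planner (sketch).
final class ChangelogModeCheckSketch {

    // Local stand-in mirroring the constants of org.apache.flink.types.RowKind.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Mirrors validateAgainstInputChangelogMode; a Set<RowKind> stands in for ChangelogMode.
    static void validate(Map<RowKind, String> mapping, Optional<Set<RowKind>> inputMode) {
        if (!inputMode.isPresent()) {
            return; // mode not known yet (as during type inference): skip the check
        }
        Set<RowKind> producible = inputMode.get();
        Set<RowKind> unsupported =
                mapping.keySet().stream()
                        .filter(kind -> !producible.contains(kind))
                        .collect(Collectors.toCollection(TreeSet::new));
        if (!unsupported.isEmpty()) {
            // Hypothetical exception type and message; the real ones are not in the hunk.
            throw new IllegalArgumentException(
                    "op_mapping references change kinds the input cannot produce: " + unsupported);
        }
    }

    static boolean rejects(Map<RowKind, String> mapping, Optional<Set<RowKind>> inputMode) {
        try {
            validate(mapping, inputMode);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Set<RowKind> insertOnly = EnumSet.of(RowKind.INSERT);
        Set<RowKind> upsert = EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER, RowKind.DELETE);

        // Insert-only input can never produce DELETE.
        System.out.println(rejects(Map.of(RowKind.INSERT, "I", RowKind.DELETE, "D"), Optional.of(insertOnly))); // true
        // Upsert input has no UPDATE_BEFORE.
        System.out.println(rejects(Map.of(RowKind.UPDATE_BEFORE, "UB"), Optional.of(upsert))); // true
        // A mapping within the upsert kinds is accepted.
        System.out.println(rejects(Map.of(RowKind.INSERT, "I", RowKind.UPDATE_AFTER, "UA"), Optional.of(upsert))); // false
        // Unknown mode: nothing is checked.
        System.out.println(rejects(Map.of(RowKind.DELETE, "D"), Optional.empty())); // false
    }
}
```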



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java:
##########
@@ -108,6 +118,35 @@ private static Map<RowKind, String> buildOpMap(@Nullable final Map<String, Strin
         return result;
     }
 
+    /**
+     * Rejects mappings that reference change operations the input changelog cannot produce. Without
+     * this check the extra entries are dead code: the corresponding rows never arrive, so the user
+     * gets silently incorrect plans (for upsert input also a wasted {@code ChangelogNormalize}).
+     *
+     * <p>Lives here rather than in the input type strategy because {@link
+     * TableSemantics#changelogMode()} returns empty during type inference and is only populated at
+     * specialization time, which is when this constructor runs.
+     */
+    private static void validateAgainstInputChangelogMode(
+            final Map<RowKind, String> mapping, final TableSemantics tableSemantics) {
+        final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null);
+        if (inputMode == null) {
+            return;
+        }
+        final Set<RowKind> unsupported =
+                mapping.keySet().stream()
+                        .filter(kind -> !inputMode.contains(kind))
+                        .collect(Collectors.toCollection(TreeSet::new));

Review Comment:
   Can't we just use `anyMatch` here?
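`anyMatch` would be enough for a yes/no decision. The difference is diagnostic: it stops at the first offending key and does not report which kinds are unsupported, while the collecting version lists all of them (presumably for the error message; the hunk is cut off). The sketch below uses a local stand-in for `RowKind` and plain `Set`s in place of `ChangelogMode` and `mapping.keySet()`, and also shows `containsAll` as an equivalent yes/no form.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

final class AnyMatchSketch {

    // Local stand-in mirroring the constants of org.apache.flink.types.RowKind.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Yes/no check: stops at the first mapped kind the input cannot produce.
    static boolean hasUnsupported(Set<RowKind> mappedKinds, Set<RowKind> producible) {
        return mappedKinds.stream().anyMatch(kind -> !producible.contains(kind));
    }

    // Equivalent yes/no check without streams.
    static boolean hasUnsupportedViaContainsAll(Set<RowKind> mappedKinds, Set<RowKind> producible) {
        return !producible.containsAll(mappedKinds);
    }

    // What the PR does: visits every key and keeps all offending kinds, in declaration order.
    static Set<RowKind> unsupportedKinds(Set<RowKind> mappedKinds, Set<RowKind> producible) {
        return mappedKinds.stream()
                .filter(kind -> !producible.contains(kind))
                .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        Set<RowKind> mapped = EnumSet.of(RowKind.INSERT, RowKind.UPDATE_BEFORE, RowKind.DELETE);
        Set<RowKind> producible = EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER);

        System.out.println(hasUnsupported(mapped, producible)); // true
        System.out.println(hasUnsupportedViaContainsAll(mapped, producible)); // true
        System.out.println(unsupportedKinds(mapped, producible)); // [UPDATE_BEFORE, DELETE]
    }
}
```

With only four `RowKind` values, the short-circuiting of `anyMatch` makes no practical difference, so the choice mostly comes down to whether the error message should name the offending kinds.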


