fhueske commented on code in PR #28164:
URL: https://github.com/apache/flink/pull/28164#discussion_r3257326993
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,34 @@ Prefer row semantics, when possible. `PARTITION BY` is
only necessary when downs
If you are producing an upsert table — that is, you are emitting
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the
partition key you select here will be considered both the primary key and the
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary
key exactly.
+#### Upsert output
Review Comment:
I'm wondering to what extend we need to document this behavior.
`FROM_CHANGELOG` converts and explicit changelog into a Flink dynamic table
that is internally handled. From a user's point of view, does it really matter
how Flink encodes changes? It shouldn't change the semantics of the following
SQL operations. Is using `upsert` changelog mode not "just" an internal
optimization?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -95,6 +98,50 @@ public Optional<List<DataType>> inferInputTypes(
return Optional.of(DataTypes.ROW(outputFields).notNull());
};
+ //
--------------------------------------------------------------------------------------------
+ // Changelog mode inference
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Emits an upsert changelog when the input is partitioned (set semantics)
and the resolved
+ * {@code op_mapping} maps to {@code UPDATE_AFTER} without {@code
UPDATE_BEFORE}. In all other
+ * cases the output is a retract changelog. When upsert mode is selected,
the partition key acts
+ * as the upsert key.
+ *
+ * <p>Upsert mode uses full deletes ({@link ChangelogMode#upsert(boolean)
upsert(false)})
+ * because the runtime forwards each input delete row with all fields
populated; only the {@link
+ * org.apache.flink.types.RowKind} is rewritten.
+ */
+ public static final ChangelogModeStrategy CHANGELOG_MODE_STRATEGY =
+ ctx -> isUpsertConfig(ctx) ? ChangelogMode.upsert(false) :
ChangelogMode.all();
+
+ /**
+ * Returns {@code true} when the FROM_CHANGELOG call should emit an upsert
changelog: the input
+ * table is partitioned AND the resolved {@code op_mapping} contains
{@code UPDATE_AFTER}
+ * without {@code UPDATE_BEFORE}. Falls back to {@code false} when the
mapping is absent or
+ * cannot be resolved as a literal, since the default mapping includes
both (retract).
+ */
+ @SuppressWarnings("unchecked")
+ public static boolean isUpsertConfig(final ChangelogContext ctx) {
+ final boolean partitioned =
+ ctx.getTableSemantics(ARG_TABLE)
+ .map(ts -> ts.partitionByColumns().length > 0)
+ .orElse(false);
+ if (!partitioned) {
+ return false;
+ }
+ final Optional<Map> opMapping = ctx.getArgumentValue(ARG_OP_MAPPING,
Map.class);
+ if (opMapping.isEmpty()) {
+ return false;
+ }
+ final Map<String, String> mapping = opMapping.get();
+ final boolean hasUpdateAfter =
+ mapping.values().stream().anyMatch(v ->
UPDATE_AFTER.equals(v.trim()));
Review Comment:
was already checked that `v` isn't null or might `v.trim()` throw a NPE?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -200,22 +247,47 @@ private static Optional<List<DataType>> validateOpMapping(
if (validationError.isPresent()) {
return validationError;
}
-
- final boolean hasUpdateBefore =
- mapping.values().stream().anyMatch(v ->
UPDATE_BEFORE.equals(v.trim()));
- final boolean hasUpdateAfter =
- mapping.values().stream().anyMatch(v ->
UPDATE_AFTER.equals(v.trim()));
- if (hasUpdateAfter && !hasUpdateBefore) {
- return callContext.fail(
- throwOnFailure,
- "The 'op_mapping' must include UPDATE_BEFORE for
retract mode. "
- + "Upsert mode (without UPDATE_BEFORE) is not
supported "
- + "in this version.");
+ final Optional<List<DataType>> upsertKeyError =
+ validateUpsertRequiresPartitionBy(callContext, mapping,
throwOnFailure);
+ if (upsertKeyError.isPresent()) {
+ return upsertKeyError;
}
}
return Optional.empty();
}
+ /**
+ * An {@code op_mapping} that produces {@code UPDATE_AFTER} without {@code
UPDATE_BEFORE}
+ * describes an upsert changelog. Upsert mode requires a key, so the input
table must use set
+ * semantics via {@code PARTITION BY}.
+ */
+ private static Optional<List<DataType>> validateUpsertRequiresPartitionBy(
Review Comment:
The return type of this method looks strange.
It never returns an actual `List`, only `Optional.empty()`.
I'd rather return a boolean and move the error message into
`validateOpMapping()`.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java:
##########
@@ -200,6 +200,69 @@ public class FromChangelogTestPrograms {
+ "input => TABLE cdc_stream PARTITION BY
id)")
.build();
+ public static final TableTestProgram UPSERT_PARTITION_BY =
+ TableTestProgram.of(
+ "from-changelog-upsert-partition-by",
+ "PARTITION BY + op_mapping without UPDATE_BEFORE
produces an "
+ + "upsert changelog keyed on the partition
columns")
+ .setupTableSource(
+ SourceTestStep.newBuilder("cdc_stream")
+ .addSchema("name STRING", "id INT", "op
STRING")
+ .producedValues(
+ Row.of("Alice", 1, "INSERT"),
+ Row.of("Bob", 2, "INSERT"),
+ Row.of("Alice2", 1,
"UPDATE_AFTER"),
+ Row.of("Bob", 2, "DELETE"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT PRIMARY KEY NOT
ENFORCED", "name STRING")
+ .consumedValues(
+ Row.ofKind(RowKind.INSERT, 1,
"Alice"),
+ Row.ofKind(RowKind.INSERT, 2,
"Bob"),
+ Row.ofKind(RowKind.UPDATE_AFTER,
1, "Alice2"),
+ Row.ofKind(RowKind.DELETE, 2,
"Bob"))
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+ + "input => TABLE cdc_stream PARTITION BY
id, "
+ + "op_mapping => MAP["
+ + "'INSERT', 'INSERT', "
+ + "'UPDATE_AFTER', 'UPDATE_AFTER', "
+ + "'DELETE', 'DELETE'])")
+ .build();
+
+ public static final TableTestProgram UPSERT_PARTITION_BY_CUSTOM_MAPPING =
Review Comment:
Isn't the only difference to the other added test that the mapping uses "c"
instead of "INSERT"?
Isn't that essentially the same test?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -200,22 +247,47 @@ private static Optional<List<DataType>> validateOpMapping(
if (validationError.isPresent()) {
return validationError;
}
-
- final boolean hasUpdateBefore =
- mapping.values().stream().anyMatch(v ->
UPDATE_BEFORE.equals(v.trim()));
- final boolean hasUpdateAfter =
- mapping.values().stream().anyMatch(v ->
UPDATE_AFTER.equals(v.trim()));
- if (hasUpdateAfter && !hasUpdateBefore) {
- return callContext.fail(
- throwOnFailure,
- "The 'op_mapping' must include UPDATE_BEFORE for
retract mode. "
- + "Upsert mode (without UPDATE_BEFORE) is not
supported "
- + "in this version.");
+ final Optional<List<DataType>> upsertKeyError =
+ validateUpsertRequiresPartitionBy(callContext, mapping,
throwOnFailure);
+ if (upsertKeyError.isPresent()) {
+ return upsertKeyError;
}
}
return Optional.empty();
}
+ /**
+ * An {@code op_mapping} that produces {@code UPDATE_AFTER} without {@code
UPDATE_BEFORE}
+ * describes an upsert changelog. Upsert mode requires a key, so the input
table must use set
+ * semantics via {@code PARTITION BY}.
+ */
+ private static Optional<List<DataType>> validateUpsertRequiresPartitionBy(
+ final CallContext callContext,
+ final Map<String, String> mapping,
+ final boolean throwOnFailure) {
+ final boolean hasUpdateAfter =
+ mapping.values().stream().anyMatch(v ->
UPDATE_AFTER.equals(v.trim()));
+ final boolean hasUpdateBefore =
+ mapping.values().stream().anyMatch(v ->
UPDATE_BEFORE.equals(v.trim()));
+ if (!hasUpdateAfter || hasUpdateBefore) {
Review Comment:
Add a comment describing the rational of this condition.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -95,6 +98,50 @@ public Optional<List<DataType>> inferInputTypes(
return Optional.of(DataTypes.ROW(outputFields).notNull());
};
+ //
--------------------------------------------------------------------------------------------
+ // Changelog mode inference
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Emits an upsert changelog when the input is partitioned (set semantics)
and the resolved
+ * {@code op_mapping} maps to {@code UPDATE_AFTER} without {@code
UPDATE_BEFORE}. In all other
+ * cases the output is a retract changelog. When upsert mode is selected,
the partition key acts
+ * as the upsert key.
+ *
+ * <p>Upsert mode uses full deletes ({@link ChangelogMode#upsert(boolean)
upsert(false)})
+ * because the runtime forwards each input delete row with all fields
populated; only the {@link
+ * org.apache.flink.types.RowKind} is rewritten.
+ */
+ public static final ChangelogModeStrategy CHANGELOG_MODE_STRATEGY =
+ ctx -> isUpsertConfig(ctx) ? ChangelogMode.upsert(false) :
ChangelogMode.all();
+
+ /**
+ * Returns {@code true} when the FROM_CHANGELOG call should emit an upsert
changelog: the input
+ * table is partitioned AND the resolved {@code op_mapping} contains
{@code UPDATE_AFTER}
+ * without {@code UPDATE_BEFORE}. Falls back to {@code false} when the
mapping is absent or
+ * cannot be resolved as a literal, since the default mapping includes
both (retract).
+ */
+ @SuppressWarnings("unchecked")
+ public static boolean isUpsertConfig(final ChangelogContext ctx) {
Review Comment:
does this function need to be `public`?
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,34 @@ Prefer row semantics, when possible. `PARTITION BY` is
only necessary when downs
If you are producing an upsert table — that is, you are emitting
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the
partition key you select here will be considered both the primary key and the
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary
key exactly.
+#### Upsert output
+
+When `PARTITION BY` is combined with an `op_mapping` that does NOT include
`UPDATE_BEFORE`, the output changelog is an upsert table keyed on the partition
columns. Each input row produces an `INSERT`, `UPDATE_AFTER`, or `DELETE` event
with the partition key acting as the upsert key.
+
+```sql
+-- Upsert input: INSERT / UPDATE_AFTER / DELETE only
+-- +I[id:1, op:'INSERT', name:'Alice']
+-- +I[id:2, op:'INSERT', name:'Bob']
+-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
+-- +I[id:2, op:'DELETE', name:'Bob']
+
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE cdc_stream PARTITION BY id,
+ op_mapping => MAP[
+ 'INSERT', 'INSERT',
+ 'UPDATE_AFTER', 'UPDATE_AFTER',
+ 'DELETE', 'DELETE']
+)
+
+-- Output (upsert changelog, upsert key = id):
+-- +I[id:1, name:'Alice']
+-- +I[id:2, name:'Bob']
+-- +U[id:1, name:'Alice2']
+-- -D[id:2, name:'Bob']
+```
+
+Without `PARTITION BY`, or when the active `op_mapping` includes
`UPDATE_BEFORE`, the output remains a retract changelog.
Review Comment:
Document that validation fails if there is no `PARTITION BY` and no
`UPDATE_BEFORE`?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -200,22 +247,47 @@ private static Optional<List<DataType>> validateOpMapping(
if (validationError.isPresent()) {
return validationError;
}
-
- final boolean hasUpdateBefore =
- mapping.values().stream().anyMatch(v ->
UPDATE_BEFORE.equals(v.trim()));
- final boolean hasUpdateAfter =
- mapping.values().stream().anyMatch(v ->
UPDATE_AFTER.equals(v.trim()));
- if (hasUpdateAfter && !hasUpdateBefore) {
- return callContext.fail(
- throwOnFailure,
- "The 'op_mapping' must include UPDATE_BEFORE for
retract mode. "
- + "Upsert mode (without UPDATE_BEFORE) is not
supported "
- + "in this version.");
+ final Optional<List<DataType>> upsertKeyError =
+ validateUpsertRequiresPartitionBy(callContext, mapping,
throwOnFailure);
+ if (upsertKeyError.isPresent()) {
+ return upsertKeyError;
}
}
return Optional.empty();
}
+ /**
+ * An {@code op_mapping} that produces {@code UPDATE_AFTER} without {@code
UPDATE_BEFORE}
+ * describes an upsert changelog. Upsert mode requires a key, so the input
table must use set
+ * semantics via {@code PARTITION BY}.
+ */
+ private static Optional<List<DataType>> validateUpsertRequiresPartitionBy(
+ final CallContext callContext,
+ final Map<String, String> mapping,
+ final boolean throwOnFailure) {
+ final boolean hasUpdateAfter =
+ mapping.values().stream().anyMatch(v ->
UPDATE_AFTER.equals(v.trim()));
+ final boolean hasUpdateBefore =
+ mapping.values().stream().anyMatch(v ->
UPDATE_BEFORE.equals(v.trim()));
+ if (!hasUpdateAfter || hasUpdateBefore) {
+ return Optional.empty();
+ }
+ final boolean partitioned =
+ callContext
+ .getTableSemantics(ARG_TABLE)
+ .map(ts -> ts.partitionByColumns().length > 0)
+ .orElse(false);
Review Comment:
Could be extracted into a helper method (is also used in `isUpsertConfig()`
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -200,22 +247,47 @@ private static Optional<List<DataType>> validateOpMapping(
if (validationError.isPresent()) {
return validationError;
}
-
- final boolean hasUpdateBefore =
- mapping.values().stream().anyMatch(v ->
UPDATE_BEFORE.equals(v.trim()));
- final boolean hasUpdateAfter =
- mapping.values().stream().anyMatch(v ->
UPDATE_AFTER.equals(v.trim()));
- if (hasUpdateAfter && !hasUpdateBefore) {
- return callContext.fail(
- throwOnFailure,
- "The 'op_mapping' must include UPDATE_BEFORE for
retract mode. "
- + "Upsert mode (without UPDATE_BEFORE) is not
supported "
- + "in this version.");
+ final Optional<List<DataType>> upsertKeyError =
+ validateUpsertRequiresPartitionBy(callContext, mapping,
throwOnFailure);
+ if (upsertKeyError.isPresent()) {
+ return upsertKeyError;
}
}
return Optional.empty();
}
+ /**
+ * An {@code op_mapping} that produces {@code UPDATE_AFTER} without {@code
UPDATE_BEFORE}
+ * describes an upsert changelog. Upsert mode requires a key, so the input
table must use set
+ * semantics via {@code PARTITION BY}.
+ */
+ private static Optional<List<DataType>> validateUpsertRequiresPartitionBy(
+ final CallContext callContext,
+ final Map<String, String> mapping,
+ final boolean throwOnFailure) {
+ final boolean hasUpdateAfter =
+ mapping.values().stream().anyMatch(v ->
UPDATE_AFTER.equals(v.trim()));
Review Comment:
same question about NPE as above
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java:
##########
@@ -200,6 +200,69 @@ public class FromChangelogTestPrograms {
+ "input => TABLE cdc_stream PARTITION BY
id)")
.build();
+ public static final TableTestProgram UPSERT_PARTITION_BY =
+ TableTestProgram.of(
+ "from-changelog-upsert-partition-by",
+ "PARTITION BY + op_mapping without UPDATE_BEFORE
produces an "
+ + "upsert changelog keyed on the partition
columns")
+ .setupTableSource(
+ SourceTestStep.newBuilder("cdc_stream")
+ .addSchema("name STRING", "id INT", "op
STRING")
+ .producedValues(
+ Row.of("Alice", 1, "INSERT"),
+ Row.of("Bob", 2, "INSERT"),
+ Row.of("Alice2", 1,
"UPDATE_AFTER"),
+ Row.of("Bob", 2, "DELETE"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT PRIMARY KEY NOT
ENFORCED", "name STRING")
+ .consumedValues(
+ Row.ofKind(RowKind.INSERT, 1,
"Alice"),
+ Row.ofKind(RowKind.INSERT, 2,
"Bob"),
+ Row.ofKind(RowKind.UPDATE_AFTER,
1, "Alice2"),
+ Row.ofKind(RowKind.DELETE, 2,
"Bob"))
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+ + "input => TABLE cdc_stream PARTITION BY
id, "
+ + "op_mapping => MAP["
+ + "'INSERT', 'INSERT', "
+ + "'UPDATE_AFTER', 'UPDATE_AFTER', "
+ + "'DELETE', 'DELETE'])")
+ .build();
+
+ public static final TableTestProgram UPSERT_PARTITION_BY_CUSTOM_MAPPING =
Review Comment:
Rather add a validation test for the case of `UPDATE_AFTER` without
`UPDATE_BEFORE` and `PARTITION BY`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]