gustavodemorais commented on code in PR #28164:
URL: https://github.com/apache/flink/pull/28164#discussion_r3264978825
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,44 @@ Prefer row semantics, when possible. `PARTITION BY` is
only necessary when downs
If you are producing an upsert table — that is, you are emitting
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the
partition key you select here will be considered both the primary key and the
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary
key exactly.
+#### Upsert table
+
+To generate an upsert table, two requirements must be met:
+
+* **Key partitioning**: use `PARTITION BY <key>`, where the partition key
corresponds to the unique/primary key of the dataset.
+* **Op mapping configuration**: the `op_mapping` must include `UPDATE_AFTER`
and must NOT include `UPDATE_BEFORE`.
+
+The engine assumes that the keys provided in the `PARTITION BY` clause
function as the unique upsert keys. The resulting output changelog becomes an
upsert table keyed on these partition columns. Each incoming row is evaluated
and produces `INSERT`, `UPDATE_AFTER`, or `DELETE` events, using the partition
key as the explicit upsert key. Therefore, if the incoming changelog contains
unique keys (such as a primary key), they **must** be used in the `PARTITION
BY` clause.
+
+<span class="label label-danger">Note</span>
+- An `op_mapping` that produces `UPDATE_AFTER` without `UPDATE_BEFORE`
describes an upsert changelog and requires a key. `PARTITION BY` must be
present on the table argument; otherwise the call would produce key-less
updates and is rejected at validation time with a `ValidationException`.
Review Comment:
The note is a bit repetitive: we already stated that it's an upsert table,
and we have made explicit above that `PARTITION BY` is required.
```suggestion
An `op_mapping` that produces `UPDATE_AFTER` without `UPDATE_BEFORE`
describes an upsert changelog and requires a key. The engine assumes that the
keys provided in the `PARTITION BY` clause function as the unique upsert keys.
The resulting output changelog becomes an upsert table keyed on these partition
columns. Each incoming row is evaluated and produces `INSERT`, `UPDATE_AFTER`,
or `DELETE` events, using the partition key as the explicit upsert key.
Therefore, if the incoming changelog contains unique keys (such as a primary
key), they **must** be used in the `PARTITION BY` clause.
```
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -226,6 +226,18 @@ public interface PartitionedTable {
* <p>For row semantics (each row processed independently), use {@link
Table#fromChangelog} on
* the unpartitioned table.
*
+ * <p>Output changelog mode:
+ *
+ * <ul>
+ * <li><b>Retract</b> (default): the active {@code op_mapping} includes
{@code UPDATE_BEFORE}
+ * or no updates at all. The output emits {@code INSERT}, {@code
UPDATE_BEFORE}, {@code
+ * UPDATE_AFTER}, and {@code DELETE}.
+ * <li><b>Upsert</b>: the {@code op_mapping} maps to {@code
UPDATE_AFTER} without {@code
+ * UPDATE_BEFORE}. The output emits {@code INSERT}, {@code
UPDATE_AFTER}, and full {@code
Review Comment:
hmmm, it wasn't :D. I see "full"
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala:
##########
@@ -614,7 +614,32 @@ abstract class TableTestUtilBase(test: TableTestBase,
isStreamingMode: Boolean)
/** Java-friendly overload that accepts a list of [[ExplainDetail]]s. */
def verifyRelPlan(query: String, extraDetails:
java.util.List[ExplainDetail]): Unit = {
- verifyRelPlan(query, extraDetails.asScala.toSeq: _*)
+ verifyRelPlan(query, extraDetails.asScala: _*)
+ }
+
+ /**
+ * Verify the AST and the optimized rel plan for the given SELECT query. The
rendered optimized
+ * rel plan includes the `upsertKeys=[...]` term for rel nodes that derive
upsert keys.
+ */
+ def verifyRelPlanWithUpsertKey(query: String, extraDetails: ExplainDetail*):
Unit = {
Review Comment:
This does not seem to fit the style of the utilities we have here. We don't
have a separate verifyRelPlan for each param we can configure; we use the
extraDetail list instead. Take a look at how withDuplicateChanges and
withQueryBlockAlias were implemented.
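
A minimal sketch of the option-list shape suggested here, in plain Java. All names (`PlanVerifierSketch`, `Detail`, `render`) and the rendered plan strings are hypothetical; this does not reproduce the actual `TableTestBase` code or how withDuplicateChanges and withQueryBlockAlias are wired in. It only illustrates the idea that one entry point takes the requested details as a list, so a new attribute does not need its own verify method:

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;

public class PlanVerifierSketch {

    // Hypothetical stand-in for the optional plan attributes a test can request.
    enum Detail { CHANGELOG_MODE, UPSERT_KEYS }

    // Single entry point: options arrive as a list, so supporting a new
    // attribute means adding an enum constant, not another verify method.
    static String render(String node, Detail... details) {
        Set<Detail> enabled = EnumSet.noneOf(Detail.class);
        enabled.addAll(Arrays.asList(details));

        // The attribute values below are made-up placeholders for illustration.
        StringBuilder sb = new StringBuilder(node).append("(select=[a, b]");
        if (enabled.contains(Detail.CHANGELOG_MODE)) {
            sb.append(", changelogMode=[I,UA,D]");
        }
        if (enabled.contains(Detail.UPSERT_KEYS)) {
            sb.append(", upsertKeys=[[a]]");
        }
        return sb.append(")").toString();
    }

    public static void main(String[] args) {
        System.out.println(render("Calc"));
        System.out.println(render("Calc", Detail.UPSERT_KEYS));
    }
}
```

With this shape, a test that wants the upsert keys in the rendered plan passes the extra detail to the existing call instead of calling a dedicated `verifyRelPlanWithUpsertKey`.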