raminqaf commented on code in PR #28164:
URL: https://github.com/apache/flink/pull/28164#discussion_r3259736698
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -95,6 +98,50 @@ public Optional<List<DataType>> inferInputTypes(
return Optional.of(DataTypes.ROW(outputFields).notNull());
};
+ // --------------------------------------------------------------------------------------------
+ // Changelog mode inference
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Emits an upsert changelog when the input is partitioned (set semantics) and the resolved
+ * {@code op_mapping} maps to {@code UPDATE_AFTER} without {@code UPDATE_BEFORE}. In all other
+ * cases the output is a retract changelog. When upsert mode is selected, the partition key acts
+ * as the upsert key.
+ *
+ * <p>Upsert mode uses full deletes ({@link ChangelogMode#upsert(boolean) upsert(false)})
+ * because the runtime forwards each input delete row with all fields populated; only the {@link
+ * org.apache.flink.types.RowKind} is rewritten.
+ */
+ public static final ChangelogModeStrategy CHANGELOG_MODE_STRATEGY =
+ ctx -> isUpsertConfig(ctx) ? ChangelogMode.upsert(false) :
ChangelogMode.all();
+
+ /**
+ * Returns {@code true} when the FROM_CHANGELOG call should emit an upsert changelog: the input
+ * table is partitioned AND the resolved {@code op_mapping} contains {@code UPDATE_AFTER}
+ * without {@code UPDATE_BEFORE}. Falls back to {@code false} when the mapping is absent or
+ * cannot be resolved as a literal, since the default mapping includes both (retract).
+ */
+ @SuppressWarnings("unchecked")
+ public static boolean isUpsertConfig(final ChangelogContext ctx) {
Review Comment:
This can be (package-)private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]