gustavodemorais commented on code in PR #26879:
URL: https://github.com/apache/flink/pull/26879#discussion_r2259661252
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -1052,33 +1052,29 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
val inputChangelogMode =
ChangelogPlanUtils.getChangelogMode(sink.getInput.asInstanceOf[StreamPhysicalRel]).get
val primaryKeys =
sink.contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes
- val upsertMaterialize =
-
tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE)
match {
- case UpsertMaterialize.FORCE => primaryKeys.nonEmpty
- case UpsertMaterialize.NONE => false
- case UpsertMaterialize.AUTO =>
- val sinkAcceptInsertOnly = sink.tableSink
- .getChangelogMode(inputChangelogMode)
- .containsOnly(RowKind.INSERT)
- val inputInsertOnly =
inputChangelogMode.containsOnly(RowKind.INSERT)
-
- if (!sinkAcceptInsertOnly && !inputInsertOnly &&
primaryKeys.nonEmpty) {
- val pks = ImmutableBitSet.of(primaryKeys: _*)
- val fmq =
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
- val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
- // if input has update and primary key != upsert key (upsert key
can be null) we should
- // enable upsertMaterialize. An optimize is: do not enable
upsertMaterialize when sink
- // pk(s) contains input changeLogUpsertKeys
- if (changeLogUpsertKeys == null ||
!changeLogUpsertKeys.exists(pks.contains)) {
- true
- } else {
- false
- }
- } else {
- false
- }
- }
- upsertMaterialize
+ val sinkChangelogMode =
sink.tableSink.getChangelogMode(inputChangelogMode)
+ val inputIsAppend = inputChangelogMode.containsOnly(RowKind.INSERT)
+ val sinkIsAppend = sinkChangelogMode.containsOnly(RowKind.INSERT)
+ val sinkIsRetract = sinkChangelogMode.contains(RowKind.UPDATE_BEFORE)
+
+
tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE)
match {
+ case UpsertMaterialize.FORCE => primaryKeys.nonEmpty && !sinkIsRetract
Review Comment:
Regarding the two changes:
- Auto mode: it makes sense that we don't add SUM (SinkUpsertMaterializer) when `sinkIsRetract` is true.
- Force: do we know where `UpsertMaterialize.FORCE` is used? Is it safe to
"not respect" the force setting here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]