JingsongLi commented on a change in pull request #17699:
URL: https://github.com/apache/flink/pull/17699#discussion_r753993288



##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
       }
     }
+
+    private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
+      val childModifyKindSet = getModifyKindSet(sink.getInput)
+      val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+      val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+      val sinkTrait = UpdateKindTrait.fromChangelogMode(
+        sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+      val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+        // if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
+        // to beforeAndAfter mode for the correctness
+        var shouldFallback: Boolean = false
+        val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema

Review comment:
       Maybe you just want `getPrimaryKeyIndexes`?

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
       }
     }
+
+    private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
+      val childModifyKindSet = getModifyKindSet(sink.getInput)
+      val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+      val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+      val sinkTrait = UpdateKindTrait.fromChangelogMode(
+        sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+      val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+        // if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
+        // to beforeAndAfter mode for the correctness
+        var shouldFallback: Boolean = false
+        val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
+            
.getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+        if (sinkDefinedPks.nonEmpty) {
+          val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
+          val sinkPks = 
ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
+          val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+          val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+          // if input is UA only, primary key != upsert key (upsert key can be 
null) we should
+          // fallback to beforeAndAfter.
+          // Notice: even sink pk(s) contains input upsert key we cannot 
optimize to UA only,
+          // this differs from batch job's unique key inference
+          if (changeLogUpsertKeys == null || changeLogUpsertKeys.size() == 0
+              || !changeLogUpsertKeys.exists {0 == _.compareTo(sinkPks)}) {

Review comment:
      `compareTo` -> `equals`? Comparing `compareTo(...)` against 0 is an indirect way to test equality; `changeLogUpsertKeys.exists(_.equals(sinkPks))` (or `contains`) states the intent directly.

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
       }
     }
+
+    private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
+      val childModifyKindSet = getModifyKindSet(sink.getInput)
+      val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+      val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+      val sinkTrait = UpdateKindTrait.fromChangelogMode(
+        sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+      val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+        // if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
+        // to beforeAndAfter mode for the correctness
+        var shouldFallback: Boolean = false
+        val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
+            
.getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+        if (sinkDefinedPks.nonEmpty) {
+          val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
+          val sinkPks = 
ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
+          val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+          val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+          // if input is UA only, primary key != upsert key (upsert key can be 
null) we should
+          // fallback to beforeAndAfter.
+          // Notice: even sink pk(s) contains input upsert key we cannot 
optimize to UA only,
+          // this differs from batch job's unique key inference
+          if (changeLogUpsertKeys == null || changeLogUpsertKeys.size() == 0
+              || !changeLogUpsertKeys.exists {0 == _.compareTo(sinkPks)}) {
+            shouldFallback = true
+          }
+        }
+        if (shouldFallback) {
+          Seq(beforeAndAfter)
+        } else {
+          Seq(onlyAfter, beforeAndAfter)
+        }
+      } else if (sinkTrait.equals(BEFORE_AND_AFTER)){
+        Seq(beforeAndAfter)
+      } else {
+        Seq(UpdateKindTrait.NONE)
+      }
+      sinkRequiredTraits
+    }
+
+    private def analyzeUpsertMaterializeStrategy(sink: StreamPhysicalSink): 
Boolean = {

Review comment:
      Can we merge this method into `inferSinkRequiredTraits`? For example, 
`inferSinkRequiredTraits` could return `(Seq[UpdateKindTrait], Boolean)`.

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
       }
     }
+
+    private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
+      val childModifyKindSet = getModifyKindSet(sink.getInput)
+      val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+      val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+      val sinkTrait = UpdateKindTrait.fromChangelogMode(
+        sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+      val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+        // if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
+        // to beforeAndAfter mode for the correctness
+        var shouldFallback: Boolean = false

Review comment:
      Rename `shouldFallback` to `requireBeforeAndAfter`? That name describes what the flag actually means rather than the mechanism.

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
       }
     }
+
+    private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
+      val childModifyKindSet = getModifyKindSet(sink.getInput)
+      val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+      val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+      val sinkTrait = UpdateKindTrait.fromChangelogMode(
+        sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+      val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+        // if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
+        // to beforeAndAfter mode for the correctness
+        var shouldFallback: Boolean = false
+        val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
+            
.getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+        if (sinkDefinedPks.nonEmpty) {
+          val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
+          val sinkPks = 
ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
+          val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+          val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+          // if input is UA only, primary key != upsert key (upsert key can be 
null) we should
+          // fallback to beforeAndAfter.
+          // Notice: even sink pk(s) contains input upsert key we cannot 
optimize to UA only,
+          // this differs from batch job's unique key inference
+          if (changeLogUpsertKeys == null || changeLogUpsertKeys.size() == 0

Review comment:
      Remove `changeLogUpsertKeys.size() == 0`? The following `!changeLogUpsertKeys.exists(...)` already evaluates to `true` for an empty collection, so the size check is redundant.

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
       }
     }
+
+    private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
+      val childModifyKindSet = getModifyKindSet(sink.getInput)
+      val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+      val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+      val sinkTrait = UpdateKindTrait.fromChangelogMode(
+        sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+      val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+        // if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
+        // to beforeAndAfter mode for the correctness
+        var shouldFallback: Boolean = false
+        val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
+            
.getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+        if (sinkDefinedPks.nonEmpty) {
+          val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
+          val sinkPks = 
ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
+          val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+          val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+          // if input is UA only, primary key != upsert key (upsert key can be 
null) we should
+          // fallback to beforeAndAfter.
+          // Notice: even sink pk(s) contains input upsert key we cannot 
optimize to UA only,
+          // this differs from batch job's unique key inference
+          if (changeLogUpsertKeys == null || changeLogUpsertKeys.size() == 0
+              || !changeLogUpsertKeys.exists {0 == _.compareTo(sinkPks)}) {
+            shouldFallback = true
+          }
+        }
+        if (shouldFallback) {
+          Seq(beforeAndAfter)
+        } else {
+          Seq(onlyAfter, beforeAndAfter)
+        }
+      } else if (sinkTrait.equals(BEFORE_AND_AFTER)){
+        Seq(beforeAndAfter)
+      } else {
+        Seq(UpdateKindTrait.NONE)
+      }
+      sinkRequiredTraits
+    }
+
+    private def analyzeUpsertMaterializeStrategy(sink: StreamPhysicalSink): 
Boolean = {
+      val tableConfig = 
sink.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext])
+          .getTableConfig
+      val inputChangelogMode = ChangelogPlanUtils.getChangelogMode(
+        sink.getInput.asInstanceOf[StreamPhysicalRel]).get
+      val catalogTable = sink.catalogTable
+      val primaryKeys = toScala(catalogTable.getResolvedSchema

Review comment:
      Ditto — maybe you just want `getPrimaryKeyIndexes` here as well, instead of going through `getPrimaryKey` and mapping the column names?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to