lincoln-lil commented on a change in pull request #17699:
URL: https://github.com/apache/flink/pull/17699#discussion_r754006462



##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
       }
     }
+
+    private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
+      val childModifyKindSet = getModifyKindSet(sink.getInput)
+      val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+      val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+      val sinkTrait = UpdateKindTrait.fromChangelogMode(
+        sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+      val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+        // if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
+        // to beforeAndAfter mode for the correctness
+        var shouldFallback: Boolean = false
+        val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
+            
.getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+        if (sinkDefinedPks.nonEmpty) {
+          val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
+          val sinkPks = 
ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
+          val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+          val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+          // if input is UA only, primary key != upsert key (upsert key can be 
null) we should
+          // fallback to beforeAndAfter.
+          // Notice: even sink pk(s) contains input upsert key we cannot 
optimize to UA only,
+          // this differs from batch job's unique key inference
+          if (changeLogUpsertKeys == null || changeLogUpsertKeys.size() == 0

Review comment:
       This should be reserved because the metadata query may return a empty 
`changeLogUpsertKeys` set.

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
       }
     }
+
+    private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
+      val childModifyKindSet = getModifyKindSet(sink.getInput)
+      val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+      val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+      val sinkTrait = UpdateKindTrait.fromChangelogMode(
+        sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+      val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+        // if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
+        // to beforeAndAfter mode for the correctness
+        var shouldFallback: Boolean = false
+        val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
+            
.getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+        if (sinkDefinedPks.nonEmpty) {
+          val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
+          val sinkPks = 
ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
+          val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+          val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+          // if input is UA only, primary key != upsert key (upsert key can be 
null) we should
+          // fallback to beforeAndAfter.
+          // Notice: even sink pk(s) contains input upsert key we cannot 
optimize to UA only,
+          // this differs from batch job's unique key inference
+          if (changeLogUpsertKeys == null || changeLogUpsertKeys.size() == 0
+              || !changeLogUpsertKeys.exists {0 == _.compareTo(sinkPks)}) {
+            shouldFallback = true
+          }
+        }
+        if (shouldFallback) {
+          Seq(beforeAndAfter)
+        } else {
+          Seq(onlyAfter, beforeAndAfter)
+        }
+      } else if (sinkTrait.equals(BEFORE_AND_AFTER)){
+        Seq(beforeAndAfter)
+      } else {
+        Seq(UpdateKindTrait.NONE)
+      }
+      sinkRequiredTraits
+    }
+
+    private def analyzeUpsertMaterializeStrategy(sink: StreamPhysicalSink): 
Boolean = {

Review comment:
       Initially I put the two methods together, but seems a little bit 
complex, and the two methods do the different things indeed, so I change to the 
current version.

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
       }
     }
+
+    private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
+      val childModifyKindSet = getModifyKindSet(sink.getInput)
+      val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+      val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+      val sinkTrait = UpdateKindTrait.fromChangelogMode(
+        sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+      val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+        // if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
+        // to beforeAndAfter mode for the correctness
+        var shouldFallback: Boolean = false
+        val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema

Review comment:
       Good point! I simply moved the code from `StreamPhysicalSink` to here, 
`getPrimaryKeyIndexes` is more simpler.

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
       }
     }
+
+    private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
+      val childModifyKindSet = getModifyKindSet(sink.getInput)
+      val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+      val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+      val sinkTrait = UpdateKindTrait.fromChangelogMode(
+        sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+      val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+        // if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
+        // to beforeAndAfter mode for the correctness
+        var shouldFallback: Boolean = false

Review comment:
       yes, it's more clearly.

##########
File path: docs/layouts/shortcodes/generated/execution_config_configuration.html
##########
@@ -58,6 +58,12 @@
             <td><p>Enum</p></td>
             <td>The NOT NULL column constraint on a table enforces that null 
values can't be inserted into the table. Flink supports 'error' (default) and 
'drop' enforcement behavior. By default, Flink will check values and throw 
runtime exception when null values writing into NOT NULL columns. Users can 
change the behavior to 'drop' to silently drop such records without throwing 
exception.<br /><br />Possible 
values:<ul><li>"ERROR"</li><li>"DROP"</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>table.exec.sink.pk-shuffle</h5><br> <span class="label 
label-primary">Streaming</span></td>

Review comment:
       Okay, I thought the target issue is the same one. I'll create a separate 
pr.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to