This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 026f233ef039be6d1739bf01daf2470a39c4f51d
Author: lincoln lee <lincoln.8...@gmail.com>
AuthorDate: Tue Sep 24 23:27:55 2024 +0800

    [FLINK-34702][table-planner] Remove StreamPhysicalDeduplicate from FlinkRelMdModifiedMonotonicity
---
 .../metadata/FlinkRelMdModifiedMonotonicity.scala  | 111 +++++++++++----------
 .../nodes/physical/stream/StreamPhysicalRank.scala |   2 +-
 .../flink/table/planner/plan/utils/RankUtil.scala  |   6 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |   2 +-
 4 files changed, 66 insertions(+), 55 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
index f25e2cebdd2..0f8b5e83ad8 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalCo
 import org.apache.flink.table.planner.plan.nodes.physical.stream._
 import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, IntermediateRelTable, TableSourceTable}
 import org.apache.flink.table.planner.plan.stats.{WithLower, WithUpper}
+import org.apache.flink.table.planner.plan.utils.RankUtil
 import org.apache.flink.types.RowKind
 
 import org.apache.calcite.plan.hep.HepRelVertex
@@ -186,70 +187,78 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
   }
 
   def getRelModifiedMonotonicity(rel: Rank, mq: RelMetadataQuery): RelModifiedMonotonicity = {
-    val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
-    val inputMonotonicity = fmq.getRelModifiedMonotonicity(rel.getInput)
+    rel match {
+      case physicalRank: StreamPhysicalRank if RankUtil.isDeduplication(rel) =>
+        getPhysicalRankModifiedMonotonicity(physicalRank, mq)
 
-    // If child monotonicity is null, we should return early.
-    if (inputMonotonicity == null) {
-      return null
-    }
+      case _ =>
+        val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
+        val inputMonotonicity = fmq.getRelModifiedMonotonicity(rel.getInput)
 
-    // if partitionBy a update field or partitionBy a field whose mono is null, just return null
-    if (rel.partitionKey.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) {
-      return null
-    }
+        // If child monotonicity is null, we should return early.
+        if (inputMonotonicity == null) {
+          return null
+        }
 
-    val fieldCount = rel.getRowType.getFieldCount
+        // if partitionBy a update field or partitionBy a field whose mono is null, just return null
+        if (rel.partitionKey.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) {
+          return null
+        }
 
-    // init current mono
-    val currentMonotonicity = notMonotonic(fieldCount)
-    // 1. partitionBy field is CONSTANT
-    rel.partitionKey.foreach(e => currentMonotonicity.fieldMonotonicities(e) = CONSTANT)
-    // 2. row number filed is CONSTANT
-    if (rel.outputRankNumber) {
-      currentMonotonicity.fieldMonotonicities(fieldCount - 1) = CONSTANT
-    }
-    // 3. time attribute field is increasing
-    (0 until fieldCount).foreach(
-      e => {
-        if (FlinkTypeFactory.isTimeIndicatorType(rel.getRowType.getFieldList.get(e).getType)) {
-          inputMonotonicity.fieldMonotonicities(e) = INCREASING
+        val fieldCount = rel.getRowType.getFieldCount
+
+        // init current mono
+        val currentMonotonicity = notMonotonic(fieldCount)
+        // 1. partitionBy field is CONSTANT
+        rel.partitionKey.foreach(e => currentMonotonicity.fieldMonotonicities(e) = CONSTANT)
+        // 2. row number filed is CONSTANT
+        if (rel.outputRankNumber) {
+          currentMonotonicity.fieldMonotonicities(fieldCount - 1) = CONSTANT
+        }
+        // 3. time attribute field is increasing
+        (0 until fieldCount).foreach(
+          e => {
+            if (FlinkTypeFactory.isTimeIndicatorType(rel.getRowType.getFieldList.get(e).getType)) {
+              inputMonotonicity.fieldMonotonicities(e) = INCREASING
+            }
+          })
+        val fieldCollations = rel.orderKey.getFieldCollations
+        if (fieldCollations.nonEmpty) {
+          // 4. process the first collation field, we can only deduce the first collation field
+          val firstCollation = fieldCollations.get(0)
+          // Collation field index in child node will be same with Rank node,
+          // see ProjectToLogicalProjectAndWindowRule for details.
+          val fieldMonotonicity =
+            inputMonotonicity.fieldMonotonicities(firstCollation.getFieldIndex)
+          val result = fieldMonotonicity match {
+            case SqlMonotonicity.INCREASING | SqlMonotonicity.CONSTANT
+                if firstCollation.direction == RelFieldCollation.Direction.DESCENDING =>
+              INCREASING
+            case SqlMonotonicity.DECREASING | SqlMonotonicity.CONSTANT
+                if firstCollation.direction == RelFieldCollation.Direction.ASCENDING =>
+              DECREASING
+            case _ => NOT_MONOTONIC
+          }
+          currentMonotonicity.fieldMonotonicities(firstCollation.getFieldIndex) = result
         }
-      })
-    val fieldCollations = rel.orderKey.getFieldCollations
-    if (fieldCollations.nonEmpty) {
-      // 4. process the first collation field, we can only deduce the first collation field
-      val firstCollation = fieldCollations.get(0)
-      // Collation field index in child node will be same with Rank node,
-      // see ProjectToLogicalProjectAndWindowRule for details.
-      val fieldMonotonicity = inputMonotonicity.fieldMonotonicities(firstCollation.getFieldIndex)
-      val result = fieldMonotonicity match {
-        case SqlMonotonicity.INCREASING | SqlMonotonicity.CONSTANT
-            if firstCollation.direction == RelFieldCollation.Direction.DESCENDING =>
-          INCREASING
-        case SqlMonotonicity.DECREASING | SqlMonotonicity.CONSTANT
-            if firstCollation.direction == RelFieldCollation.Direction.ASCENDING =>
-          DECREASING
-        case _ => NOT_MONOTONIC
-      }
-      currentMonotonicity.fieldMonotonicities(firstCollation.getFieldIndex) = result
-    }
 
-    currentMonotonicity
+        currentMonotonicity
+    }
   }
 
-  def getRelModifiedMonotonicity(
-      rel: StreamPhysicalDeduplicate,
+  private def getPhysicalRankModifiedMonotonicity(
+      rank: StreamPhysicalRank,
       mq: RelMetadataQuery): RelModifiedMonotonicity = {
-    if (allAppend(mq, rel.getInput)) {
-      if (rel.keepLastRow || rel.isRowtime) {
+    // Can't use RankUtil.canConvertToDeduplicate directly because modifyKindSetTrait is undefined.
+    if (allAppend(mq, rank.getInput)) {
+      if (RankUtil.keepLastDeduplicateRow(rank.orderKey) || rank.sortOnRowTime) {
         val mono = new RelModifiedMonotonicity(
-          Array.fill(rel.getRowType.getFieldCount)(NOT_MONOTONIC))
-        rel.getUniqueKeys.foreach(e => mono.fieldMonotonicities(e) = CONSTANT)
+          Array.fill(rank.getRowType.getFieldCount)(NOT_MONOTONIC))
+        rank.partitionKey.toArray.foreach(e => mono.fieldMonotonicities(e) = CONSTANT)
         mono
       } else {
         // FirstRow do not generate updates.
-        new RelModifiedMonotonicity(Array.fill(rel.getRowType.getFieldCount)(CONSTANT))
+        new RelModifiedMonotonicity(Array.fill(rank.getRowType.getFieldCount)(CONSTANT))
       }
     } else {
       null
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala
index 084d59d9893..94136d35ddb 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala
@@ -120,7 +120,7 @@ class StreamPhysicalRank(
     val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
 
     if (RankUtil.canConvertToDeduplicate(this)) {
-      val keepLastRow = RankUtil.keepLastRow(orderKey)
+      val keepLastRow = RankUtil.keepLastDeduplicateRow(orderKey)
 
       new StreamExecDeduplicate(
         unwrapTableConfig(this),
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
index d0f1efef198..8e6ceaa3cc8 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
@@ -392,8 +392,10 @@ object RankUtil {
     isDeduplication(rank) && inputInsertOnly && sortOnTimeAttributeOnly
   }
 
-  /** Determines if the given order key indicates that the last row should be kept. */
-  def keepLastRow(orderKey: RelCollation): Boolean = {
+  /**
+   * Determines if the given order key indicates that the last row should be kept for deduplication.
+   */
+  def keepLastDeduplicateRow(orderKey: RelCollation): Boolean = {
     // order by timeIndicator desc ==> lastRow, otherwise is firstRow
     if (orderKey.getFieldCollations.size() != 1) {
       return false
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index e9a224da2d3..0a818dfb0da 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -860,7 +860,7 @@ class FlinkRelMdHandlerTestBase {
       new RelDataTypeFieldImpl("rn", 7, longType),
       outputRankNumber = false,
       RankProcessStrategy.UNDEFINED_STRATEGY,
-      sortOnRowTime = false
+      sortOnRowTime = isRowtime
     )
 
     val builder = typeFactory.builder()

Reply via email to