This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 026f233ef039be6d1739bf01daf2470a39c4f51d Author: lincoln lee <lincoln.8...@gmail.com> AuthorDate: Tue Sep 24 23:27:55 2024 +0800 [FLINK-34702][table-planner] Remove StreamPhysicalDeduplicate from FlinkRelMdModifiedMonotonicity --- .../metadata/FlinkRelMdModifiedMonotonicity.scala | 111 +++++++++++---------- .../nodes/physical/stream/StreamPhysicalRank.scala | 2 +- .../flink/table/planner/plan/utils/RankUtil.scala | 6 +- .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 2 +- 4 files changed, 66 insertions(+), 55 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala index f25e2cebdd2..0f8b5e83ad8 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala @@ -29,6 +29,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalCo import org.apache.flink.table.planner.plan.nodes.physical.stream._ import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, IntermediateRelTable, TableSourceTable} import org.apache.flink.table.planner.plan.stats.{WithLower, WithUpper} +import org.apache.flink.table.planner.plan.utils.RankUtil import org.apache.flink.types.RowKind import org.apache.calcite.plan.hep.HepRelVertex @@ -186,70 +187,78 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon } def getRelModifiedMonotonicity(rel: Rank, mq: RelMetadataQuery): RelModifiedMonotonicity = { - val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq) - val inputMonotonicity = fmq.getRelModifiedMonotonicity(rel.getInput) + rel match { + case physicalRank: StreamPhysicalRank if RankUtil.isDeduplication(rel) => + getPhysicalRankModifiedMonotonicity(physicalRank, mq) - // If child monotonicity is null, we should return early. - if (inputMonotonicity == null) { - return null - } + case _ => + val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq) + val inputMonotonicity = fmq.getRelModifiedMonotonicity(rel.getInput) - // if partitionBy a update field or partitionBy a field whose mono is null, just return null - if (rel.partitionKey.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { - return null - } + // If child monotonicity is null, we should return early. + if (inputMonotonicity == null) { + return null + } - val fieldCount = rel.getRowType.getFieldCount + // if partitionBy a update field or partitionBy a field whose mono is null, just return null + if (rel.partitionKey.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { + return null + } - // init current mono - val currentMonotonicity = notMonotonic(fieldCount) - // 1. partitionBy field is CONSTANT - rel.partitionKey.foreach(e => currentMonotonicity.fieldMonotonicities(e) = CONSTANT) - // 2. row number filed is CONSTANT - if (rel.outputRankNumber) { - currentMonotonicity.fieldMonotonicities(fieldCount - 1) = CONSTANT - } - // 3. time attribute field is increasing - (0 until fieldCount).foreach( - e => { - if (FlinkTypeFactory.isTimeIndicatorType(rel.getRowType.getFieldList.get(e).getType)) { - inputMonotonicity.fieldMonotonicities(e) = INCREASING + val fieldCount = rel.getRowType.getFieldCount + + // init current mono + val currentMonotonicity = notMonotonic(fieldCount) + // 1. partitionBy field is CONSTANT + rel.partitionKey.foreach(e => currentMonotonicity.fieldMonotonicities(e) = CONSTANT) + // 2. row number filed is CONSTANT + if (rel.outputRankNumber) { + currentMonotonicity.fieldMonotonicities(fieldCount - 1) = CONSTANT + } + // 3. time attribute field is increasing + (0 until fieldCount).foreach( + e => { + if (FlinkTypeFactory.isTimeIndicatorType(rel.getRowType.getFieldList.get(e).getType)) { + inputMonotonicity.fieldMonotonicities(e) = INCREASING + } + }) + val fieldCollations = rel.orderKey.getFieldCollations + if (fieldCollations.nonEmpty) { + // 4. process the first collation field, we can only deduce the first collation field + val firstCollation = fieldCollations.get(0) + // Collation field index in child node will be same with Rank node, + // see ProjectToLogicalProjectAndWindowRule for details. + val fieldMonotonicity = + inputMonotonicity.fieldMonotonicities(firstCollation.getFieldIndex) + val result = fieldMonotonicity match { + case SqlMonotonicity.INCREASING | SqlMonotonicity.CONSTANT + if firstCollation.direction == RelFieldCollation.Direction.DESCENDING => + INCREASING + case SqlMonotonicity.DECREASING | SqlMonotonicity.CONSTANT + if firstCollation.direction == RelFieldCollation.Direction.ASCENDING => + DECREASING + case _ => NOT_MONOTONIC + } + currentMonotonicity.fieldMonotonicities(firstCollation.getFieldIndex) = result } - }) - val fieldCollations = rel.orderKey.getFieldCollations - if (fieldCollations.nonEmpty) { - // 4. process the first collation field, we can only deduce the first collation field - val firstCollation = fieldCollations.get(0) - // Collation field index in child node will be same with Rank node, - // see ProjectToLogicalProjectAndWindowRule for details. - val fieldMonotonicity = inputMonotonicity.fieldMonotonicities(firstCollation.getFieldIndex) - val result = fieldMonotonicity match { - case SqlMonotonicity.INCREASING | SqlMonotonicity.CONSTANT - if firstCollation.direction == RelFieldCollation.Direction.DESCENDING => - INCREASING - case SqlMonotonicity.DECREASING | SqlMonotonicity.CONSTANT - if firstCollation.direction == RelFieldCollation.Direction.ASCENDING => - DECREASING - case _ => NOT_MONOTONIC - } - currentMonotonicity.fieldMonotonicities(firstCollation.getFieldIndex) = result - } - currentMonotonicity + currentMonotonicity + } } - def getRelModifiedMonotonicity( - rel: StreamPhysicalDeduplicate, + private def getPhysicalRankModifiedMonotonicity( + rank: StreamPhysicalRank, mq: RelMetadataQuery): RelModifiedMonotonicity = { - if (allAppend(mq, rel.getInput)) { - if (rel.keepLastRow || rel.isRowtime) { + // Can't use RankUtil.canConvertToDeduplicate directly because modifyKindSetTrait is undefined. + if (allAppend(mq, rank.getInput)) { + if (RankUtil.keepLastDeduplicateRow(rank.orderKey) || rank.sortOnRowTime) { val mono = new RelModifiedMonotonicity( - Array.fill(rel.getRowType.getFieldCount)(NOT_MONOTONIC)) - rel.getUniqueKeys.foreach(e => mono.fieldMonotonicities(e) = CONSTANT) + Array.fill(rank.getRowType.getFieldCount)(NOT_MONOTONIC)) + rank.partitionKey.toArray.foreach(e => mono.fieldMonotonicities(e) = CONSTANT) mono } else { // FirstRow do not generate updates. - new RelModifiedMonotonicity(Array.fill(rel.getRowType.getFieldCount)(CONSTANT)) + new RelModifiedMonotonicity(Array.fill(rank.getRowType.getFieldCount)(CONSTANT)) } } else { null diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala index 084d59d9893..94136d35ddb 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala @@ -120,7 +120,7 @@ class StreamPhysicalRank( val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this) if (RankUtil.canConvertToDeduplicate(this)) { - val keepLastRow = RankUtil.keepLastRow(orderKey) + val keepLastRow = RankUtil.keepLastDeduplicateRow(orderKey) new StreamExecDeduplicate( unwrapTableConfig(this), diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala index d0f1efef198..8e6ceaa3cc8 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala @@ -392,8 +392,10 @@ object RankUtil { isDeduplication(rank) && inputInsertOnly && sortOnTimeAttributeOnly } - /** Determines if the given order key indicates that the last row should be kept. */ - def keepLastRow(orderKey: RelCollation): Boolean = { + /** + * Determines if the given order key indicates that the last row should be kept for deduplication. + */ + def keepLastDeduplicateRow(orderKey: RelCollation): Boolean = { // order by timeIndicator desc ==> lastRow, otherwise is firstRow if (orderKey.getFieldCollations.size() != 1) { return false diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index e9a224da2d3..0a818dfb0da 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -860,7 +860,7 @@ class FlinkRelMdHandlerTestBase { new RelDataTypeFieldImpl("rn", 7, longType), outputRankNumber = false, RankProcessStrategy.UNDEFINED_STRATEGY, - sortOnRowTime = false + sortOnRowTime = isRowtime ) val builder = typeFactory.builder()