This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2f8b2d81a97a681d67964ef17932ea0abf35730f
Author: lincoln lee <lincoln.8...@gmail.com>
AuthorDate: Tue Sep 24 23:43:25 2024 +0800

    [FLINK-34702][table-planner] Remove physical node StreamPhysicalDeduplicate
---
 .../TemporalJoinRewriteWithUniqueKeyRule.java      |  6 +-
 .../stream/StreamPhysicalDeduplicate.scala         | 87 ----------------------
 .../physical/stream/StreamPhysicalRankRule.scala   |  7 +-
 .../flink/table/planner/plan/utils/RankUtil.scala  |  6 +-
 4 files changed, 8 insertions(+), 98 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.java
index 342cafd2526..b6e3f1eaf83 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.java
@@ -20,10 +20,10 @@ package org.apache.flink.table.planner.plan.rules.logical;
 
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate;
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate;
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank;
 import org.apache.flink.table.planner.plan.rules.common.CommonTemporalTableJoinRule;
 import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil;
@@ -57,8 +57,8 @@ import scala.collection.JavaConverters;
  * a table source or a view only if it contains the unique key and time attribute.
  *
  * <p>Flink supports extract the primary key and row time attribute from the view if the view comes
- * from {@link StreamPhysicalRank} node which can convert to a {@link StreamPhysicalDeduplicate}
- * node.
+ * from {@link StreamPhysicalRank} node which can convert to a {@link StreamExecDeduplicate} node
+ * finally.
  */
 @Value.Enclosing
 public class TemporalJoinRewriteWithUniqueKeyRule
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeduplicate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeduplicate.scala
deleted file mode 100644
index dbc5e60043d..00000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeduplicate.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.nodes.physical.stream
-
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
-import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate
-import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils
-import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-
-import java.util
-
-/**
- * TODO to be removed after FLINK-34702 is fixed. Stream physical RelNode which deduplicate on keys
- * and keeps only first row or last row. This node is an optimization of [[StreamPhysicalRank]] for
- * some special cases. Compared to [[StreamPhysicalRank]], this node could use mini-batch and access
- * less state.
- */
-class StreamPhysicalDeduplicate(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    inputRel: RelNode,
-    uniqueKeys: Array[Int],
-    val isRowtime: Boolean,
-    val keepLastRow: Boolean)
-  extends SingleRel(cluster, traitSet, inputRel)
-  with StreamPhysicalRel {
-
-  def getUniqueKeys: Array[Int] = uniqueKeys
-
-  override def requireWatermark: Boolean = isRowtime
-
-  override def deriveRowType(): RelDataType = getInput.getRowType
-
-  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
-    new StreamPhysicalDeduplicate(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      uniqueKeys,
-      isRowtime,
-      keepLastRow)
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    val fieldNames = getRowType.getFieldNames
-    val orderString = if (isRowtime) "ROWTIME" else "PROCTIME"
-    val keep = if (keepLastRow) "LastRow" else "FirstRow"
-    super
-      .explainTerms(pw)
-      .item("keep", keep)
-      .item("key", uniqueKeys.map(fieldNames.get).mkString(", "))
-      .item("order", orderString)
-  }
-
-  override def translateToExecNode(): ExecNode[_] = {
-    val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
-    new StreamExecDeduplicate(
-      unwrapTableConfig(this),
-      uniqueKeys,
-      isRowtime,
-      keepLastRow,
-      generateUpdateBefore,
-      InputProperty.DEFAULT,
-      FlinkTypeFactory.toLogicalRowType(getRowType),
-      getRelDetailedDescription)
-  }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala
index 0c878a8fdcc..63fbee077b4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
-import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDeduplicate, StreamPhysicalRank}
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank
 import org.apache.flink.table.planner.plan.utils.{RankProcessStrategy, RankUtil}
 
 import org.apache.calcite.plan.RelOptRule
@@ -28,10 +28,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.convert.ConverterRule.Config
 
-/**
- * Rule that converts [[FlinkLogicalRank]] with fetch to [[StreamPhysicalRank]]. NOTES: the rank can
- * not be converted to [[StreamPhysicalDeduplicate]].
- */
+/** Rule that converts [[FlinkLogicalRank]] with fetch to [[StreamPhysicalRank]]. */
 class StreamPhysicalRankRule(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
index 8e6ceaa3cc8..748a364f921 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
@@ -22,7 +22,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.ExpressionReducer
 import org.apache.flink.table.planner.plan.nodes.calcite.Rank
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
-import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDeduplicate, StreamPhysicalRank}
+import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalRank, StreamPhysicalWindowDeduplicate}
 import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, ConstantRankRangeWithoutEnd, RankRange, RankType, VariableRankRange}
 
 import org.apache.calcite.plan.RelOptUtil
@@ -327,7 +327,7 @@ object RankUtil {
   }
 
   /**
-   * Whether the given rank could be converted to [[StreamPhysicalDeduplicate]].
+   * Whether the given rank could be converted to [[StreamPhysicalWindowDeduplicate]].
    *
    * Returns true if the given rank is sorted by time attribute and limits 1 and its RankFunction is
    * ROW_NUMBER, else false.
@@ -335,7 +335,7 @@ object RankUtil {
    * @param rank
    *   The [[FlinkLogicalRank]] node
    * @return
-   *   True if the input rank could be converted to [[StreamPhysicalDeduplicate]]
+   *   True if the input rank could be converted to [[StreamPhysicalWindowDeduplicate]]
    */
   def canConvertToDeduplicate(rank: FlinkLogicalRank): Boolean = {
     val sortCollation = rank.orderKey

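For context on the rank-to-deduplicate pattern that RankUtil.canConvertToDeduplicate describes (ordered by a time attribute, limit 1, ROW_NUMBER), here is a minimal sketch of such a query, not part of this commit. The table name, schema, and 'datagen' connector are hypothetical; only the general Table API calls (TableEnvironment.create, executeSql, explainSql) are assumed to behave as in recent Flink releases, and the exact node names printed by the plan may differ.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    /** Sketch: a query shaped like the rank-to-deduplicate pattern described in RankUtil. */
    public class DeduplicateQuerySketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Hypothetical source table; 'datagen' is used only so the example is self-contained.
            tEnv.executeSql(
                    "CREATE TEMPORARY TABLE orders ("
                            + "  order_id STRING,"
                            + "  price DOUBLE,"
                            + "  proc_time AS PROCTIME()"
                            + ") WITH ('connector' = 'datagen')");

            // ROW_NUMBER over a time attribute with rn = 1: the shape the planner
            // may translate into a deduplicate exec node rather than a generic rank.
            String dedup =
                    "SELECT order_id, price FROM ("
                            + "  SELECT *, ROW_NUMBER() OVER ("
                            + "    PARTITION BY order_id ORDER BY proc_time ASC) AS rn"
                            + "  FROM orders"
                            + ") WHERE rn = 1";

            // Print the optimized plan to inspect which physical/exec nodes are chosen.
            System.out.println(tEnv.explainSql(dedup));
        }
    }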