Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2025#discussion_r64549056
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
---
@@ -69,16 +73,23 @@ class DataSetUnion(
rows + metadata.getRowCount(child)
}
- planner.getCostFactory.makeCost(rowCnt, 0, 0)
+ planner.getCostFactory.makeCost(
+ rowCnt,
+ if (all) 0 else rowCnt,
+ if (all) 0 else rowCnt)
}
override def translateToPlan(
tableEnv: BatchTableEnvironment,
expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
- val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+ val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+ val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+ if (all) {
+ leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+ } else {
+ leftDataSet.union(rightDataSet).distinct().asInstanceOf[DataSet[Any]]
--- End diff --
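The `all` flag in the diff switches between bag and set semantics. This minimal sketch uses plain Scala collections instead of Flink `DataSet`s, which is an assumption made only so it runs without Flink. `UNION ALL` concatenates both inputs. `UNION` also removes duplicates, mirroring the `union(...)` and `union(...).distinct()` branches of `translateToPlan`.

```scala
object UnionSemantics {
  // UNION ALL: bag semantics, duplicates from both inputs are kept
  // (stands in for leftDataSet.union(rightDataSet) in the diff).
  def unionAll[T](left: Seq[T], right: Seq[T]): Seq[T] = left ++ right

  // UNION: set semantics, the concatenated result is de-duplicated
  // (stands in for leftDataSet.union(rightDataSet).distinct()).
  def union[T](left: Seq[T], right: Seq[T]): Seq[T] = (left ++ right).distinct

  def main(args: Array[String]): Unit = {
    val left = Seq(1, 2, 2)
    val right = Seq(2, 3)
    println(unionAll(left, right)) // List(1, 2, 2, 2, 3)
    println(union(left, right))    // List(1, 2, 3)
  }
}
```

Unlike Scala's `distinct`, a distributed `distinct()` gives no guarantee about the order of the surviving elements. Only the set of results is comparable.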
This method is called by the optimizer when a new `DataSetUnion` node
is created, to estimate the cost of the subplan (the new node plus all of its
inputs, recursively). If there is a cheaper plan that computes the same
result, the more expensive plan is discarded.
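The pruning step can be pictured as a memo that keeps only the cheapest plan seen so far for each set of equivalent plans. This is a minimal sketch loosely in the spirit of a Volcano-style planner. `BestPlanMemo`, its methods, the string keys for equivalence classes and the scalar costs are all hypothetical and are not Calcite's API.

```scala
import scala.collection.mutable

// Hypothetical memo: for each equivalence class (plans producing the same
// result), remember only the cheapest plan registered so far.
final class BestPlanMemo[P] {
  private val best = mutable.Map.empty[String, (P, Double)]

  // Registers an alternative and returns true if it became the best plan of
  // its class. A plan that is not cheaper than the current best is discarded.
  def register(eqClass: String, plan: P, cost: Double): Boolean =
    best.get(eqClass) match {
      case Some((_, bestCost)) if bestCost <= cost => false
      case _ =>
        best(eqClass) = (plan, cost)
        true
    }

  def bestPlan(eqClass: String): Option[P] = best.get(eqClass).map(_._1)
}

object BestPlanMemoDemo {
  def main(args: Array[String]): Unit = {
    val memo = new BestPlanMemo[String]
    // Made-up costs for two equivalent plans of the same union.
    memo.register("union(a, b)", "plan A", 10.0)
    memo.register("union(a, b)", "plan B", 12.0) // more expensive: discarded
    println(memo.bestPlan("union(a, b)")) // Some(plan A)
  }
}
```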
So, suppose you have a plan with a union operator that is not `UNION ALL`.
The optimizer knows the cost of this plan. Then the `UnionToDistinctRule` is
applied, which creates a new union operator plus a distinct operator. For both
plans, the `computeSelfCost` method is called to compute their cost estimates,
and the cheaper of the two plans is kept. (This is a bit simplified, because
the optimization rules are applied on `LogicalRel` nodes, but the cost
estimation happens on the `DataSetRel` nodes.)
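Here is a minimal sketch of the comparison just described, under heavily simplified assumptions. The plan nodes are hypothetical stand-ins, not Flink's `DataSetRel` or Calcite classes. The union's self cost copies the `(rowCount, cpu, io)` formula from the diff. The distinct cost and the ranking by plain sum are invented for illustration. `unionToDistinct` only approximates the rewrite the comment attributes to `UnionToDistinctRule`.

```scala
// Hypothetical, heavily simplified plan nodes.
sealed trait Plan
final case class Scan(rows: Double) extends Plan
final case class Union(left: Plan, right: Plan, all: Boolean) extends Plan
final case class Distinct(input: Plan) extends Plan

// (rowCount, cpu, io), the triple passed to makeCost in the diff.
final case class Cost(rows: Double, cpu: Double, io: Double) {
  def +(o: Cost): Cost = Cost(rows + o.rows, cpu + o.cpu, io + o.io)
  // Assumption: alternatives are ranked by the plain sum of the components.
  def total: Double = rows + cpu + io
}

object UnionRewriteSketch {
  def rowCount(p: Plan): Double = p match {
    case Scan(r)        => r
    case Union(l, r, _) => rowCount(l) + rowCount(r)
    case Distinct(in)   => rowCount(in) // assumption: no reduction estimated
  }

  // Cost of a single node, ignoring its inputs.
  def selfCost(p: Plan): Cost = p match {
    case Scan(r) => Cost(r, 0.0, 0.0)
    case u @ Union(_, _, all) =>
      val n = rowCount(u)
      // Mirrors the diff: only a de-duplicating union pays CPU and I/O.
      Cost(n, if (all) 0.0 else n, if (all) 0.0 else n)
    case Distinct(in) =>
      val n = rowCount(in)
      Cost(n, n, n) // assumption: same per-row price as a de-duplicating union
  }

  // Cost of the whole subplan: the node itself plus all inputs, recursively.
  def cumulativeCost(p: Plan): Cost = p match {
    case Scan(_)        => selfCost(p)
    case Union(l, r, _) => selfCost(p) + cumulativeCost(l) + cumulativeCost(r)
    case Distinct(in)   => selfCost(p) + cumulativeCost(in)
  }

  // Rough approximation of the rewrite: a de-duplicating UNION becomes
  // DISTINCT on top of a UNION ALL over the same inputs.
  def unionToDistinct(p: Plan): Option[Plan] = p match {
    case Union(l, r, false) => Some(Distinct(Union(l, r, all = true)))
    case _                  => None
  }

  // Keep whichever equivalent alternative has the lower estimated cost;
  // on a tie the original plan is kept.
  def choose(original: Plan): Plan = unionToDistinct(original) match {
    case Some(rewritten)
        if cumulativeCost(rewritten).total < cumulativeCost(original).total =>
      rewritten
    case _ => original
  }
}
```

Take two scans of 100 and 50 rows. The original plan is estimated at `Cost(300, 150, 150)` and the rewritten one at `Cost(450, 150, 150)`, so with these assumed numbers the original is kept. A different cost for the de-duplication step could flip the decision, which is why both alternatives need a cost estimate.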