Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64549056

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
    @@ -69,16 +73,23 @@ class DataSetUnion(
           rows + metadata.getRowCount(child)
         }
    -    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +    planner.getCostFactory.makeCost(
    +      rowCnt,
    +      if (all) 0 else rowCnt,
    +      if (all) 0 else rowCnt)
       }
       override def translateToPlan(
           tableEnv: BatchTableEnvironment,
           expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
    -    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    if (all) {
    +      leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    } else {
    +      leftDataSet.union(rightDataSet).distinct().asInstanceOf[DataSet[Any]]
    --- End diff --

    This method should be called by the optimizer when a new `DataSetUnion` node is created, to estimate the cost of the subplan (the new node plus all of its recursive input nodes). If there is a cheaper plan that does the same thing, the more expensive plan is discarded.

    So, you have a plan with a union operator that is not a union all. The optimizer knows the cost of this plan. Then the `UnionToDistinctRule` is called and new union and distinct operators are created. For both of them, the `computeSelfCost` method is called to compute the cost estimate of the new plan, and then the cheaper of the two plans is preserved.

    (This is a bit simplified, because the optimization rules are applied on `LogicalRel` nodes but the cost estimation happens on the `DataSetRel` nodes.)
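
To make the comparison described above concrete, here is a minimal, self-contained sketch in plain Scala. It is a toy model under stated assumptions, not Calcite or Flink code: `Cost`, `Node`, `Scan`, `Union`, `Distinct` and `cheaper` are hypothetical stand-ins, the self cost of `Distinct` is invented for illustration, and the comparison order (CPU, then IO, then rows) is an assumption. Only the self cost of `Union` mirrors the `makeCost` call in the diff.

```scala
object PlanCostSketch {
  // Hypothetical cost triple, loosely modelled on (rowCount, cpu, io).
  final case class Cost(rows: Double, cpu: Double, io: Double) {
    def +(o: Cost): Cost = Cost(rows + o.rows, cpu + o.cpu, io + o.io)
    // Assumption for this sketch: compare cpu first, then io, then rows.
    def isCheaperThan(o: Cost): Boolean =
      if (cpu != o.cpu) cpu < o.cpu
      else if (io != o.io) io < o.io
      else rows < o.rows
  }

  sealed trait Node {
    def inputs: Seq[Node]
    def rowCount: Double
    def selfCost: Cost
    // Cost of the subplan: this node plus all recursive input nodes.
    def totalCost: Cost = inputs.map(_.totalCost).foldLeft(selfCost)(_ + _)
  }

  final case class Scan(rowCount: Double) extends Node {
    val inputs: Seq[Node] = Seq.empty
    def selfCost: Cost = Cost(rowCount, 0, 0)
  }

  final case class Union(left: Node, right: Node, all: Boolean) extends Node {
    val inputs: Seq[Node] = Seq(left, right)
    def rowCount: Double = inputs.map(_.rowCount).sum
    // Mirrors the diff: a non-all union also pays cpu and io for deduplication.
    def selfCost: Cost =
      Cost(rowCount, if (all) 0 else rowCount, if (all) 0 else rowCount)
  }

  final case class Distinct(input: Node) extends Node {
    val inputs: Seq[Node] = Seq(input)
    def rowCount: Double = input.rowCount // upper bound, assumed for simplicity
    // Invented numbers: the real cost of a distinct operator may differ.
    def selfCost: Cost = Cost(rowCount, rowCount, 0)
  }

  // Keep the cheaper of two equivalent plans; ties keep the first one.
  def cheaper(a: Node, b: Node): Node =
    if (b.totalCost.isCheaperThan(a.totalCost)) b else a

  val left: Node = Scan(100)
  val right: Node = Scan(50)
  // Plan A: a single union that removes duplicates itself.
  val planA: Node = Union(left, right, all = false)
  // Plan B: what a rule like UnionToDistinctRule would produce.
  val planB: Node = Distinct(Union(left, right, all = true))

  def main(args: Array[String]): Unit =
    println(cheaper(planA, planB))
}
```

With these made-up numbers plan B is kept, but that outcome depends entirely on the assumed `Distinct` cost and comparison order; the point is only that each node reports its own cost and the optimizer compares the summed subplan costs.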