Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5347#discussion_r163877879 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala --- @@ -149,26 +152,42 @@ trait CommonCalc { // conditions, etc. We only want to account for computations, not for simple projections. // CASTs in RexProgram are reduced as far as possible by ReduceExpressionsRule // in normalization stage. So we should ignore CASTs here in optimization stage. - val compCnt = calcProgram.getExprList.asScala.toList.count { - case _: RexInputRef => false - case _: RexLiteral => false - case c: RexCall if c.getOperator.getName.equals("CAST") => false - case _ => true - } + // Also, we add 1 to take calc RelNode number into consideration, so the cost of merged calc + // RelNode will less than the total cost of un-merged calcs. + val compCnt = calcProgram.getExprList.asScala.toList.count(isCom(_)) + 1 - val newRowCnt = estimateRowCount(calcProgram, rowCnt) + val newRowCnt = estimateRowCount(rexBuilder, calcProgram, rowCnt) planner.getCostFactory.makeCost(newRowCnt, newRowCnt * compCnt, 0) } private[flink] def estimateRowCount( + rexBuilder: RexBuilder, calcProgram: RexProgram, rowCnt: Double): Double = { if (calcProgram.getCondition != null) { // we reduce the result card to push filters down - (rowCnt * 0.75).max(1.0) + val exprs = RexUtil.composeConjunction( --- End diff -- Use `calcProgram.expandLocalRef(calcProgram.getCondition())` to only include the expressions that are relevant for the condition.
---