maropu commented on a change in pull request #32494: URL: https://github.com/apache/spark/pull/32494#discussion_r633519921
########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/UnionEstimation.scala ########## @@ -81,40 +93,86 @@ object UnionEstimation { attrStats.get(attr).isDefined && attrStats(attr).hasMinMaxStats } } + val attrToComputeNullCount = union.children.map(_.output).transpose.zipWithIndex.filter { + case (attrs, _) => attrs.zipWithIndex.forall { + case (attr, childIndex) => + val attrStats = union.children(childIndex).stats.attributeStats + attrStats.get(attr).isDefined && attrStats(attr).nullCount.isDefined + } + } + (attrToComputeMinMaxStats, attrToComputeNullCount) + } - val newAttrStats = if (attrToComputeMinMaxStats.nonEmpty) { + /** This method computes the min-max statistics and return the attribute stats Map. */ + private def computeMinMaxAttributeStatsMap( + union: Union, + attrToComputeMinMaxStats: Seq[(Seq[Attribute], Int)]) = { + val unionOutput = union.output + val attrStatsWithMinMax = if (attrToComputeMinMaxStats.nonEmpty) { val outputAttrStats = new ArrayBuffer[(Attribute, ColumnStat)]() attrToComputeMinMaxStats.foreach { case (attrs, outputIndex) => val dataType = unionOutput(outputIndex).dataType val statComparator = createStatComparator(dataType) val minMaxValue = attrs.zipWithIndex.foldLeft[(Option[Any], Option[Any])]((None, None)) { - case ((minVal, maxVal), (attr, childIndex)) => - val colStat = union.children(childIndex).stats.attributeStats(attr) - val min = if (minVal.isEmpty || statComparator(colStat.min.get, minVal.get)) { - colStat.min - } else { - minVal - } - val max = if (maxVal.isEmpty || statComparator(maxVal.get, colStat.max.get)) { - colStat.max - } else { - maxVal - } - (min, max) - } + case ((minVal, maxVal), (attr, childIndex)) => + val colStat = union.children(childIndex).stats.attributeStats(attr) + val min = if (minVal.isEmpty || statComparator(colStat.min.get, minVal.get)) { + colStat.min + } else { + minVal + } + val max = if (maxVal.isEmpty || statComparator(maxVal.get, 
colStat.max.get)) { + colStat.max + } else { + maxVal + } + (min, max) + } val newStat = ColumnStat(min = minMaxValue._1, max = minMaxValue._2) outputAttrStats += unionOutput(outputIndex) -> newStat } AttributeMap(outputAttrStats.toSeq) } else { AttributeMap.empty[ColumnStat] } + attrStatsWithMinMax + } - Some( - Statistics( - sizeInBytes = sizeInBytes, - rowCount = outputRows, - attributeStats = newAttrStats)) + /** This method computes the null count statistics and update the attrStatsWithMinMax Map. */ + private def updateAttributeStatsMapWithNullCount( + union: Union, + attrStatsWithMinMax: AttributeMap[ColumnStat], + attrToComputeNullCount: Seq[(Seq[Attribute], Int)]) = { + val unionOutput = union.output + + val newAttrStats = if (attrToComputeNullCount.nonEmpty) { + val outputAttrStats = new ArrayBuffer[(Attribute, ColumnStat)]() + attrToComputeNullCount.foreach { Review comment: Could we write it like this instead: `val outputAttrStats = attrToComputeNullCount.map {` (i.e., build the sequence with `map` rather than appending to a mutable `ArrayBuffer` inside `foreach`)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org