Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19394#discussion_r143228054

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala ---
@@ -73,25 +73,37 @@ case class BroadcastExchangeExec(
     try {
       val beforeCollect = System.nanoTime()
       // Note that we use .executeCollect() because we don't want to convert data to Scala types
-      val input: Array[InternalRow] = child.executeCollect()
-      if (input.length >= 512000000) {
+      val (numRows, input) = child.executeCollectIterator()
+      if (numRows >= 512000000) {
         throw new SparkException(
-          s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows")
+          s"Cannot broadcast the table with more than 512 millions rows: $numRows rows")
       }
+
       val beforeBuild = System.nanoTime()
       longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
-      val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
+
+      // Construct the relation.
+      val relation = mode.transform(input, Some(numRows))
+
+      val dataSize = relation match {
+        case map: HashedRelation =>
+          map.estimatedSize
--- End diff --

True, but I think using the accurate size is better. Maybe we should revisit the 8 GB maximum instead of using a size that isn't correct. What's the reason for the 8 GB max?

In master, building a broadcast table just under the 8 GB limit would require 8 GB for the rows (what we actually count here), plus the size of those rows compressed, which is probably at least 1 GB. So we're talking about a pretty huge broadcast table to hit the case where this check causes failures that didn't happen before. Not that it couldn't happen, but I've never seen it in practice, and we run some huge jobs.
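To make the trade-off under discussion concrete, here is a minimal, self-contained sketch (not Spark's actual code) contrasting the two `dataSize` computations: the old approach of summing raw row bytes versus asking the built relation for an estimate that includes its own overhead. `RowStub`, `RelationStub`, and the 16-bytes-per-entry overhead are all hypothetical stand-ins for illustration only.

```scala
// Stand-in for an UnsafeRow: we only care about its serialized size.
final case class RowStub(sizeInBytes: Long)

// Stand-in for a HashedRelation: its estimatedSize accounts for the rows
// plus per-entry map overhead (assumed here to be a flat 16 bytes/row).
final case class RelationStub(rows: Seq[RowStub]) {
  def estimatedSize: Long = rows.map(_.sizeInBytes).sum + 16L * rows.length
}

object BroadcastSizeSketch {
  val MaxRows = 512000000L // the row limit that appears in the diff

  // Returns (old-style raw byte sum, new-style relation estimate).
  def dataSizes(rows: Seq[RowStub]): (Long, Long) = {
    require(rows.length < MaxRows, s"Cannot broadcast more than $MaxRows rows")
    val rawSize   = rows.map(_.sizeInBytes).sum          // pre-change accounting
    val estimated = RelationStub(rows).estimatedSize     // post-change accounting
    (rawSize, estimated)
  }
}
```

The point of the comment above is that the estimate is strictly larger than the raw sum, so a fixed byte cap trips sooner; the reviewer argues that is acceptable because only tables already near the cap are affected.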