Repository: spark Updated Branches: refs/heads/master a6aade004 -> 9ac68dbc5
[SPARK-17549][SQL] Revert "[SPARK-17549][SQL] Only collect table size stat in driver for cached relation." This reverts commit 39e2bad6a866d27c3ca594d15e574a1da3ee84cc because of the problem mentioned at https://issues.apache.org/jira/browse/SPARK-17549?focusedCommentId=15505060&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15505060 Author: Yin Huai <yh...@databricks.com> Closes #15157 from yhuai/revert-SPARK-17549. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ac68dbc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ac68dbc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ac68dbc Branch: refs/heads/master Commit: 9ac68dbc5720026ea92acc61d295ca64d0d3d132 Parents: a6aade0 Author: Yin Huai <yh...@databricks.com> Authored: Tue Sep 20 11:53:57 2016 -0700 Committer: Yin Huai <yh...@databricks.com> Committed: Tue Sep 20 11:53:57 2016 -0700 ---------------------------------------------------------------------- .../execution/columnar/InMemoryRelation.scala | 24 +++++++++++++++----- .../columnar/InMemoryColumnarQuerySuite.scala | 14 ------------ 2 files changed, 18 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9ac68dbc/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 56bd5c1..479934a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.columnar +import 
scala.collection.JavaConverters._ + import org.apache.commons.lang3.StringUtils import org.apache.spark.network.util.JavaUtils @@ -29,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.LongAccumulator +import org.apache.spark.util.CollectionAccumulator object InMemoryRelation { @@ -61,7 +63,8 @@ case class InMemoryRelation( @transient child: SparkPlan, tableName: Option[String])( @transient var _cachedColumnBuffers: RDD[CachedBatch] = null, - val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator) + val batchStats: CollectionAccumulator[InternalRow] = + child.sqlContext.sparkContext.collectionAccumulator[InternalRow]) extends logical.LeafNode with MultiInstanceRelation { override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child) @@ -71,12 +74,21 @@ case class InMemoryRelation( @transient val partitionStatistics = new PartitionStatistics(output) override lazy val statistics: Statistics = { - if (batchStats.value == 0L) { + if (batchStats.value.isEmpty) { // Underlying columnar RDD hasn't been materialized, no useful statistics information // available, return the default statistics. Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes) } else { - Statistics(sizeInBytes = batchStats.value.longValue) + // Underlying columnar RDD has been materialized, required information has also been + // collected via the `batchStats` accumulator. 
+ val sizeOfRow: Expression = + BindReferences.bindReference( + output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add), + partitionStatistics.schema) + + val sizeInBytes = + batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum + Statistics(sizeInBytes = sizeInBytes) } } @@ -127,10 +139,10 @@ case class InMemoryRelation( rowCount += 1 } - batchStats.add(totalSize) - val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics) .flatMap(_.values)) + + batchStats.add(stats) CachedBatch(rowCount, columnBuilders.map { builder => JavaUtils.bufferToArray(builder.build()) }, stats) http://git-wip-us.apache.org/repos/asf/spark/blob/9ac68dbc/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 0daa29b..9378396 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -232,18 +232,4 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { val columnTypes2 = List.fill(length2)(IntegerType) val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2) } - - test("SPARK-17549: cached table size should be correctly calculated") { - val data = spark.sparkContext.parallelize(1 to 10, 5).toDF() - val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan - val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None) - - // Materialize the data. - val expectedAnswer = data.collect() - checkAnswer(cached, expectedAnswer) - - // Check that the right size was calculated. 
- assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize) - } - } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org