[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utilize existing stats of the plan to be cached
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r163401858

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -60,7 +62,8 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
+    statsOfPlanToCache: Statistics = null)
--- End diff --

eh...we do not have other options; it's more like a placeholder. Since InMemoryRelation is created by CacheManager through apply() in the companion object, it's no harm here IMHO.

---
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r163347494

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -60,7 +62,8 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
+    statsOfPlanToCache: Statistics = null)
--- End diff --

Why do we set this to `null`?

---
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19864

---
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r157653784

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -479,4 +485,43 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      withTempPath { workDir =>
+        withTable("table1") {
+          val workDirPath = workDir.getAbsolutePath
+          val data = Seq(100, 200, 300, 400).toDF("count")
+          data.write.parquet(workDirPath)
+          val dfFromFile = spark.read.parquet(workDirPath).cache()
+          val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
+            case plan: InMemoryRelation => plan
+          }.head
+          // InMemoryRelation's stats is file size before the underlying RDD is materialized
+          assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+
+          // InMemoryRelation's stats is updated after materializing RDD
+          dfFromFile.collect()
+          assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+
+          // test of catalog table
+          val dfFromTable = spark.catalog.createTable("table1", workDirPath).cache()
+          val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
+            collect { case plan: InMemoryRelation => plan }.head
+
+          // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
+          // is calculated
+          assert(inMemoryRelation2.computeStats().sizeInBytes === 740)
+
+          // InMemoryRelation's stats should be updated after calculating stats of the table
+          // clear cache to simulate a fresh environment
+          dfFromTable.unpersist(blocking = true)
+          spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
+          val inMemoryRelation3 = spark.read.table("table1").cache().queryExecution.optimizedPlan.
+            collect { case plan: InMemoryRelation => plan }.head
+          assert(inMemoryRelation3.computeStats().sizeInBytes === 48)
--- End diff --

Because 16 is the *exact* in-memory size, obtained by reading the accumulator's value after evaluating the RDD. 48 is calculated by EstimationUtils (https://github.com/apache/spark/blob/bdb5e55c2a67d16a36ad6baa22296d714d3525af/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala#L78): `(8 + 4 (average attribute length)) * 4`.

---
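The arithmetic behind those two numbers, as a minimal sketch in Scala (the 8-byte per-row overhead and the 4-byte average int width are taken from the comment above; `EstimationUtils` linked there is the real implementation):

```scala
// 16 = the accumulator's exact value: 4 int values * 4 bytes each
// in the materialized columnar buffers.
val exactInMemorySize = 4L * 4L
assert(exactInMemorySize == 16L)

// 48 = the EstimationUtils-style estimate: (8-byte row overhead + 4-byte
// average attribute length) per row, times 4 rows.
val rowCount = 4L
val estimatedRowSize = 8L + 4L
assert(rowCount * estimatedRowSize == 48L)
```

---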
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r157652482

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -479,4 +485,43 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      withTempPath { workDir =>
+        withTable("table1") {
+          val workDirPath = workDir.getAbsolutePath
+          val data = Seq(100, 200, 300, 400).toDF("count")
+          data.write.parquet(workDirPath)
+          val dfFromFile = spark.read.parquet(workDirPath).cache()
+          val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
+            case plan: InMemoryRelation => plan
+          }.head
+          // InMemoryRelation's stats is file size before the underlying RDD is materialized
+          assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+
+          // InMemoryRelation's stats is updated after materializing RDD
+          dfFromFile.collect()
+          assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+
+          // test of catalog table
+          val dfFromTable = spark.catalog.createTable("table1", workDirPath).cache()
+          val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
+            collect { case plan: InMemoryRelation => plan }.head
+
+          // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
+          // is calculated
+          assert(inMemoryRelation2.computeStats().sizeInBytes === 740)
+
+          // InMemoryRelation's stats should be updated after calculating stats of the table
+          // clear cache to simulate a fresh environment
+          dfFromTable.unpersist(blocking = true)
+          spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
+          val inMemoryRelation3 = spark.read.table("table1").cache().queryExecution.optimizedPlan.
+            collect { case plan: InMemoryRelation => plan }.head
+          assert(inMemoryRelation3.computeStats().sizeInBytes === 48)
--- End diff --

Missed this one: why does it have different stats than the table cache stats, `16`?

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r157427138

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -479,4 +485,43 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      withTempDir { workDir =>
+        withTable("table1") {
+          val workDirPath = workDir.getAbsolutePath + "/table1"
--- End diff --

Then we don't need to append `/table1` to the path.

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r157427036

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -479,4 +485,43 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      withTempDir { workDir =>
+        withTable("table1") {
+          val workDirPath = workDir.getAbsolutePath + "/table1"
--- End diff --

Seems you can use `withTempPath`, which just gives you a path string without creating it.

---
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r157381615

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -479,4 +485,35 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      val workDir = s"${Utils.createTempDir()}/table1"
+      val data = Seq(100, 200, 300, 400).toDF("count")
+      data.write.parquet(workDir)
+      val dfFromFile = spark.read.parquet(workDir).cache()
+      val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
+        case plan: InMemoryRelation => plan
+      }.head
+      // InMemoryRelation's stats is file size before the underlying RDD is materialized
+      assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+
+      // InMemoryRelation's stats is updated after materializing RDD
+      dfFromFile.collect()
+      assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+
+      // test of catalog table
+      val dfFromTable = spark.catalog.createTable("table1", workDir).cache()
+      val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
+        collect { case plan: InMemoryRelation => plan }.head
+
+      // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
+      // is calculated
+      assert(inMemoryRelation2.computeStats().sizeInBytes === 740)
+
+      // InMemoryRelation's stats should be updated after calculating stats of the table
+      spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
+      assert(inMemoryRelation2.computeStats().sizeInBytes === 16)
--- End diff --

It was a mistake here: AnalyzeTableCommand actually forces the table to be evaluated with count(), which populates the longAccumulator's value. Fixed in the latest commit.

---
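A minimal sketch of the side effect described, assuming an active `SparkSession` named `spark`; the accumulator name and the 4-bytes-per-row bookkeeping are illustrative stand-ins for the real `batchStats` wiring:

```scala
// Running an action populates the accumulator, after which a
// computeStats()-style check no longer falls back to the estimate.
val acc = spark.sparkContext.longAccumulator("batchStats")
val rdd = spark.sparkContext.parallelize(Seq(100, 200, 300, 400))
rdd.foreach(_ => acc.add(4L))  // pretend each int row occupies 4 bytes
assert(acc.value == 16L)       // matches the post-materialization value in the test
```

---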
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r157381595

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -479,4 +485,35 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      val workDir = s"${Utils.createTempDir()}/table1"
+      val data = Seq(100, 200, 300, 400).toDF("count")
+      data.write.parquet(workDir)
+      val dfFromFile = spark.read.parquet(workDir).cache()
+      val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
+        case plan: InMemoryRelation => plan
+      }.head
+      // InMemoryRelation's stats is file size before the underlying RDD is materialized
+      assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+
+      // InMemoryRelation's stats is updated after materializing RDD
+      dfFromFile.collect()
+      assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+
+      // test of catalog table
+      val dfFromTable = spark.catalog.createTable("table1", workDir).cache()
--- End diff --

done

---
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r157381594

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -479,4 +485,35 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      val workDir = s"${Utils.createTempDir()}/table1"
--- End diff --

done

---
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r157381589

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -71,9 +74,8 @@ case class InMemoryRelation(
   override def computeStats(): Statistics = {
     if (batchStats.value == 0L) {
-      // Underlying columnar RDD hasn't been materialized, no useful statistics information
-      // available, return the default statistics.
--- End diff --

done

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r157141961

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -479,4 +485,35 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      val workDir = s"${Utils.createTempDir()}/table1"
+      val data = Seq(100, 200, 300, 400).toDF("count")
+      data.write.parquet(workDir)
+      val dfFromFile = spark.read.parquet(workDir).cache()
+      val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
+        case plan: InMemoryRelation => plan
+      }.head
+      // InMemoryRelation's stats is file size before the underlying RDD is materialized
+      assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+
+      // InMemoryRelation's stats is updated after materializing RDD
+      dfFromFile.collect()
+      assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+
+      // test of catalog table
+      val dfFromTable = spark.catalog.createTable("table1", workDir).cache()
+      val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
+        collect { case plan: InMemoryRelation => plan }.head
+
+      // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
+      // is calculated
+      assert(inMemoryRelation2.computeStats().sizeInBytes === 740)
+
+      // InMemoryRelation's stats should be updated after calculating stats of the table
+      spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
+      assert(inMemoryRelation2.computeStats().sizeInBytes === 16)
--- End diff --

What happened here? `InMemoryRelation.statsOfPlanToCache` gets updated automatically?

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r157141730

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -479,4 +485,35 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      val workDir = s"${Utils.createTempDir()}/table1"
+      val data = Seq(100, 200, 300, 400).toDF("count")
+      data.write.parquet(workDir)
+      val dfFromFile = spark.read.parquet(workDir).cache()
+      val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
+        case plan: InMemoryRelation => plan
+      }.head
+      // InMemoryRelation's stats is file size before the underlying RDD is materialized
+      assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+
+      // InMemoryRelation's stats is updated after materializing RDD
+      dfFromFile.collect()
+      assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+
+      // test of catalog table
+      val dfFromTable = spark.catalog.createTable("table1", workDir).cache()
--- End diff --

nit: wrap with `withTable`, which will clean up the table automatically at the end.

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r157141574

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -479,4 +485,35 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      val workDir = s"${Utils.createTempDir()}/table1"
--- End diff --

nit: use `withTempDir`

---
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r157118091

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
     cachedData.isEmpty
   }
+
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = {
+    if (plan.stats.rowCount.isDefined) {
--- End diff --

Changed the code here and filed a JIRA: https://issues.apache.org/jira/browse/SPARK-22790

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r156847927

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
     cachedData.isEmpty
   }
+
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = {
+    if (plan.stats.rowCount.isDefined) {
--- End diff --

The expected value should be `rowCount * avgRowSize`. Without CBO, I think the file size is the best we can get, although it may not be correct. That is to say, without CBO, a parquet relation may have an underestimated size and cause OOM; users need to turn on CBO to fix it. So the same thing should happen in the table cache. We can fix this by defining `sizeInBytes` as `file size * some factor`, but that should belong to another PR.

---
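A hedged sketch of the estimate described here; the function name and the compression-factor fallback are assumptions (the comment explicitly defers the factor fix to another PR):

```scala
// With CBO stats, estimate rowCount * avgRowSize; otherwise fall back to the
// on-disk file size, optionally scaled to approximate the in-memory size.
def estimatedSizeInBytes(
    rowCount: Option[BigInt],
    avgRowSize: Long,
    fileSizeInBytes: Long,
    compressionFactor: Double = 1.0): BigInt = rowCount match {
  case Some(n) => n * avgRowSize
  case None => BigInt((fileSizeInBytes * compressionFactor).toLong)
}
```

---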
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r156718763

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
     cachedData.isEmpty
   }
+
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = {
+    if (plan.stats.rowCount.isDefined) {
--- End diff --

The current logic is that only when we have enough stats for the table of interest do we put the value here. I agree that we should choose a better value for formats like parquet, where the actual in-memory size would be much larger than the sizeInBytes (i.e., the on-disk size). My questions are: 1. what's the expected value if we have a HadoopFsRelation in parquet format? 2. do we want to do it in this PR?

---
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r156718225

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -71,9 +74,10 @@ case class InMemoryRelation(
   override def computeStats(): Statistics = {
     if (batchStats.value == 0L) {
-      // Underlying columnar RDD hasn't been materialized, no useful statistics information
-      // available, return the default statistics.
-      Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
+      // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache when
+      // applicable
+      statsOfPlanToCache.getOrElse(Statistics(sizeInBytes =
+        child.sqlContext.conf.defaultSizeInBytes))
--- End diff --

If we don't trust the size in bytes for parquet, then we should fix that in the datasource and not here. The old version did not use proper statistics at all; now that we do, we should use them.

---
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r156716896

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -71,9 +74,10 @@ case class InMemoryRelation(
   override def computeStats(): Statistics = {
     if (batchStats.value == 0L) {
-      // Underlying columnar RDD hasn't been materialized, no useful statistics information
-      // available, return the default statistics.
-      Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
+      // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache when
+      // applicable
+      statsOfPlanToCache.getOrElse(Statistics(sizeInBytes =
+        child.sqlContext.conf.defaultSizeInBytes))
--- End diff --

I just followed the original implementation...I do not have a better value to put here. If we use sizeInBytes, the risk is in data sources like parquet-formatted files, where sizeInBytes is much smaller than the actual size in memory...your suggestion?

---
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r156610279

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -71,9 +74,10 @@ case class InMemoryRelation(
   override def computeStats(): Statistics = {
     if (batchStats.value == 0L) {
-      // Underlying columnar RDD hasn't been materialized, no useful statistics information
-      // available, return the default statistics.
-      Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
+      // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache when
+      // applicable
+      statsOfPlanToCache.getOrElse(Statistics(sizeInBytes =
+        child.sqlContext.conf.defaultSizeInBytes))
--- End diff --

Mweh - this seems very arbitrary.

---
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r156609849

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -60,7 +62,8 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
+    statsOfPlanToCache: Option[Statistics] = None)
--- End diff --

Yeah, the secondary argument list seems like a better place. I don't think we should incorporate the stats in the hash/equals methods.

---
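Why the secondary (curried) argument list settles the equals/hashCode question: Scala's generated case-class equality only covers the first parameter list. A minimal sketch with a toy class, not the real InMemoryRelation:

```scala
case class Relation(output: Seq[String])(val stats: Option[Long] = None)

val a = Relation(Seq("col"))(Some(740L))
val b = Relation(Seq("col"))(None)
assert(a == b)                    // stats in the curried list do not affect equality
assert(a.hashCode == b.hashCode)  // nor the hash code
```

---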
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r156609277

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
     cachedData.isEmpty
   }
+
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = {
+    if (plan.stats.rowCount.isDefined) {
--- End diff --

We could also collect the size as part of building the cache.

---
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r156554180

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
     cachedData.isEmpty
   }
+
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = {
+    if (plan.stats.rowCount.isDefined) {
--- End diff --

+1

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r156553748

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
     cachedData.isEmpty
   }
+
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = {
+    if (plan.stats.rowCount.isDefined) {
--- End diff --

If we don't trust `sizeInBytes`, we should fix it in all places, not just here.

---
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r156550811

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
     cachedData.isEmpty
   }
+
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = {
+    if (plan.stats.rowCount.isDefined) {
--- End diff --

I had the same question before. It seems @CodingCat's claim is that the `sizeInBytes` from the relation can cause an OOM issue: https://github.com/apache/spark/pull/19864#discussion_r155287758

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r156550114

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
     cachedData.isEmpty
   }
+
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = {
+    if (plan.stats.rowCount.isDefined) {
--- End diff --

Why this condition? Even if there is no row count, the `sizeInBytes` is still better than `defaultSizeInBytes`.

---
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r156445522

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -60,7 +62,8 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
+    statsOfPlanToCache: Option[Statistics] = None)
--- End diff --

My two cents: I didn't look into the code that makes this influence the equals/hashCode logic, but we may not want to make equals/hashCode depend on this. In Spark SQL we usually compare plans based on their string representation rather than including stats info, e.g. we try to reuse a cached plan based on the execution plan's string representation instead of anything plus stats info.

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r156294941

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -60,7 +62,8 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
+    statsOfPlanToCache: Option[Statistics] = None)
--- End diff --

Where shall we put the stats parameter? In the main constructor or in the curried constructor? The major difference is whether we want to include it in `equals`/`hashCode`.

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r156294973

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -60,7 +62,8 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
+    statsOfPlanToCache: Option[Statistics] = None)
--- End diff --

cc @hvanhovell

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r156078995

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -60,7 +62,8 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
+    statsOfPlanToCache: Option[Statistics] = None)
--- End diff --

One minor question: why put this in the curried constructor? Can we move it to before `_cachedColumnBuffers`?

---
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r155587468

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -479,4 +481,32 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats whenever possible") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      // scalastyle:off
+      val workDir = s"${Utils.createTempDir()}/table1"
+      val data = Seq(100, 200, 300, 400).toDF("count")
+      data.write.parquet(workDir)
+      val dfFromFile = spark.read.parquet(workDir).cache()
+      val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
+        case plan: InMemoryRelation => plan
+      }.head
+      // InMemoryRelation's stats is Long.MaxValue before the underlying RDD is materialized
+      assert(inMemoryRelation.computeStats().sizeInBytes === Long.MaxValue)
+      // InMemoryRelation's stats is updated after materializing RDD
+      dfFromFile.collect()
+      assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+      // test of catalog table
+      val dfFromTable = spark.catalog.createTable("table1", workDir).cache()
+      val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
+        collect { case plan: InMemoryRelation => plan }.head
+      // Even CBO enabled, InMemoryRelation's stats keeps as the default one before table's stats
+      // is calculated
+      assert(inMemoryRelation2.computeStats().sizeInBytes === Long.MaxValue)
--- End diff --

done

---
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r155587431

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -479,4 +481,32 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats whenever possible") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      // scalastyle:off
--- End diff --

ah, sorry, it's a leftover from debugging

---
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r155523343

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -479,4 +481,32 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats whenever possible") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      // scalastyle:off
+      val workDir = s"${Utils.createTempDir()}/table1"
+      val data = Seq(100, 200, 300, 400).toDF("count")
+      data.write.parquet(workDir)
+      val dfFromFile = spark.read.parquet(workDir).cache()
+      val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
+        case plan: InMemoryRelation => plan
+      }.head
+      // InMemoryRelation's stats is Long.MaxValue before the underlying RDD is materialized
+      assert(inMemoryRelation.computeStats().sizeInBytes === Long.MaxValue)
+      // InMemoryRelation's stats is updated after materializing RDD
+      dfFromFile.collect()
+      assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+      // test of catalog table
+      val dfFromTable = spark.catalog.createTable("table1", workDir).cache()
+      val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
+        collect { case plan: InMemoryRelation => plan }.head
+      // Even CBO enabled, InMemoryRelation's stats keeps as the default one before table's stats
+      // is calculated
+      assert(inMemoryRelation2.computeStats().sizeInBytes === Long.MaxValue)
--- End diff --

Properly inserting a few blank lines can make the test more readable.

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r155521550

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -479,4 +481,32 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats whenever possible") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      // scalastyle:off
--- End diff --

Why turn off the style check here?

---
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r155296970

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -37,8 +37,10 @@ object InMemoryRelation {
       batchSize: Int,
       storageLevel: StorageLevel,
       child: SparkPlan,
-      tableName: Option[String]): InMemoryRelation =
-    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
+      tableName: Option[String],
+      stats: Option[Statistics]): InMemoryRelation =
--- End diff --

Regarding using None: it looks like Scala does not allow me to do that. The earlier change for exactly this purpose in https://github.com/apache/spark/pull/19864/commits/4c347014f32aa3c3088f8a18b08786fb074a1790 failed to compile: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84499/console. It looks like the Scala compiler does not consider whether the default parameter is from the same or a different function parameter list.

---
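An illustrative reproduction of the compiler restriction hit here (toy object; the real failure is in the Jenkins log above): Scala rejects default arguments on more than one overloaded alternative, regardless of which parameter list holds them.

```scala
object Factory {
  def apply(a: Int, stats: Option[Long] = None): Int = a

  // Uncommenting this overload fails to compile with an error like:
  // "in object Factory, multiple overloaded alternatives of method apply
  //  define default arguments"
  // def apply(a: Int, b: Int)(stats: Option[Long] = None): Int = a + b
}
```

---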
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r155287758

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
       logWarning("Asked to cache already cached data.")
     } else {
       val sparkSession = query.sparkSession
-      cachedData.add(CachedData(
-        planToCache,
-        InMemoryRelation(
-          sparkSession.sessionState.conf.useCompression,
-          sparkSession.sessionState.conf.columnBatchSize,
-          storageLevel,
-          sparkSession.sessionState.executePlan(planToCache).executedPlan,
-          tableName)))
+      val inMemoryRelation = InMemoryRelation(
+        sparkSession.sessionState.conf.useCompression,
+        sparkSession.sessionState.conf.columnBatchSize,
+        storageLevel,
+        sparkSession.sessionState.executePlan(planToCache).executedPlan,
+        tableName)
+      if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
--- End diff --

It's true, it affects the other cases too, I believe...there is a similar discussion in https://github.com/apache/spark/pull/19743

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r155223954

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -37,8 +37,10 @@ object InMemoryRelation {
      batchSize: Int,
      storageLevel: StorageLevel,
      child: SparkPlan,
-      tableName: Option[String]): InMemoryRelation =
-    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
+      tableName: Option[String],
+      stats: Option[Statistics]): InMemoryRelation =
--- End diff --

And use `None` as the default value to avoid updating the tests.

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r155223876

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -37,8 +37,10 @@ object InMemoryRelation {
      batchSize: Int,
      storageLevel: StorageLevel,
      child: SparkPlan,
-      tableName: Option[String]): InMemoryRelation =
-    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
+      tableName: Option[String],
+      stats: Option[Statistics]): InMemoryRelation =
--- End diff --

Also name it `statsOfPlanToCache`.

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r155223822

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -71,9 +74,8 @@ case class InMemoryRelation(
   override def computeStats(): Statistics = {
     if (batchStats.value == 0L) {
-      // Underlying columnar RDD hasn't been materialized, no useful statistics information
-      // available, return the default statistics.
--- End diff --

```
// Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
```

---
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r155159490

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
       logWarning("Asked to cache already cached data.")
     } else {
       val sparkSession = query.sparkSession
-      cachedData.add(CachedData(
-        planToCache,
-        InMemoryRelation(
-          sparkSession.sessionState.conf.useCompression,
-          sparkSession.sessionState.conf.columnBatchSize,
-          storageLevel,
-          sparkSession.sessionState.executePlan(planToCache).executedPlan,
-          tableName)))
+      val inMemoryRelation = InMemoryRelation(
+        sparkSession.sessionState.conf.useCompression,
+        sparkSession.sessionState.conf.columnBatchSize,
+        storageLevel,
+        sparkSession.sessionState.executePlan(planToCache).executedPlan,
+        tableName)
+      if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
--- End diff --

The statistics from the relation are based on file size; will it easily cause an OOM issue? I think in cases other than a cached query, we still use this relation's statistics. If this is an issue, doesn't it also affect the other cases?

---
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r155147478

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
       logWarning("Asked to cache already cached data.")
     } else {
       val sparkSession = query.sparkSession
-      cachedData.add(CachedData(
-        planToCache,
-        InMemoryRelation(
-          sparkSession.sessionState.conf.useCompression,
-          sparkSession.sessionState.conf.columnBatchSize,
-          storageLevel,
-          sparkSession.sessionState.executePlan(planToCache).executedPlan,
-          tableName)))
+      val inMemoryRelation = InMemoryRelation(
+        sparkSession.sessionState.conf.useCompression,
+        sparkSession.sessionState.conf.columnBatchSize,
+        storageLevel,
+        sparkSession.sessionState.executePlan(planToCache).executedPlan,
+        tableName)
+      if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
--- End diff --

@viirya you're right! Thanks for clearing up the confusion. However, to prevent using the relation's stats, which can be much smaller than the in-memory size and lead to a potential OOM error, we should still have this condition here (we can remove cboEnabled though), right?

---
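The guard being defended here is the one in the quoted diff; spelled out as a short sketch (imports assumed from the Spark codebase), it only reuses the plan's stats when CBO has produced a row count, since a bare file-size estimate can sit far below the real in-memory footprint:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}

private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = {
  // A defined row count implies analyzed (CBO) statistics rather than a raw
  // on-disk size, so only then are the stats trusted for the cached relation.
  if (plan.stats.rowCount.isDefined) Some(plan.stats) else None
}
```

---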
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r155141140

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
       logWarning("Asked to cache already cached data.")
     } else {
       val sparkSession = query.sparkSession
-      cachedData.add(CachedData(
-        planToCache,
-        InMemoryRelation(
-          sparkSession.sessionState.conf.useCompression,
-          sparkSession.sessionState.conf.columnBatchSize,
-          storageLevel,
-          sparkSession.sessionState.executePlan(planToCache).executedPlan,
-          tableName)))
+      val inMemoryRelation = InMemoryRelation(
+        sparkSession.sessionState.conf.useCompression,
+        sparkSession.sessionState.conf.columnBatchSize,
+        storageLevel,
+        sparkSession.sessionState.executePlan(planToCache).executedPlan,
+        tableName)
+      if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
--- End diff --

If a catalog table doesn't have statistics in its metadata, we will fill it with `defaultSizeInBytes`: https://github.com/apache/spark/blob/326f1d6728a7734c228d8bfaa69442a1c7b92e9b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala#L121-L134

---
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r155140756

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
       logWarning("Asked to cache already cached data.")
     } else {
       val sparkSession = query.sparkSession
-      cachedData.add(CachedData(
-        planToCache,
-        InMemoryRelation(
-          sparkSession.sessionState.conf.useCompression,
-          sparkSession.sessionState.conf.columnBatchSize,
-          storageLevel,
-          sparkSession.sessionState.executePlan(planToCache).executedPlan,
-          tableName)))
+      val inMemoryRelation = InMemoryRelation(
+        sparkSession.sessionState.conf.useCompression,
+        sparkSession.sessionState.conf.columnBatchSize,
+        storageLevel,
+        sparkSession.sessionState.executePlan(planToCache).executedPlan,
+        tableName)
+      if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
--- End diff --

`LogicalRelation` uses the statistics from the `relation` only when there is no given `catalogTable`. In that case, it doesn't consider whether CBO is enabled or not. Only `catalogTable` considers CBO when computing its statistics in `toPlanStats`; it doesn't refer to the relation's statistics, IIUC.

---
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r155140125

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
       logWarning("Asked to cache already cached data.")
     } else {
       val sparkSession = query.sparkSession
-      cachedData.add(CachedData(
-        planToCache,
-        InMemoryRelation(
-          sparkSession.sessionState.conf.useCompression,
-          sparkSession.sessionState.conf.columnBatchSize,
-          storageLevel,
-          sparkSession.sessionState.executePlan(planToCache).executedPlan,
-          tableName)))
+      val inMemoryRelation = InMemoryRelation(
+        sparkSession.sessionState.conf.useCompression,
+        sparkSession.sessionState.conf.columnBatchSize,
+        storageLevel,
+        sparkSession.sessionState.executePlan(planToCache).executedPlan,
+        tableName)
+      if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
--- End diff --

No, if CBO is disabled, the relation's sizeInBytes is the file size: https://github.com/apache/spark/blob/5c3a1f3fad695317c2fff1243cdb9b3ceb25c317/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala#L85

---
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r155137863

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
       logWarning("Asked to cache already cached data.")
     } else {
       val sparkSession = query.sparkSession
-      cachedData.add(CachedData(
-        planToCache,
-        InMemoryRelation(
-          sparkSession.sessionState.conf.useCompression,
-          sparkSession.sessionState.conf.columnBatchSize,
-          storageLevel,
-          sparkSession.sessionState.executePlan(planToCache).executedPlan,
-          tableName)))
+      val inMemoryRelation = InMemoryRelation(
+        sparkSession.sessionState.conf.useCompression,
+        sparkSession.sessionState.conf.columnBatchSize,
+        storageLevel,
+        sparkSession.sessionState.executePlan(planToCache).executedPlan,
+        tableName)
+      if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
--- End diff --

When CBO is disabled, don't we just set the `sizeInBytes` to `defaultSizeInBytes`? Isn't that different from the current statistics of the first run?

---
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r15502

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
       logWarning("Asked to cache already cached data.")
     } else {
       val sparkSession = query.sparkSession
-      cachedData.add(CachedData(
-        planToCache,
-        InMemoryRelation(
-          sparkSession.sessionState.conf.useCompression,
-          sparkSession.sessionState.conf.columnBatchSize,
-          storageLevel,
-          sparkSession.sessionState.executePlan(planToCache).executedPlan,
-          tableName)))
+      val inMemoryRelation = InMemoryRelation(
+        sparkSession.sessionState.conf.useCompression,
+        sparkSession.sessionState.conf.columnBatchSize,
+        storageLevel,
+        sparkSession.sessionState.executePlan(planToCache).executedPlan,
+        tableName)
+      if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
--- End diff --

The reason I put it here is that when CBO is not enabled, the stats in the underlying plan might be much smaller than the actual size in memory, leading to the potential risk of an OOM error. The underlying cause is that without CBO enabled, the size of the plan is calculated with BaseRelation's sizeInBytes, but with CBO we can have a more accurate estimation:

https://github.com/apache/spark/blob/03fdc92e42d260a2b7c0090115f162ba5c091aae/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L42-L46

https://github.com/apache/spark/blob/03fdc92e42d260a2b7c0090115f162ba5c091aae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L370-L381

---
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r154973359

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -25,13 +25,15 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
--- End diff --

Unused import.

---
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r154973164

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -25,13 +25,15 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.LongAccumulator

 object InMemoryRelation {
+
--- End diff --

Unnecessary change.

---
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19864#discussion_r154972842

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
       logWarning("Asked to cache already cached data.")
     } else {
       val sparkSession = query.sparkSession
-      cachedData.add(CachedData(
-        planToCache,
-        InMemoryRelation(
-          sparkSession.sessionState.conf.useCompression,
-          sparkSession.sessionState.conf.columnBatchSize,
-          storageLevel,
-          sparkSession.sessionState.executePlan(planToCache).executedPlan,
-          tableName)))
+      val inMemoryRelation = InMemoryRelation(
+        sparkSession.sessionState.conf.useCompression,
+        sparkSession.sessionState.conf.columnBatchSize,
+        storageLevel,
+        sparkSession.sessionState.executePlan(planToCache).executedPlan,
+        tableName)
+      if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
+        inMemoryRelation.setStatsFromCachedPlan(planToCache)
+      }
--- End diff --

Do we need to limit to those conditions? I think we can pass the stats into the created `InMemoryRelation` even if the two conditions don't match.

---
[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19864#discussion_r154972319 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -94,14 +94,16 @@ class CacheManager extends Logging { logWarning("Asked to cache already cached data.") } else { val sparkSession = query.sparkSession - cachedData.add(CachedData( -planToCache, -InMemoryRelation( - sparkSession.sessionState.conf.useCompression, - sparkSession.sessionState.conf.columnBatchSize, - storageLevel, - sparkSession.sessionState.executePlan(planToCache).executedPlan, - tableName))) + val inMemoryRelation = InMemoryRelation( +sparkSession.sessionState.conf.useCompression, +sparkSession.sessionState.conf.columnBatchSize, +storageLevel, +sparkSession.sessionState.executePlan(planToCache).executedPlan, +tableName) + if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) { +inMemoryRelation.setStatsFromCachedPlan(planToCache) + } --- End diff -- +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19864#discussion_r154970842 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -94,14 +94,16 @@ class CacheManager extends Logging { logWarning("Asked to cache already cached data.") } else { val sparkSession = query.sparkSession - cachedData.add(CachedData( -planToCache, -InMemoryRelation( - sparkSession.sessionState.conf.useCompression, - sparkSession.sessionState.conf.columnBatchSize, - storageLevel, - sparkSession.sessionState.executePlan(planToCache).executedPlan, - tableName))) + val inMemoryRelation = InMemoryRelation( +sparkSession.sessionState.conf.useCompression, +sparkSession.sessionState.conf.columnBatchSize, +storageLevel, +sparkSession.sessionState.executePlan(planToCache).executedPlan, +tableName) + if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) { +inMemoryRelation.setStatsFromCachedPlan(planToCache) + } --- End diff -- `InMemoryRelation` is not part of the public API and should be treated as unstable/internal. You can use it at your own risk. Changing the constructor is fine. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...
Github user CodingCat commented on a diff in the pull request: https://github.com/apache/spark/pull/19864#discussion_r154968066 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -94,14 +94,16 @@ class CacheManager extends Logging { logWarning("Asked to cache already cached data.") } else { val sparkSession = query.sparkSession - cachedData.add(CachedData( -planToCache, -InMemoryRelation( - sparkSession.sessionState.conf.useCompression, - sparkSession.sessionState.conf.columnBatchSize, - storageLevel, - sparkSession.sessionState.executePlan(planToCache).executedPlan, - tableName))) + val inMemoryRelation = InMemoryRelation( +sparkSession.sessionState.conf.useCompression, +sparkSession.sessionState.conf.columnBatchSize, +storageLevel, +sparkSession.sessionState.executePlan(planToCache).executedPlan, +tableName) + if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) { +inMemoryRelation.setStatsFromCachedPlan(planToCache) + } --- End diff -- Looks like I have no way to access InMemoryRelation from outside of the spark package, even though it is not a package-private class... how is that achieved? If that is the case, I can modify the constructor. Thanks @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19864#discussion_r154938881 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -94,14 +94,16 @@ class CacheManager extends Logging { logWarning("Asked to cache already cached data.") } else { val sparkSession = query.sparkSession - cachedData.add(CachedData( -planToCache, -InMemoryRelation( - sparkSession.sessionState.conf.useCompression, - sparkSession.sessionState.conf.columnBatchSize, - storageLevel, - sparkSession.sessionState.executePlan(planToCache).executedPlan, - tableName))) + val inMemoryRelation = InMemoryRelation( +sparkSession.sessionState.conf.useCompression, +sparkSession.sessionState.conf.columnBatchSize, +storageLevel, +sparkSession.sessionState.executePlan(planToCache).executedPlan, +tableName) + if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) { +inMemoryRelation.setStatsFromCachedPlan(planToCache) + } --- End diff -- I think a more ideal change is to put the original plan stats into the constructor of `InMemoryRelation`, instead of making it mutable. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
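A minimal sketch of the constructor-based approach suggested here, with illustrative class and parameter names rather than the actual `InMemoryRelation` signature: the stats of the plan being cached are fixed at construction time, and `computeStats` only switches to the exact size once the buffers are materialized.

```scala
import org.apache.spark.sql.catalyst.plans.logical.Statistics

// Illustrative sketch, not the real InMemoryRelation: the cached plan's
// stats arrive as an immutable constructor argument instead of via a setter.
class CachedRelationSketch(statsOfPlanToCache: Option[Statistics]) {

  // Filled in once the underlying columnar buffers have been materialized.
  @volatile private var materializedSizeInBytes: Option[BigInt] = None

  def computeStats(defaultSizeInBytes: BigInt): Statistics = materializedSizeInBytes match {
    // Exact size once the underlying RDD has been evaluated.
    case Some(bytes) => Statistics(sizeInBytes = bytes)
    // Before materialization, reuse the stats of the plan being cached,
    // falling back to the conservative session default.
    case None => statsOfPlanToCache.getOrElse(Statistics(sizeInBytes = defaultSizeInBytes))
  }
}
```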
[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...
Github user CodingCat commented on a diff in the pull request: https://github.com/apache/spark/pull/19864#discussion_r154501939 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -94,14 +94,16 @@ class CacheManager extends Logging { logWarning("Asked to cache already cached data.") } else { val sparkSession = query.sparkSession - cachedData.add(CachedData( -planToCache, -InMemoryRelation( - sparkSession.sessionState.conf.useCompression, - sparkSession.sessionState.conf.columnBatchSize, - storageLevel, - sparkSession.sessionState.executePlan(planToCache).executedPlan, - tableName))) + val inMemoryRelation = InMemoryRelation( +sparkSession.sessionState.conf.useCompression, +sparkSession.sessionState.conf.columnBatchSize, +storageLevel, +sparkSession.sessionState.executePlan(planToCache).executedPlan, +tableName) + if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) { +inMemoryRelation.setStatsFromCachedPlan(planToCache) + } --- End diff -- I have to make InMemoryRelation stateful to avoid breaking APIs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...
Github user CodingCat commented on a diff in the pull request: https://github.com/apache/spark/pull/19864#discussion_r154500900 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -71,9 +73,17 @@ case class InMemoryRelation( override def computeStats(): Statistics = { if (batchStats.value == 0L) { - // Underlying columnar RDD hasn't been materialized, no useful statistics information - // available, return the default statistics. - Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes) + children.filter(_.isInstanceOf[LogicalRelation]) match { +case Seq(c @ LogicalRelation(_, _, _, _), _) if c.conf.cboEnabled => --- End diff -- Thanks @viirya, yes, you're right! I misread the generated plan; working on it --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19864#discussion_r154492920 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -71,9 +73,17 @@ case class InMemoryRelation( override def computeStats(): Statistics = { if (batchStats.value == 0L) { - // Underlying columnar RDD hasn't been materialized, no useful statistics information - // available, return the default statistics. - Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes) + children.filter(_.isInstanceOf[LogicalRelation]) match { +case Seq(c @ LogicalRelation(_, _, _, _), _) if c.conf.cboEnabled => --- End diff -- `InMemoryRelation` is a `logical.LeafNode`. I think it has no children? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
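For context: in Catalyst, `logical.LeafNode` fixes `children` to `Nil`, so the pattern match over `children` in the diff above can never bind a `LogicalRelation` and the CBO branch is dead code. A toy illustration of the pitfall (not Spark's classes):

```scala
// Toy tree, not Spark's classes: a leaf's children is always empty, so any
// pattern match over `children` silently matches nothing.
sealed trait Node { def children: Seq[Node] }
case class Branch(children: Seq[Node]) extends Node
case object Leaf extends Node { val children: Seq[Node] = Nil }

val fromLeaf = Leaf.children.collect { case b: Branch => b }
assert(fromLeaf.isEmpty)  // the guarded case in computeStats would never fire
```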
[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...
GitHub user CodingCat opened a pull request: https://github.com/apache/spark/pull/19864 [SPARK-22673][SQL] InMemoryRelation should utilize on-disk table stats whenever possible ## What changes were proposed in this pull request? The current implementation of InMemoryRelation always uses the most expensive execution plan when writing cache. With CBO enabled, we can actually have a more exact estimation of the underlying table size... ## How was this patch tested? existing test You can merge this pull request into a Git repository by running: $ git pull https://github.com/CodingCat/spark SPARK-22673 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19864.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19864 commit b2fb1d25804b7bdbe1a767306a319dc748965bce Author: CodingCat Date: 2016-03-07T14:37:37Z improve the doc for "spark.memory.offHeap.size" commit 0971900d562cb1a18af6f7de02bb8eb95637a640 Author: CodingCat Date: 2016-03-07T19:00:16Z fix commit 32f7c74a9b5cf4f19e7d14357bb87064383e11b3 Author: CodingCat Date: 2017-12-01T23:05:35Z use cbo stats in inmemoryrelation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org