[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2018-01-23 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r163401858
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -60,7 +62,8 @@ case class InMemoryRelation(
 @transient child: SparkPlan,
 tableName: Option[String])(
 @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-val batchStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator)
+val batchStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator,
+statsOfPlanToCache: Statistics = null)
--- End diff --

Eh... we do not have other options. It's more like a placeholder: since 
InMemoryRelation is created by CacheManager through apply() in the companion 
object, it does no harm here IMHO.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2018-01-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r163347494
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -60,7 +62,8 @@ case class InMemoryRelation(
 @transient child: SparkPlan,
 tableName: Option[String])(
 @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-val batchStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator)
+val batchStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator,
+statsOfPlanToCache: Statistics = null)
--- End diff --

Why do we set this to `null`?


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19864


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-18 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r157653784
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -479,4 +485,43 @@ class InMemoryColumnarQuerySuite extends QueryTest 
with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the 
plan to be cached") {
+withSQLConf("spark.sql.cbo.enabled" -> "true") {
+  withTempPath { workDir =>
+withTable("table1") {
+  val workDirPath = workDir.getAbsolutePath
+  val data = Seq(100, 200, 300, 400).toDF("count")
+  data.write.parquet(workDirPath)
+  val dfFromFile = spark.read.parquet(workDirPath).cache()
+  val inMemoryRelation = 
dfFromFile.queryExecution.optimizedPlan.collect {
+case plan: InMemoryRelation => plan
+  }.head
+  // InMemoryRelation's stats is file size before the underlying 
RDD is materialized
+  assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+
+  // InMemoryRelation's stats is updated after materializing RDD
+  dfFromFile.collect()
+  assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+
+  // test of catalog table
+  val dfFromTable = spark.catalog.createTable("table1", 
workDirPath).cache()
+  val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
+collect { case plan: InMemoryRelation => plan }.head
+
+  // Even CBO enabled, InMemoryRelation's stats keeps as the file 
size before table's stats
+  // is calculated
+  assert(inMemoryRelation2.computeStats().sizeInBytes === 740)
+
+  // InMemoryRelation's stats should be updated after calculating 
stats of the table
+  // clear cache to simulate a fresh environment
+  dfFromTable.unpersist(blocking = true)
+  spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
+  val inMemoryRelation3 = 
spark.read.table("table1").cache().queryExecution.optimizedPlan.
+collect { case plan: InMemoryRelation => plan }.head
+  assert(inMemoryRelation3.computeStats().sizeInBytes === 48)
--- End diff --

Because 16 is the exact in-memory size, which is obtained by reading the 
accumulator's value after evaluating the RDD.

48 is calculated by EstimationUtils: 
https://github.com/apache/spark/blob/bdb5e55c2a67d16a36ad6baa22296d714d3525af/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala#L78
 `(8 + 4 (average attribute length)) * 4`
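
The arithmetic above can be spelled out as a rough sketch. It assumes a fixed 8 bytes of per-row overhead plus the average length of each attribute (4 bytes for the single int column in this test). `estimateSizeInBytes` and `RowOverheadBytes` are hypothetical names for illustration, not the actual EstimationUtils API.

```scala
// Hypothetical helper mirroring the quoted formula; not Spark's EstimationUtils API.
object SizeEstimateSketch {
  // Assumption: 8 bytes of fixed per-row overhead, as in `(8 + 4) * 4`.
  val RowOverheadBytes: Long = 8L

  // rowCount * (per-row overhead + sum of average attribute lengths)
  def estimateSizeInBytes(rowCount: BigInt, avgAttrLengths: Seq[Long]): BigInt =
    rowCount * BigInt(RowOverheadBytes + avgAttrLengths.sum)

  def main(args: Array[String]): Unit = {
    // One int column (4 bytes on average) and four rows: (8 + 4) * 4
    println(estimateSizeInBytes(BigInt(4), Seq(4L)))
  }
}
```

This is an estimate derived from row count and column widths, so it need not match the 16 bytes measured by the accumulator after the cache is materialized.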


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r157652482
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -479,4 +485,43 @@ class InMemoryColumnarQuerySuite extends QueryTest 
with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the 
plan to be cached") {
+withSQLConf("spark.sql.cbo.enabled" -> "true") {
+  withTempPath { workDir =>
+withTable("table1") {
+  val workDirPath = workDir.getAbsolutePath
+  val data = Seq(100, 200, 300, 400).toDF("count")
+  data.write.parquet(workDirPath)
+  val dfFromFile = spark.read.parquet(workDirPath).cache()
+  val inMemoryRelation = 
dfFromFile.queryExecution.optimizedPlan.collect {
+case plan: InMemoryRelation => plan
+  }.head
+  // InMemoryRelation's stats is file size before the underlying 
RDD is materialized
+  assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+
+  // InMemoryRelation's stats is updated after materializing RDD
+  dfFromFile.collect()
+  assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+
+  // test of catalog table
+  val dfFromTable = spark.catalog.createTable("table1", 
workDirPath).cache()
+  val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
+collect { case plan: InMemoryRelation => plan }.head
+
+  // Even CBO enabled, InMemoryRelation's stats keeps as the file 
size before table's stats
+  // is calculated
+  assert(inMemoryRelation2.computeStats().sizeInBytes === 740)
+
+  // InMemoryRelation's stats should be updated after calculating 
stats of the table
+  // clear cache to simulate a fresh environment
+  dfFromTable.unpersist(blocking = true)
+  spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
+  val inMemoryRelation3 = 
spark.read.table("table1").cache().queryExecution.optimizedPlan.
+collect { case plan: InMemoryRelation => plan }.head
+  assert(inMemoryRelation3.computeStats().sizeInBytes === 48)
--- End diff --

Missed this one: why does it have different stats than the table cache 
stats `16`?


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r157427138
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -479,4 +485,43 @@ class InMemoryColumnarQuerySuite extends QueryTest 
with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the 
plan to be cached") {
+withSQLConf("spark.sql.cbo.enabled" -> "true") {
+  withTempDir { workDir =>
+withTable("table1") {
+  val workDirPath = workDir.getAbsolutePath + "/table1"
--- End diff --

then we don't need to append `/table1` to the path


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r157427036
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -479,4 +485,43 @@ class InMemoryColumnarQuerySuite extends QueryTest 
with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the 
plan to be cached") {
+withSQLConf("spark.sql.cbo.enabled" -> "true") {
+  withTempDir { workDir =>
+withTable("table1") {
+  val workDirPath = workDir.getAbsolutePath + "/table1"
--- End diff --

Seems you can use `withTempPath`, which just gives you a path without 
creating it.
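
A minimal sketch of the difference between the two helpers, assuming they behave as described in this thread. The implementations below are hypothetical re-creations built on `java.nio.file`, not Spark's test utilities, which may differ in detail.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical re-implementations for illustration only.
object TempPathSketch {
  private def deleteRecursively(f: File): Unit = {
    // listFiles() returns null for non-directories, hence the Option wrapper.
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }

  // Hands the body an existing, empty directory and cleans it up afterwards.
  def withTempDir[T](body: File => T): T = {
    val dir = Files.createTempDirectory("sketch").toFile
    try body(dir) finally deleteRecursively(dir)
  }

  // Hands the body a unique path that does NOT exist yet and cleans up afterwards.
  def withTempPath[T](body: File => T): T = {
    val path = Files.createTempDirectory("sketch").toFile
    path.delete()
    try body(path) finally deleteRecursively(path)
  }

  def main(args: Array[String]): Unit = {
    println(withTempDir(d => d.exists()))  // directory already exists
    println(withTempPath(p => p.exists())) // path is free for a writer to create
  }
}
```

Because a DataFrame writer in its default save mode refuses to write to an existing location, a not-yet-created path can be passed to `data.write.parquet(...)` directly, which is why the `/table1` suffix becomes unnecessary.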


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-17 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r157381615
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -479,4 +485,35 @@ class InMemoryColumnarQuerySuite extends QueryTest 
with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the 
plan to be cached") {
+withSQLConf("spark.sql.cbo.enabled" -> "true") {
+  val workDir = s"${Utils.createTempDir()}/table1"
+  val data = Seq(100, 200, 300, 400).toDF("count")
+  data.write.parquet(workDir)
+  val dfFromFile = spark.read.parquet(workDir).cache()
+  val inMemoryRelation = 
dfFromFile.queryExecution.optimizedPlan.collect {
+case plan: InMemoryRelation => plan
+  }.head
+  // InMemoryRelation's stats is file size before the underlying RDD 
is materialized
+  assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+
+  // InMemoryRelation's stats is updated after materializing RDD
+  dfFromFile.collect()
+  assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+
+  // test of catalog table
+  val dfFromTable = spark.catalog.createTable("table1", 
workDir).cache()
+  val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
+collect { case plan: InMemoryRelation => plan }.head
+
+  // Even CBO enabled, InMemoryRelation's stats keeps as the file size 
before table's stats
+  // is calculated
+  assert(inMemoryRelation2.computeStats().sizeInBytes === 740)
+
+  // InMemoryRelation's stats should be updated after calculating 
stats of the table
+  spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
+  assert(inMemoryRelation2.computeStats().sizeInBytes === 16)
--- End diff --

That was a mistake here: AnalyzeTableCommand actually forces the table 
to be evaluated with count(), so the assertion ends up hitting the 
longAccumulator's value.

Fixed in the latest commit.


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-17 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r157381595
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -479,4 +485,35 @@ class InMemoryColumnarQuerySuite extends QueryTest 
with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the 
plan to be cached") {
+withSQLConf("spark.sql.cbo.enabled" -> "true") {
+  val workDir = s"${Utils.createTempDir()}/table1"
+  val data = Seq(100, 200, 300, 400).toDF("count")
+  data.write.parquet(workDir)
+  val dfFromFile = spark.read.parquet(workDir).cache()
+  val inMemoryRelation = 
dfFromFile.queryExecution.optimizedPlan.collect {
+case plan: InMemoryRelation => plan
+  }.head
+  // InMemoryRelation's stats is file size before the underlying RDD 
is materialized
+  assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+
+  // InMemoryRelation's stats is updated after materializing RDD
+  dfFromFile.collect()
+  assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+
+  // test of catalog table
+  val dfFromTable = spark.catalog.createTable("table1", 
workDir).cache()
--- End diff --

done


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-17 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r157381594
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -479,4 +485,35 @@ class InMemoryColumnarQuerySuite extends QueryTest 
with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the 
plan to be cached") {
+withSQLConf("spark.sql.cbo.enabled" -> "true") {
+  val workDir = s"${Utils.createTempDir()}/table1"
--- End diff --

done


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-17 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r157381589
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -71,9 +74,8 @@ case class InMemoryRelation(
 
   override def computeStats(): Statistics = {
 if (batchStats.value == 0L) {
-  // Underlying columnar RDD hasn't been materialized, no useful 
statistics information
-  // available, return the default statistics.
--- End diff --

done


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r157141961
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -479,4 +485,35 @@ class InMemoryColumnarQuerySuite extends QueryTest 
with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the 
plan to be cached") {
+withSQLConf("spark.sql.cbo.enabled" -> "true") {
+  val workDir = s"${Utils.createTempDir()}/table1"
+  val data = Seq(100, 200, 300, 400).toDF("count")
+  data.write.parquet(workDir)
+  val dfFromFile = spark.read.parquet(workDir).cache()
+  val inMemoryRelation = 
dfFromFile.queryExecution.optimizedPlan.collect {
+case plan: InMemoryRelation => plan
+  }.head
+  // InMemoryRelation's stats is file size before the underlying RDD 
is materialized
+  assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+
+  // InMemoryRelation's stats is updated after materializing RDD
+  dfFromFile.collect()
+  assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+
+  // test of catalog table
+  val dfFromTable = spark.catalog.createTable("table1", 
workDir).cache()
+  val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
+collect { case plan: InMemoryRelation => plan }.head
+
+  // Even CBO enabled, InMemoryRelation's stats keeps as the file size 
before table's stats
+  // is calculated
+  assert(inMemoryRelation2.computeStats().sizeInBytes === 740)
+
+  // InMemoryRelation's stats should be updated after calculating 
stats of the table
+  spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
+  assert(inMemoryRelation2.computeStats().sizeInBytes === 16)
--- End diff --

What happened here? Does `InMemoryRelation.statsOfPlanToCache` get updated 
automatically?


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r157141730
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -479,4 +485,35 @@ class InMemoryColumnarQuerySuite extends QueryTest 
with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the 
plan to be cached") {
+withSQLConf("spark.sql.cbo.enabled" -> "true") {
+  val workDir = s"${Utils.createTempDir()}/table1"
+  val data = Seq(100, 200, 300, 400).toDF("count")
+  data.write.parquet(workDir)
+  val dfFromFile = spark.read.parquet(workDir).cache()
+  val inMemoryRelation = 
dfFromFile.queryExecution.optimizedPlan.collect {
+case plan: InMemoryRelation => plan
+  }.head
+  // InMemoryRelation's stats is file size before the underlying RDD 
is materialized
+  assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+
+  // InMemoryRelation's stats is updated after materializing RDD
+  dfFromFile.collect()
+  assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+
+  // test of catalog table
+  val dfFromTable = spark.catalog.createTable("table1", 
workDir).cache()
--- End diff --

nit: wrap with `withTable`, which will clean up the table automatically at 
the end.


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r157141574
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -479,4 +485,35 @@ class InMemoryColumnarQuerySuite extends QueryTest 
with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the 
plan to be cached") {
+withSQLConf("spark.sql.cbo.enabled" -> "true") {
+  val workDir = s"${Utils.createTempDir()}/table1"
--- End diff --

nit: use `withTempDir`


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-14 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r157118091
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
 cachedData.isEmpty
   }
 
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): 
Option[Statistics] = {
+if (plan.stats.rowCount.isDefined) {
--- End diff --

Changed the code here and filed a JIRA: 
https://issues.apache.org/jira/browse/SPARK-22790


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r156847927
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
 cachedData.isEmpty
   }
 
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): 
Option[Statistics] = {
+if (plan.stats.rowCount.isDefined) {
--- End diff --

The expected value should be `rowCount * avgRowSize`. Without CBO, I think 
the file size is the best we can get, although it may not be correct.

That is to say, without CBO, a parquet relation may have an underestimated 
size and cause OOM; users need to turn on CBO to fix it. So the same thing 
should happen in the table cache.

We can fix this by defining `sizeInBytes` as `file size * some factor`, but 
it should belong to another PR.
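
The two options mentioned above can be sketched as follows. This is only an illustration: `KnownStats`, `estimateInMemorySize`, and `expansionFactor` are hypothetical names, and the factor stands in for the "file size * some factor" idea that this comment defers to another PR.

```scala
// Simplified stand-ins for illustration; not Spark's Statistics or CacheManager.
object CacheSizeSketch {
  // What might be known about a relation before it is cached.
  final case class KnownStats(
      fileSizeInBytes: BigInt,
      rowCount: Option[BigInt],
      avgRowSize: Long)

  // With a row count (CBO stats available): rowCount * avgRowSize.
  // Without one: fall back to the on-disk size, optionally scaled by an assumed factor.
  def estimateInMemorySize(stats: KnownStats, expansionFactor: Int = 1): BigInt =
    stats.rowCount match {
      case Some(rows) => rows * BigInt(stats.avgRowSize)
      case None       => stats.fileSizeInBytes * BigInt(expansionFactor)
    }

  def main(args: Array[String]): Unit = {
    println(estimateInMemorySize(KnownStats(BigInt(740), Some(BigInt(4)), 12L)))
    println(estimateInMemorySize(KnownStats(BigInt(740), None, 12L)))
  }
}
```

With the numbers from the test in this PR, the first call yields the row-based estimate and the second simply returns the file size.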


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-13 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r156718763
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
 cachedData.isEmpty
   }
 
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): 
Option[Statistics] = {
+if (plan.stats.rowCount.isDefined) {
--- End diff --

The current logic is that we only put the value here when we have enough 
stats for the table of interest.

I agree that we should choose a better value for formats like parquet, where 
the actual in-memory size would be much larger than the sizeInBytes (i.e. the 
on-disk size). My questions are:

1. What's the expected value if we have a HadoopFsRelation in parquet format?

2. Do we want to do it in this PR?




---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r156718225
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -71,9 +74,10 @@ case class InMemoryRelation(
 
   override def computeStats(): Statistics = {
 if (batchStats.value == 0L) {
-  // Underlying columnar RDD hasn't been materialized, no useful 
statistics information
-  // available, return the default statistics.
-  Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
+  // Underlying columnar RDD hasn't been materialized, use the stats 
from the plan to cache when
+  // applicable
+  statsOfPlanToCache.getOrElse(Statistics(sizeInBytes =
+child.sqlContext.conf.defaultSizeInBytes))
--- End diff --

If we don't trust the size in bytes for parquet, then we should fix that in 
the datasource and not here. The old version did not use proper statistics at 
all; now that we do, we should use them.
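
A minimal sketch of the fallback order in the quoted hunk, using simplified stand-in types. `Stats`, `batchSizeInBytes`, and the `else` branch are assumptions for illustration: the hunk only shows the not-yet-materialized case, and the accumulator is modeled here as a plain `Long`.

```scala
// Simplified stand-ins; not the real InMemoryRelation or Statistics classes.
object ComputeStatsSketch {
  final case class Stats(sizeInBytes: BigInt)

  def computeStats(
      batchSizeInBytes: Long,            // stands in for batchStats.value; 0 means not materialized
      statsOfPlanToCache: Option[Stats], // stats taken from the plan being cached, if any
      defaultSizeInBytes: Long): Stats =
    if (batchSizeInBytes == 0L) {
      // Not materialized yet: prefer the plan's stats, else the configured default.
      statsOfPlanToCache.getOrElse(Stats(BigInt(defaultSizeInBytes)))
    } else {
      // Assumed behavior once materialized: report the measured size.
      Stats(BigInt(batchSizeInBytes))
    }

  def main(args: Array[String]): Unit = {
    println(computeStats(0L, Some(Stats(BigInt(740))), Long.MaxValue))
    println(computeStats(16L, Some(Stats(BigInt(740))), Long.MaxValue))
  }
}
```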


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-13 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r156716896
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -71,9 +74,10 @@ case class InMemoryRelation(
 
   override def computeStats(): Statistics = {
 if (batchStats.value == 0L) {
-  // Underlying columnar RDD hasn't been materialized, no useful 
statistics information
-  // available, return the default statistics.
-  Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
+  // Underlying columnar RDD hasn't been materialized, use the stats 
from the plan to cache when
+  // applicable
+  statsOfPlanToCache.getOrElse(Statistics(sizeInBytes =
+child.sqlContext.conf.defaultSizeInBytes))
--- End diff --

I just followed the original implementation... I do not have a better value 
to put here. If we use sizeInBytes, the risk is with data sources like 
parquet-formatted files, where the sizeInBytes is much smaller than the 
actual size in memory... your suggestion?


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r156610279
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -71,9 +74,10 @@ case class InMemoryRelation(
 
   override def computeStats(): Statistics = {
 if (batchStats.value == 0L) {
-  // Underlying columnar RDD hasn't been materialized, no useful 
statistics information
-  // available, return the default statistics.
-  Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
+  // Underlying columnar RDD hasn't been materialized, use the stats 
from the plan to cache when
+  // applicable
+  statsOfPlanToCache.getOrElse(Statistics(sizeInBytes =
+child.sqlContext.conf.defaultSizeInBytes))
--- End diff --

Mweh - this seems very arbitrary.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r156609849
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -60,7 +62,8 @@ case class InMemoryRelation(
 @transient child: SparkPlan,
 tableName: Option[String])(
 @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-val batchStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator)
+val batchStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator,
+statsOfPlanToCache: Option[Statistics] = None)
--- End diff --

Yeah, the secondary argument list seems a better place. I don't think we 
should incorporate the stats in the hash/equals method.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r156609277
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
 cachedData.isEmpty
   }
 
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): 
Option[Statistics] = {
+if (plan.stats.rowCount.isDefined) {
--- End diff --

We could also collect the size as part of building the cache.


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r156554180
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
 cachedData.isEmpty
   }
 
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): 
Option[Statistics] = {
+if (plan.stats.rowCount.isDefined) {
--- End diff --

+1


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r156553748
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
 cachedData.isEmpty
   }
 
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): 
Option[Statistics] = {
+if (plan.stats.rowCount.isDefined) {
--- End diff --

If we don't trust `sizeInBytes`, we should fix it in all places, not just 
here


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r156550811
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
 cachedData.isEmpty
   }
 
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): 
Option[Statistics] = {
+if (plan.stats.rowCount.isDefined) {
--- End diff --

I had the same question before. It seems @CodingCat's claim is that the 
`sizeInBytes` from the relation can cause an OOM issue: 
https://github.com/apache/spark/pull/19864#discussion_r155287758.




---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r156550114
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
 cachedData.isEmpty
   }
 
+  private def extractStatsOfPlanForCache(plan: LogicalPlan): 
Option[Statistics] = {
+if (plan.stats.rowCount.isDefined) {
--- End diff --

Why this condition? Even if there is no row count, the `sizeInBytes` is still 
better than `defaultSizeInBytes`.


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-12 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r156445522
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -60,7 +62,8 @@ case class InMemoryRelation(
 @transient child: SparkPlan,
 tableName: Option[String])(
 @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-val batchStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator)
+val batchStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator,
+statsOfPlanToCache: Option[Statistics] = None)
--- End diff --

My two cents here: I didn't look into the code that makes this influence the 
logic of equals and hash, but we may not want to make equals/hash dependent 
on this.

In Spark SQL, we usually compare plans based on their string representation 
rather than on the string plus stats info, e.g. we try to reuse the cached 
plan based on the execution plan's string representation rather than 
anything plus stats info.

 


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r156294941
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -60,7 +62,8 @@ case class InMemoryRelation(
 @transient child: SparkPlan,
 tableName: Option[String])(
 @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-val batchStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator)
+val batchStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator,
+statsOfPlanToCache: Option[Statistics] = None)
--- End diff --

Where shall we put the stats parameter: in the main constructor or in the 
curried constructor? The major difference is whether we want to include it in 
`equals`/`hashCode`.
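
To make the trade-off concrete: in a Scala case class, only the first parameter 
list takes part in the generated `equals`/`hashCode`. A minimal sketch with a 
hypothetical `Relation` class (a stand-in, not Spark's `InMemoryRelation`):

```scala
// Hypothetical stand-in for a relation; only `name` is in the first parameter list.
case class Relation(name: String)(val stats: Option[Long] = None)

object CurriedEqualityDemo {
  val withoutStats: Relation = Relation("t1")()
  val withStats: Relation = Relation("t1")(Some(16L))

  def main(args: Array[String]): Unit = {
    // The generated equals/hashCode only look at the first parameter list,
    // so differing stats do not make the two relations unequal.
    println(withoutStats == withStats)
    println(withoutStats.hashCode == withStats.hashCode)
  }
}
```

Moving `stats` into the first parameter list would make the two values above 
compare as different.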


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r156294973
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -60,7 +62,8 @@ case class InMemoryRelation(
 @transient child: SparkPlan,
 tableName: Option[String])(
 @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-val batchStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator)
+val batchStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator,
+statsOfPlanToCache: Option[Statistics] = None)
--- End diff --

cc @hvanhovell 


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r156078995
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -60,7 +62,8 @@ case class InMemoryRelation(
 @transient child: SparkPlan,
 tableName: Option[String])(
 @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-val batchStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator)
+val batchStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator,
+statsOfPlanToCache: Option[Statistics] = None)
--- End diff --

One minor question: why put this in the curried constructor? Can we move it 
to before `_cachedColumnBuffers`?


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-07 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r155587468
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -479,4 +481,32 @@ class InMemoryColumnarQuerySuite extends QueryTest 
with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats 
whenever possible") {
+withSQLConf("spark.sql.cbo.enabled" -> "true") {
+  // scalastyle:off
+  val workDir = s"${Utils.createTempDir()}/table1"
+  val data = Seq(100, 200, 300, 400).toDF("count")
+  data.write.parquet(workDir)
+  val dfFromFile = spark.read.parquet(workDir).cache()
+  val inMemoryRelation = 
dfFromFile.queryExecution.optimizedPlan.collect {
+case plan: InMemoryRelation => plan
+  }.head
+  // InMemoryRelation's stats is Long.MaxValue before the underlying 
RDD is materialized
+  assert(inMemoryRelation.computeStats().sizeInBytes === Long.MaxValue)
+  // InMemoryRelation's stats is updated after materializing RDD
+  dfFromFile.collect()
+  assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+  // test of catalog table
+  val dfFromTable = spark.catalog.createTable("table1", 
workDir).cache()
+  val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
+collect { case plan: InMemoryRelation => plan }.head
+  // Even CBO enabled, InMemoryRelation's stats keeps as the default 
one before table's stats
+  // is calculated
+  assert(inMemoryRelation2.computeStats().sizeInBytes === 
Long.MaxValue)
--- End diff --

done


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-07 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r155587431
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -479,4 +481,32 @@ class InMemoryColumnarQuerySuite extends QueryTest 
with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats 
whenever possible") {
+withSQLConf("spark.sql.cbo.enabled" -> "true") {
+  // scalastyle:off
--- End diff --

Ah, sorry, it's a leftover from debugging.


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r155523343
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -479,4 +481,32 @@ class InMemoryColumnarQuerySuite extends QueryTest 
with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats 
whenever possible") {
+withSQLConf("spark.sql.cbo.enabled" -> "true") {
+  // scalastyle:off
+  val workDir = s"${Utils.createTempDir()}/table1"
+  val data = Seq(100, 200, 300, 400).toDF("count")
+  data.write.parquet(workDir)
+  val dfFromFile = spark.read.parquet(workDir).cache()
+  val inMemoryRelation = 
dfFromFile.queryExecution.optimizedPlan.collect {
+case plan: InMemoryRelation => plan
+  }.head
+  // InMemoryRelation's stats is Long.MaxValue before the underlying 
RDD is materialized
+  assert(inMemoryRelation.computeStats().sizeInBytes === Long.MaxValue)
+  // InMemoryRelation's stats is updated after materializing RDD
+  dfFromFile.collect()
+  assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+  // test of catalog table
+  val dfFromTable = spark.catalog.createTable("table1", 
workDir).cache()
+  val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
+collect { case plan: InMemoryRelation => plan }.head
+  // Even CBO enabled, InMemoryRelation's stats keeps as the default 
one before table's stats
+  // is calculated
+  assert(inMemoryRelation2.computeStats().sizeInBytes === 
Long.MaxValue)
--- End diff --

Inserting a few blank lines would make the test more readable.


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r155521550
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ---
@@ -479,4 +481,32 @@ class InMemoryColumnarQuerySuite extends QueryTest 
with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats 
whenever possible") {
+withSQLConf("spark.sql.cbo.enabled" -> "true") {
+  // scalastyle:off
--- End diff --

Why turn off the style check here?


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-06 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r155296970
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -37,8 +37,10 @@ object InMemoryRelation {
   batchSize: Int,
   storageLevel: StorageLevel,
   child: SparkPlan,
-  tableName: Option[String]): InMemoryRelation =
-new InMemoryRelation(child.output, useCompression, batchSize, 
storageLevel, child, tableName)()
+  tableName: Option[String],
+  stats: Option[Statistics]): InMemoryRelation =
--- End diff --

As for using `None` as the default value, it looks like Scala does not allow 
me to do that.

An earlier change made for exactly this purpose, 
https://github.com/apache/spark/pull/19864/commits/4c347014f32aa3c3088f8a18b08786fb074a1790

failed to compile: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84499/console

It looks like the Scala compiler does not consider whether the default 
parameters come from the same or from different parameter lists.
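
A likely explanation, stated here as an assumption since the build log is not 
quoted in this thread: Scala allows at most one overloaded alternative of a 
method to define default arguments, and a case class with a default in its 
curried parameter list already gets a synthetic companion `apply` that carries 
a default. A sketch with a hypothetical `Rel` class (not Spark code) showing 
one way around the restriction:

```scala
// Hypothetical class; the synthetic companion `apply(name)(stats = None)`
// already defines a default argument.
case class Rel(name: String)(val stats: Option[Long] = None)

object Rel {
  // Giving `stats` a default in a custom `apply` is expected to be rejected,
  // because two overloads of `apply` would then define default arguments.
  // Explicit overloads without defaults avoid the clash.
  def apply(name: String, compressed: Boolean): Rel =
    apply(name, compressed, None)

  def apply(name: String, compressed: Boolean, stats: Option[Long]): Rel =
    new Rel(if (compressed) s"$name(compressed)" else name)(stats)
}
```

Callers that do not care about stats can keep using the two-argument form, 
which is the effect the `None` default was meant to have.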


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-06 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r155287758
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
   logWarning("Asked to cache already cached data.")
 } else {
   val sparkSession = query.sparkSession
-  cachedData.add(CachedData(
-planToCache,
-InMemoryRelation(
-  sparkSession.sessionState.conf.useCompression,
-  sparkSession.sessionState.conf.columnBatchSize,
-  storageLevel,
-  sparkSession.sessionState.executePlan(planToCache).executedPlan,
-  tableName)))
+  val inMemoryRelation = InMemoryRelation(
+sparkSession.sessionState.conf.useCompression,
+sparkSession.sessionState.conf.columnBatchSize,
+storageLevel,
+sparkSession.sessionState.executePlan(planToCache).executedPlan,
+tableName)
+  if (planToCache.conf.cboEnabled && 
planToCache.stats.rowCount.isDefined) {
--- End diff --

It's true, it does affect the other cases. I believe there is a similar 
discussion in https://github.com/apache/spark/pull/19743


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r155223954
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -37,8 +37,10 @@ object InMemoryRelation {
   batchSize: Int,
   storageLevel: StorageLevel,
   child: SparkPlan,
-  tableName: Option[String]): InMemoryRelation =
-new InMemoryRelation(child.output, useCompression, batchSize, 
storageLevel, child, tableName)()
+  tableName: Option[String],
+  stats: Option[Statistics]): InMemoryRelation =
--- End diff --

And use `None` as the default value to avoid updating the tests.


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r155223876
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -37,8 +37,10 @@ object InMemoryRelation {
   batchSize: Int,
   storageLevel: StorageLevel,
   child: SparkPlan,
-  tableName: Option[String]): InMemoryRelation =
-new InMemoryRelation(child.output, useCompression, batchSize, 
storageLevel, child, tableName)()
+  tableName: Option[String],
+  stats: Option[Statistics]): InMemoryRelation =
--- End diff --

Also name it `statsOfPlanToCache`.


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r155223822
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -71,9 +74,8 @@ case class InMemoryRelation(
 
   override def computeStats(): Statistics = {
 if (batchStats.value == 0L) {
-  // Underlying columnar RDD hasn't been materialized, no useful 
statistics information
-  // available, return the default statistics.
--- End diff --

```
// Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
```
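
A sketch of the behavior that comment describes, using simplified stand-in 
types rather than Spark's classes (`Stats`, `CachedRelation` and the 
`defaultSizeInBytes` parameter are assumptions for illustration):

```scala
// Simplified stand-in for Spark's Statistics.
final case class Stats(sizeInBytes: BigInt)

// Simplified stand-in for the statistics logic of an in-memory relation.
final class CachedRelation(
    statsOfPlanToCache: Option[Stats],
    defaultSizeInBytes: BigInt) {

  // Bytes accumulated while cached batches are built; 0 until materialization.
  var batchStats: Long = 0L

  def computeStats(): Stats =
    if (batchStats == 0L) {
      // Not materialized yet: use the stats from the plan to cache, if any,
      // and fall back to the default size otherwise.
      statsOfPlanToCache.getOrElse(Stats(defaultSizeInBytes))
    } else {
      // Materialized: report the size actually observed.
      Stats(BigInt(batchStats))
    }
}
```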


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r155159490
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
   logWarning("Asked to cache already cached data.")
 } else {
   val sparkSession = query.sparkSession
-  cachedData.add(CachedData(
-planToCache,
-InMemoryRelation(
-  sparkSession.sessionState.conf.useCompression,
-  sparkSession.sessionState.conf.columnBatchSize,
-  storageLevel,
-  sparkSession.sessionState.executePlan(planToCache).executedPlan,
-  tableName)))
+  val inMemoryRelation = InMemoryRelation(
+sparkSession.sessionState.conf.useCompression,
+sparkSession.sessionState.conf.columnBatchSize,
+storageLevel,
+sparkSession.sessionState.executePlan(planToCache).executedPlan,
+tableName)
+  if (planToCache.conf.cboEnabled && 
planToCache.stats.rowCount.isDefined) {
--- End diff --

The statistics from the relation are based on file size; will that easily 
cause OOM issues? I think that in cases other than cached queries we still use 
this relation's statistics. If this is an issue, doesn't it also affect those 
other cases?


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-05 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r155147478
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
   logWarning("Asked to cache already cached data.")
 } else {
   val sparkSession = query.sparkSession
-  cachedData.add(CachedData(
-planToCache,
-InMemoryRelation(
-  sparkSession.sessionState.conf.useCompression,
-  sparkSession.sessionState.conf.columnBatchSize,
-  storageLevel,
-  sparkSession.sessionState.executePlan(planToCache).executedPlan,
-  tableName)))
+  val inMemoryRelation = InMemoryRelation(
+sparkSession.sessionState.conf.useCompression,
+sparkSession.sessionState.conf.columnBatchSize,
+storageLevel,
+sparkSession.sessionState.executePlan(planToCache).executedPlan,
+tableName)
+  if (planToCache.conf.cboEnabled && 
planToCache.stats.rowCount.isDefined) {
--- End diff --

@viirya you're right! Thanks for clearing up the confusion.

However, to avoid using the relation's stats, which can be much smaller than 
the in-memory size and lead to a potential OOM error, we should still keep this 
condition here (we can remove `cboEnabled` though), right?




---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r155141140
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
   logWarning("Asked to cache already cached data.")
 } else {
   val sparkSession = query.sparkSession
-  cachedData.add(CachedData(
-planToCache,
-InMemoryRelation(
-  sparkSession.sessionState.conf.useCompression,
-  sparkSession.sessionState.conf.columnBatchSize,
-  storageLevel,
-  sparkSession.sessionState.executePlan(planToCache).executedPlan,
-  tableName)))
+  val inMemoryRelation = InMemoryRelation(
+sparkSession.sessionState.conf.useCompression,
+sparkSession.sessionState.conf.columnBatchSize,
+storageLevel,
+sparkSession.sessionState.executePlan(planToCache).executedPlan,
+tableName)
+  if (planToCache.conf.cboEnabled && 
planToCache.stats.rowCount.isDefined) {
--- End diff --

If a catalog table doesn't have statistics in its metadata, we will fill it 
with `defaultSizeInBytes`.


https://github.com/apache/spark/blob/326f1d6728a7734c228d8bfaa69442a1c7b92e9b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala#L121-L134


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r155140756
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
   logWarning("Asked to cache already cached data.")
 } else {
   val sparkSession = query.sparkSession
-  cachedData.add(CachedData(
-planToCache,
-InMemoryRelation(
-  sparkSession.sessionState.conf.useCompression,
-  sparkSession.sessionState.conf.columnBatchSize,
-  storageLevel,
-  sparkSession.sessionState.executePlan(planToCache).executedPlan,
-  tableName)))
+  val inMemoryRelation = InMemoryRelation(
+sparkSession.sessionState.conf.useCompression,
+sparkSession.sessionState.conf.columnBatchSize,
+storageLevel,
+sparkSession.sessionState.executePlan(planToCache).executedPlan,
+tableName)
+  if (planToCache.conf.cboEnabled && 
planToCache.stats.rowCount.isDefined) {
--- End diff --

`LogicalRelation` uses the statistics from the `relation` only when there 
is no given `catalogTable`.  In this case, it doesn't consider if CBO is 
enabled or not.

Only `catalogTable` considers CBO when computing its statistics in 
`toPlanStats`. It doesn't refer to relation's statistics, IIUC.
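
The selection described above can be sketched as follows. This is modeled on 
the description in this comment, not copied from Spark; `Stats`, 
`CatalogStats` and `relationStats` are simplified stand-ins introduced for 
illustration:

```scala
// Simplified stand-ins; names and shapes are assumptions for illustration.
final case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

final case class CatalogStats(sizeInBytes: BigInt, rowCount: Option[BigInt]) {
  // Row counts are only exposed to the planner when CBO is enabled.
  def toPlanStats(cboEnabled: Boolean): Stats =
    if (cboEnabled) Stats(sizeInBytes, rowCount) else Stats(sizeInBytes)
}

object RelationStatsDemo {
  // `catalogStats` is None when no catalog table is given.
  def relationStats(
      catalogStats: Option[CatalogStats],
      relationSizeInBytes: BigInt,
      cboEnabled: Boolean): Stats =
    catalogStats
      .map(_.toPlanStats(cboEnabled))
      // No catalog table: use the relation's own size, regardless of CBO.
      .getOrElse(Stats(relationSizeInBytes))
}
```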


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-05 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r155140125
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
   logWarning("Asked to cache already cached data.")
 } else {
   val sparkSession = query.sparkSession
-  cachedData.add(CachedData(
-planToCache,
-InMemoryRelation(
-  sparkSession.sessionState.conf.useCompression,
-  sparkSession.sessionState.conf.columnBatchSize,
-  storageLevel,
-  sparkSession.sessionState.executePlan(planToCache).executedPlan,
-  tableName)))
+  val inMemoryRelation = InMemoryRelation(
+sparkSession.sessionState.conf.useCompression,
+sparkSession.sessionState.conf.columnBatchSize,
+storageLevel,
+sparkSession.sessionState.executePlan(planToCache).executedPlan,
+tableName)
+  if (planToCache.conf.cboEnabled && 
planToCache.stats.rowCount.isDefined) {
--- End diff --

No; if CBO is disabled, the relation's `sizeInBytes` is the file size:


https://github.com/apache/spark/blob/5c3a1f3fad695317c2fff1243cdb9b3ceb25c317/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala#L85





---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r155137863
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
   logWarning("Asked to cache already cached data.")
 } else {
   val sparkSession = query.sparkSession
-  cachedData.add(CachedData(
-planToCache,
-InMemoryRelation(
-  sparkSession.sessionState.conf.useCompression,
-  sparkSession.sessionState.conf.columnBatchSize,
-  storageLevel,
-  sparkSession.sessionState.executePlan(planToCache).executedPlan,
-  tableName)))
+  val inMemoryRelation = InMemoryRelation(
+sparkSession.sessionState.conf.useCompression,
+sparkSession.sessionState.conf.columnBatchSize,
+storageLevel,
+sparkSession.sessionState.executePlan(planToCache).executedPlan,
+tableName)
+  if (planToCache.conf.cboEnabled && 
planToCache.stats.rowCount.isDefined) {
--- End diff --

When CBO is disabled, don't we just set `sizeInBytes` to 
`defaultSizeInBytes`? Isn't that different from the current statistics of the 
first run?


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-05 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r15502
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
   logWarning("Asked to cache already cached data.")
 } else {
   val sparkSession = query.sparkSession
-  cachedData.add(CachedData(
-planToCache,
-InMemoryRelation(
-  sparkSession.sessionState.conf.useCompression,
-  sparkSession.sessionState.conf.columnBatchSize,
-  storageLevel,
-  sparkSession.sessionState.executePlan(planToCache).executedPlan,
-  tableName)))
+  val inMemoryRelation = InMemoryRelation(
+sparkSession.sessionState.conf.useCompression,
+sparkSession.sessionState.conf.columnBatchSize,
+storageLevel,
+sparkSession.sessionState.executePlan(planToCache).executedPlan,
+tableName)
+  if (planToCache.conf.cboEnabled && 
planToCache.stats.rowCount.isDefined) {
--- End diff --

The reason I put it here is that when CBO is not enabled, the stats in the 
underlying plan might be much smaller than the actual size in memory, which 
carries a potential risk of OOM errors.

The underlying cause is that without CBO the size of the plan is calculated 
from BaseRelation's sizeInBytes, whereas with CBO we can get a more accurate 
estimate:



https://github.com/apache/spark/blob/03fdc92e42d260a2b7c0090115f162ba5c091aae/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L42-L46


https://github.com/apache/spark/blob/03fdc92e42d260a2b7c0090115f162ba5c091aae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L370-L381


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r154973359
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -25,13 +25,15 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
Statistics}
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
--- End diff --

Unused import.


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r154973164
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -25,13 +25,15 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
Statistics}
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.LongAccumulator
 
 
 object InMemoryRelation {
+
--- End diff --

Unnecessary change.


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r154972842
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
   logWarning("Asked to cache already cached data.")
 } else {
   val sparkSession = query.sparkSession
-  cachedData.add(CachedData(
-planToCache,
-InMemoryRelation(
-  sparkSession.sessionState.conf.useCompression,
-  sparkSession.sessionState.conf.columnBatchSize,
-  storageLevel,
-  sparkSession.sessionState.executePlan(planToCache).executedPlan,
-  tableName)))
+  val inMemoryRelation = InMemoryRelation(
+sparkSession.sessionState.conf.useCompression,
+sparkSession.sessionState.conf.columnBatchSize,
+storageLevel,
+sparkSession.sessionState.executePlan(planToCache).executedPlan,
+tableName)
+  if (planToCache.conf.cboEnabled && 
planToCache.stats.rowCount.isDefined) {
--- End diff --

Do we need to limit this to those conditions? I think we can pass the stats 
into the created `InMemoryRelation` even if the two conditions aren't met.


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r154972319
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
   logWarning("Asked to cache already cached data.")
 } else {
   val sparkSession = query.sparkSession
-  cachedData.add(CachedData(
-planToCache,
-InMemoryRelation(
-  sparkSession.sessionState.conf.useCompression,
-  sparkSession.sessionState.conf.columnBatchSize,
-  storageLevel,
-  sparkSession.sessionState.executePlan(planToCache).executedPlan,
-  tableName)))
+  val inMemoryRelation = InMemoryRelation(
+sparkSession.sessionState.conf.useCompression,
+sparkSession.sessionState.conf.columnBatchSize,
+storageLevel,
+sparkSession.sessionState.executePlan(planToCache).executedPlan,
+tableName)
+  if (planToCache.conf.cboEnabled && 
planToCache.stats.rowCount.isDefined) {
+inMemoryRelation.setStatsFromCachedPlan(planToCache)
+  }
--- End diff --

+1


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-05 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r154970842
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
   logWarning("Asked to cache already cached data.")
 } else {
   val sparkSession = query.sparkSession
-  cachedData.add(CachedData(
-planToCache,
-InMemoryRelation(
-  sparkSession.sessionState.conf.useCompression,
-  sparkSession.sessionState.conf.columnBatchSize,
-  storageLevel,
-  sparkSession.sessionState.executePlan(planToCache).executedPlan,
-  tableName)))
+  val inMemoryRelation = InMemoryRelation(
+sparkSession.sessionState.conf.useCompression,
+sparkSession.sessionState.conf.columnBatchSize,
+storageLevel,
+sparkSession.sessionState.executePlan(planToCache).executedPlan,
+tableName)
+  if (planToCache.conf.cboEnabled && 
planToCache.stats.rowCount.isDefined) {
+inMemoryRelation.setStatsFromCachedPlan(planToCache)
+  }
--- End diff --

`InMemoryRelation` is not part of the public API and should be treated as 
unstable/internal. You can use it at your own risk. Changing the constructor is 
fine.


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-05 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r154968066
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
   logWarning("Asked to cache already cached data.")
 } else {
   val sparkSession = query.sparkSession
-  cachedData.add(CachedData(
-planToCache,
-InMemoryRelation(
-  sparkSession.sessionState.conf.useCompression,
-  sparkSession.sessionState.conf.columnBatchSize,
-  storageLevel,
-  sparkSession.sessionState.executePlan(planToCache).executedPlan,
-  tableName)))
+  val inMemoryRelation = InMemoryRelation(
+sparkSession.sessionState.conf.useCompression,
+sparkSession.sessionState.conf.columnBatchSize,
+storageLevel,
+sparkSession.sessionState.executePlan(planToCache).executedPlan,
+tableName)
+  if (planToCache.conf.cboEnabled && 
planToCache.stats.rowCount.isDefined) {
+inMemoryRelation.setStatsFromCachedPlan(planToCache)
+  }
--- End diff --

It looks like I have no way to access InMemoryRelation from outside the Spark 
package, even though it is not a package-private class... how is that achieved?

If this is the case, I can modify the constructor.

Thanks @cloud-fan


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r154938881
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
   logWarning("Asked to cache already cached data.")
 } else {
   val sparkSession = query.sparkSession
-  cachedData.add(CachedData(
-planToCache,
-InMemoryRelation(
-  sparkSession.sessionState.conf.useCompression,
-  sparkSession.sessionState.conf.columnBatchSize,
-  storageLevel,
-  sparkSession.sessionState.executePlan(planToCache).executedPlan,
-  tableName)))
+  val inMemoryRelation = InMemoryRelation(
+sparkSession.sessionState.conf.useCompression,
+sparkSession.sessionState.conf.columnBatchSize,
+storageLevel,
+sparkSession.sessionState.executePlan(planToCache).executedPlan,
+tableName)
+  if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
+inMemoryRelation.setStatsFromCachedPlan(planToCache)
+  }
--- End diff --

I think a better change is to pass the original plan's stats into the 
constructor of `InMemoryRelation`, instead of making it mutable.
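
To illustrate the difference between the two designs, here is a minimal sketch. It uses simplified stand-in types, not the real Spark classes: `Statistics`, `MutableRelation`, `ImmutableRelation`, and `build` are hypothetical names introduced only for this example. The condition in `build` mirrors the `cboEnabled && rowCount.isDefined` check from the diff.

```scala
object ConstructorStatsSketch {
  // Simplified stand-in for Spark's Statistics; not the real class.
  final case class Statistics(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

  // Mutable variant: stats are attached after construction via a setter.
  final class MutableRelation(val tableName: Option[String]) {
    private var cachedPlanStats: Option[Statistics] = None
    def setStatsFromCachedPlan(stats: Statistics): Unit = { cachedPlanStats = Some(stats) }
    def statsOfPlanToCache: Option[Statistics] = cachedPlanStats
  }

  // Immutable variant: stats are fixed when the relation is built.
  final case class ImmutableRelation(
      tableName: Option[String],
      statsOfPlanToCache: Option[Statistics])

  // Hypothetical factory: keeps the stats only when CBO is on and a row count exists.
  def build(
      tableName: Option[String],
      cboEnabled: Boolean,
      planStats: Statistics): ImmutableRelation = {
    val usable = if (cboEnabled && planStats.rowCount.isDefined) Some(planStats) else None
    ImmutableRelation(tableName, usable)
  }
}
```

With the immutable variant there is no window in which the relation exists without its stats, and the decision about which stats to use is made once, at the call site.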


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-02 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r154501939
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
   logWarning("Asked to cache already cached data.")
 } else {
   val sparkSession = query.sparkSession
-  cachedData.add(CachedData(
-planToCache,
-InMemoryRelation(
-  sparkSession.sessionState.conf.useCompression,
-  sparkSession.sessionState.conf.columnBatchSize,
-  storageLevel,
-  sparkSession.sessionState.executePlan(planToCache).executedPlan,
-  tableName)))
+  val inMemoryRelation = InMemoryRelation(
+sparkSession.sessionState.conf.useCompression,
+sparkSession.sessionState.conf.columnBatchSize,
+storageLevel,
+sparkSession.sessionState.executePlan(planToCache).executedPlan,
+tableName)
+  if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
+inMemoryRelation.setStatsFromCachedPlan(planToCache)
+  }
--- End diff --

I have to make InMemoryRelation stateful to avoid breaking APIs.


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-02 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r154500900
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -71,9 +73,17 @@ case class InMemoryRelation(
 
   override def computeStats(): Statistics = {
 if (batchStats.value == 0L) {
-  // Underlying columnar RDD hasn't been materialized, no useful statistics information
-  // available, return the default statistics.
-  Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
+  children.filter(_.isInstanceOf[LogicalRelation]) match {
+case Seq(c @ LogicalRelation(_, _, _, _), _) if c.conf.cboEnabled =>
--- End diff --

Thanks, @viirya. Yes, you're right! I misread the generated plan; I'm working 
on it.


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19864#discussion_r154492920
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -71,9 +73,17 @@ case class InMemoryRelation(
 
   override def computeStats(): Statistics = {
 if (batchStats.value == 0L) {
-  // Underlying columnar RDD hasn't been materialized, no useful statistics information
-  // available, return the default statistics.
-  Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
+  children.filter(_.isInstanceOf[LogicalRelation]) match {
+case Seq(c @ LogicalRelation(_, _, _, _), _) if c.conf.cboEnabled =>
--- End diff --

`InMemoryRelation` is a `logical.LeafNode`. I think it has no children?
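
A small sketch of why the match in the diff cannot succeed on a leaf node. The types here (`Plan`, `Leaf`, `Relation`, `CachedRelation`) are simplified stand-ins invented for this example, not Spark's classes. Note also that a `Seq(x, _)` pattern only matches a sequence of exactly two elements.

```scala
object LeafNodeSketch {
  sealed trait Plan { def children: Seq[Plan] }

  // A leaf node by definition reports no children.
  trait Leaf extends Plan { final def children: Seq[Plan] = Nil }

  final case class Relation(name: String) extends Leaf
  final case class CachedRelation(cachedPlanName: String) extends Leaf

  // Mirrors the shape of the match in the diff: filter the children,
  // then expect a two-element Seq whose first element is a Relation.
  def findRelation(plan: Plan): Option[Relation] =
    plan.children.filter(_.isInstanceOf[Relation]) match {
      case Seq(r: Relation, _) => Some(r)
      case _                   => None
    }
}
```

Because `children` is always empty for a leaf, the filtered sequence is empty and the first case is never reached, so any stats lookup behind it would never run.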


---




[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...

2017-12-01 Thread CodingCat
GitHub user CodingCat opened a pull request:

https://github.com/apache/spark/pull/19864

[SPARK-22673][SQL] InMemoryRelation should utilize on-disk table stats 
whenever possible

## What changes were proposed in this pull request?

The current implementation of InMemoryRelation always uses the most 
expensive execution plan when writing the cache.
With CBO enabled, we can actually have a more exact estimation of the 
underlying table size...
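
As a rough sketch of the idea (not the actual patch): before the cache is materialized, fall back to the cached plan's own stats when they are available, and only otherwise to the configured default size. `Statistics` below is a simplified stand-in, and the parameter names `materializedBytes`, `statsOfPlanToCache`, and `defaultSizeInBytes` are chosen for illustration; the `materializedBytes == 0L` check mirrors the `batchStats.value == 0L` test in the diff, while the else branch is an assumption about the materialized case.

```scala
object ComputeStatsSketch {
  // Simplified stand-in for Spark's Statistics; not the real class.
  final case class Statistics(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

  def computeStats(
      materializedBytes: Long,
      statsOfPlanToCache: Option[Statistics],
      defaultSizeInBytes: Long): Statistics =
    if (materializedBytes == 0L) {
      // Cache not built yet: prefer the plan's stats over the default size.
      statsOfPlanToCache.getOrElse(Statistics(sizeInBytes = BigInt(defaultSizeInBytes)))
    } else {
      // Cache built: report the size actually observed (assumed behavior).
      Statistics(sizeInBytes = BigInt(materializedBytes))
    }
}
```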

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/CodingCat/spark SPARK-22673

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19864.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19864


commit b2fb1d25804b7bdbe1a767306a319dc748965bce
Author: CodingCat 
Date:   2016-03-07T14:37:37Z

improve the doc for "spark.memory.offHeap.size"

commit 0971900d562cb1a18af6f7de02bb8eb95637a640
Author: CodingCat 
Date:   2016-03-07T19:00:16Z

fix

commit 32f7c74a9b5cf4f19e7d14357bb87064383e11b3
Author: CodingCat 
Date:   2017-12-01T23:05:35Z

use cbo stats in inmemoryrelation




---
