[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21018


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-24 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183907321
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -794,4 +795,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       }
     }
   }
+
+  private def checkIfNoJobTriggered(f: => DataFrame): DataFrame = {
+    var numJobTrigered = 0
+    val jobListener = new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        numJobTrigered += 1
+      }
+    }
+    sparkContext.addSparkListener(jobListener)
+    try {
+      val df = f
+      assert(numJobTrigered === 0)
+      df
+    } finally {
+      sparkContext.removeSparkListener(jobListener)
+    }
+  }
+
+  test("SPARK-23880 table cache should be lazy and don't trigger any jobs") {
--- End diff --

oh, I'll recheck. Thanks!


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-24 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183903195
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -794,4 +795,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       }
     }
   }
+
+  private def checkIfNoJobTriggered(f: => DataFrame): DataFrame = {
+    var numJobTrigered = 0
+    val jobListener = new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        numJobTrigered += 1
+      }
+    }
+    sparkContext.addSparkListener(jobListener)
+    try {
+      val df = f
+      assert(numJobTrigered === 0)
+      df
+    } finally {
+      sparkContext.removeSparkListener(jobListener)
+    }
+  }
+
+  test("SPARK-23880 table cache should be lazy and don't trigger any jobs") {
--- End diff --

Without the changes in this PR, this test can still pass. :)


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183638215
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -794,4 +795,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       }
     }
   }
+
+  private def checkIfNoJobTriggered(f: => DataFrame): DataFrame = {
+    var numJobTrigered = 0
+    val jobListener = new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        numJobTrigered += 1
+      }
+    }
+    sparkContext.addSparkListener(jobListener)
+    try {
+      val df = f
+      assert(numJobTrigered === 0)
--- End diff --

before this assert, we should make sure the event queue is empty, via 
`sparkContext.listenerBus.waitUntilEmpty`
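A sketch of the helper with that change (assuming the suite's existing imports; the 10-second timeout is an arbitrary choice):
```
private def checkIfNoJobTriggered(f: => DataFrame): DataFrame = {
  var numJobTriggered = 0
  val jobListener = new SparkListener {
    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
      numJobTriggered += 1
    }
  }
  sparkContext.addSparkListener(jobListener)
  try {
    val df = f
    // Drain events still queued on the bus before asserting, so a
    // late-arriving job-start event is not missed.
    sparkContext.listenerBus.waitUntilEmpty(10000)
    assert(numJobTriggered === 0)
    df
  } finally {
    sparkContext.removeSparkListener(jobListener)
  }
}
```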


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183637688
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -154,32 +124,77 @@ case class InMemoryRelation(
 
     cached.setName(
       tableName.map(n => s"In-memory table $n")
-        .getOrElse(StringUtils.abbreviate(child.toString, 1024)))
-    _cachedColumnBuffers = cached
+        .getOrElse(StringUtils.abbreviate(cachedPlan.toString, 1024)))
+    cached
+  }
+}
+
+object InMemoryRelation {
+
+  def apply(
+      useCompression: Boolean,
+      batchSize: Int,
+      storageLevel: StorageLevel,
+      child: SparkPlan,
+      tableName: Option[String],
+      logicalPlan: LogicalPlan): InMemoryRelation = {
+    val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, storageLevel, child, tableName)()
+    new InMemoryRelation(child.output, cacheBuilder)(
+      statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering)
+  }
+
+  def apply(cacheBuilder: CachedRDDBuilder, logicalPlan: LogicalPlan): InMemoryRelation = {
+    new InMemoryRelation(cacheBuilder.cachedPlan.output, cacheBuilder)(
+      statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering)
+  }
+}
+
+case class InMemoryRelation(
+    output: Seq[Attribute],
+    @transient cacheBuilder: CachedRDDBuilder)(
+    statsOfPlanToCache: Statistics,
+    override val outputOrdering: Seq[SortOrder])
+  extends logical.LeafNode with MultiInstanceRelation {
+
+  override protected def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
+
+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, cachedPlan.output)),
+      cacheBuilder)(
+      statsOfPlanToCache,
+      outputOrdering)
+
+  override def producedAttributes: AttributeSet = outputSet
+
+  @transient val partitionStatistics = new PartitionStatistics(output)
+
+  val cachedPlan: SparkPlan = cacheBuilder.cachedPlan
--- End diff --

this should be `def`, or it will be serialized.
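That is, something like this (a sketch):
```
// A val would hold the SparkPlan as a field of the relation and drag it
// into serialization; a def just forwards to the @transient builder.
def cachedPlan: SparkPlan = cacheBuilder.cachedPlan
```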


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-23 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183614108
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -55,56 +42,39 @@ object InMemoryRelation {
 private[columnar]
 case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
 
-case class InMemoryRelation(
-    output: Seq[Attribute],
+case class CachedRDDBuilder(
     useCompression: Boolean,
     batchSize: Int,
     storageLevel: StorageLevel,
     @transient child: SparkPlan,
     tableName: Option[String])(
-    @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
-    statsOfPlanToCache: Statistics,
-    override val outputOrdering: Seq[SortOrder])
-  extends logical.LeafNode with MultiInstanceRelation {
-
-  override protected def innerChildren: Seq[SparkPlan] = Seq(child)
-
-  override def doCanonicalize(): logical.LogicalPlan =
-    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
-      storageLevel = StorageLevel.NONE,
-      child = child.canonicalized,
-      tableName = None)(
-      _cachedColumnBuffers,
-      sizeInBytesStats,
-      statsOfPlanToCache,
-      outputOrdering)
-
-  override def producedAttributes: AttributeSet = outputSet
+    @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) {
 
-  @transient val partitionStatistics = new PartitionStatistics(output)
+  val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator
 
-  override def computeStats(): Statistics = {
-    if (sizeInBytesStats.value == 0L) {
-      // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
-      // Note that we should drop the hint info here. We may cache a plan whose root node is a hint
-      // node. When we lookup the cache with a semantically same plan without hint info, the plan
-      // returned by cache lookup should not have hint info. If we lookup the cache with a
-      // semantically same plan with a different hint info, `CacheManager.useCachedData` will take
-      // care of it and retain the hint info in the lookup input plan.
-      statsOfPlanToCache.copy(hints = HintInfo())
-    } else {
-      Statistics(sizeInBytes = sizeInBytesStats.value.longValue)
+  def cachedColumnBuffers: RDD[CachedBatch] = {
+    if (_cachedColumnBuffers == null) {
+      synchronized {
--- End diff --

In this PR, without `synchronized`, I found that multi-threaded queries wrongly built four RDDs for a single cache:
```
val cachedDf = spark.range(100).selectExpr("id AS k", "id AS v").cache
for (i <- 0 to 3) {
  val thread = new Thread {
    override def run {
      // Start a job in each thread
      val df = cachedDf.filter('k > 5).groupBy().sum("v")
      df.collect
    }
  }
  thread.start
}
```
Either way, I think we should make `_cachedColumnBuffers` private, so I fixed it.


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-23 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183395070
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -55,56 +42,39 @@ object InMemoryRelation {
 private[columnar]
 case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
 
-case class InMemoryRelation(
-    output: Seq[Attribute],
+case class CachedRDDBuilder(
     useCompression: Boolean,
     batchSize: Int,
     storageLevel: StorageLevel,
     @transient child: SparkPlan,
     tableName: Option[String])(
-    @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
-    statsOfPlanToCache: Statistics,
-    override val outputOrdering: Seq[SortOrder])
-  extends logical.LeafNode with MultiInstanceRelation {
-
-  override protected def innerChildren: Seq[SparkPlan] = Seq(child)
-
-  override def doCanonicalize(): logical.LogicalPlan =
-    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
-      storageLevel = StorageLevel.NONE,
-      child = child.canonicalized,
-      tableName = None)(
-      _cachedColumnBuffers,
-      sizeInBytesStats,
-      statsOfPlanToCache,
-      outputOrdering)
-
-  override def producedAttributes: AttributeSet = outputSet
+    @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) {
 
-  @transient val partitionStatistics = new PartitionStatistics(output)
+  val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator
 
-  override def computeStats(): Statistics = {
-    if (sizeInBytesStats.value == 0L) {
-      // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
-      // Note that we should drop the hint info here. We may cache a plan whose root node is a hint
-      // node. When we lookup the cache with a semantically same plan without hint info, the plan
-      // returned by cache lookup should not have hint info. If we lookup the cache with a
-      // semantically same plan with a different hint info, `CacheManager.useCachedData` will take
-      // care of it and retain the hint info in the lookup input plan.
-      statsOfPlanToCache.copy(hints = HintInfo())
-    } else {
-      Statistics(sizeInBytes = sizeInBytesStats.value.longValue)
+  def cachedColumnBuffers: RDD[CachedBatch] = {
+    if (_cachedColumnBuffers == null) {
+      synchronized {
--- End diff --

Oh, my bad. OK, I'll update soon.


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183367771
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -55,56 +42,39 @@ object InMemoryRelation {
 private[columnar]
 case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
 
-case class InMemoryRelation(
-    output: Seq[Attribute],
+case class CachedRDDBuilder(
     useCompression: Boolean,
     batchSize: Int,
     storageLevel: StorageLevel,
     @transient child: SparkPlan,
     tableName: Option[String])(
-    @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
-    statsOfPlanToCache: Statistics,
-    override val outputOrdering: Seq[SortOrder])
-  extends logical.LeafNode with MultiInstanceRelation {
-
-  override protected def innerChildren: Seq[SparkPlan] = Seq(child)
-
-  override def doCanonicalize(): logical.LogicalPlan =
-    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
-      storageLevel = StorageLevel.NONE,
-      child = child.canonicalized,
-      tableName = None)(
-      _cachedColumnBuffers,
-      sizeInBytesStats,
-      statsOfPlanToCache,
-      outputOrdering)
-
-  override def producedAttributes: AttributeSet = outputSet
+    @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) {
 
-  @transient val partitionStatistics = new PartitionStatistics(output)
+  val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator
 
-  override def computeStats(): Statistics = {
-    if (sizeInBytesStats.value == 0L) {
-      // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
-      // Note that we should drop the hint info here. We may cache a plan whose root node is a hint
-      // node. When we lookup the cache with a semantically same plan without hint info, the plan
-      // returned by cache lookup should not have hint info. If we lookup the cache with a
-      // semantically same plan with a different hint info, `CacheManager.useCachedData` will take
-      // care of it and retain the hint info in the lookup input plan.
-      statsOfPlanToCache.copy(hints = HintInfo())
-    } else {
-      Statistics(sizeInBytes = sizeInBytesStats.value.longValue)
+  def cachedColumnBuffers: RDD[CachedBatch] = {
+    if (_cachedColumnBuffers == null) {
+      synchronized {
--- End diff --

We should not care about thread-safety at all, or do it right. Please prove that `CachedRDDBuilder` will never be accessed by multiple threads, or make `_cachedColumnBuffers` private.


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-23 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183362225
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -155,31 +125,76 @@ case class InMemoryRelation(
     cached.setName(
       tableName.map(n => s"In-memory table $n")
         .getOrElse(StringUtils.abbreviate(child.toString, 1024)))
-    _cachedColumnBuffers = cached
+    cached
+  }
+}
+
+object InMemoryRelation {
+
+  def apply(
+      useCompression: Boolean,
+      batchSize: Int,
+      storageLevel: StorageLevel,
+      child: SparkPlan,
+      tableName: Option[String],
+      logicalPlan: LogicalPlan): InMemoryRelation = {
+    val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, storageLevel, child, tableName)()
+    new InMemoryRelation(child.output, cacheBuilder)(
+      statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering)
+  }
+
+  def apply(cacheBuilder: CachedRDDBuilder, logicalPlan: LogicalPlan): InMemoryRelation = {
+    new InMemoryRelation(cacheBuilder.child.output, cacheBuilder)(
+      statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering)
+  }
+}
+
+case class InMemoryRelation(
+    output: Seq[Attribute],
+    @transient cacheBuilder: CachedRDDBuilder)(
+    statsOfPlanToCache: Statistics,
+    override val outputOrdering: Seq[SortOrder])
+  extends logical.LeafNode with MultiInstanceRelation {
+
+  override protected def innerChildren: Seq[SparkPlan] = Seq(child)
+
+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      cacheBuilder)(
+      statsOfPlanToCache,
+      outputOrdering)
+
+  override def producedAttributes: AttributeSet = outputSet
+
+  @transient val partitionStatistics = new PartitionStatistics(output)
+
+  val child: SparkPlan = cacheBuilder.child
--- End diff --

Since `InMemoryTableScanExec` and other places reference this variable, I kept it public. But, yeah, I feel the name is a little weird, so I renamed `child` to `cachedPlan`.


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-23 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183362208
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -55,56 +42,39 @@ object InMemoryRelation {
 private[columnar]
 case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
 
-case class InMemoryRelation(
-    output: Seq[Attribute],
+case class CachedRDDBuilder(
     useCompression: Boolean,
     batchSize: Int,
     storageLevel: StorageLevel,
     @transient child: SparkPlan,
     tableName: Option[String])(
-    @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
-    statsOfPlanToCache: Statistics,
-    override val outputOrdering: Seq[SortOrder])
-  extends logical.LeafNode with MultiInstanceRelation {
-
-  override protected def innerChildren: Seq[SparkPlan] = Seq(child)
-
-  override def doCanonicalize(): logical.LogicalPlan =
-    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
-      storageLevel = StorageLevel.NONE,
-      child = child.canonicalized,
-      tableName = None)(
-      _cachedColumnBuffers,
-      sizeInBytesStats,
-      statsOfPlanToCache,
-      outputOrdering)
-
-  override def producedAttributes: AttributeSet = outputSet
+    @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) {
 
-  @transient val partitionStatistics = new PartitionStatistics(output)
+  val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator
 
-  override def computeStats(): Statistics = {
-    if (sizeInBytesStats.value == 0L) {
-      // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
-      // Note that we should drop the hint info here. We may cache a plan whose root node is a hint
-      // node. When we lookup the cache with a semantically same plan without hint info, the plan
-      // returned by cache lookup should not have hint info. If we lookup the cache with a
-      // semantically same plan with a different hint info, `CacheManager.useCachedData` will take
-      // care of it and retain the hint info in the lookup input plan.
-      statsOfPlanToCache.copy(hints = HintInfo())
-    } else {
-      Statistics(sizeInBytes = sizeInBytesStats.value.longValue)
+  def cachedColumnBuffers: RDD[CachedBatch] = {
+    if (_cachedColumnBuffers == null) {
+      synchronized {
--- End diff --

I feel thread contention is low here, so I prefer simpler code. But I welcome suggestions for code that is both more efficient and simpler.
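For reference, one simpler shape would be a `lazy val` (a sketch; note that a `lazy val` also synchronizes on the instance monitor, and the field could then no longer be nulled out by `clearCache`):
```
// Hypothetical alternative: build-once semantics with less code, but no
// way to reset the reference when the cache is unpersisted.
lazy val cachedColumnBuffers: RDD[CachedBatch] = buildBuffers()
```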


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183325713
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -55,56 +42,39 @@ object InMemoryRelation {
 private[columnar]
 case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
 
-case class InMemoryRelation(
-    output: Seq[Attribute],
+case class CachedRDDBuilder(
     useCompression: Boolean,
     batchSize: Int,
     storageLevel: StorageLevel,
     @transient child: SparkPlan,
     tableName: Option[String])(
-    @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
-    statsOfPlanToCache: Statistics,
-    override val outputOrdering: Seq[SortOrder])
-  extends logical.LeafNode with MultiInstanceRelation {
-
-  override protected def innerChildren: Seq[SparkPlan] = Seq(child)
-
-  override def doCanonicalize(): logical.LogicalPlan =
-    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
-      storageLevel = StorageLevel.NONE,
-      child = child.canonicalized,
-      tableName = None)(
-      _cachedColumnBuffers,
-      sizeInBytesStats,
-      statsOfPlanToCache,
-      outputOrdering)
-
-  override def producedAttributes: AttributeSet = outputSet
+    @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) {
 
-  @transient val partitionStatistics = new PartitionStatistics(output)
+  val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator
 
-  override def computeStats(): Statistics = {
-    if (sizeInBytesStats.value == 0L) {
-      // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
-      // Note that we should drop the hint info here. We may cache a plan whose root node is a hint
-      // node. When we lookup the cache with a semantically same plan without hint info, the plan
-      // returned by cache lookup should not have hint info. If we lookup the cache with a
-      // semantically same plan with a different hint info, `CacheManager.useCachedData` will take
-      // care of it and retain the hint info in the lookup input plan.
-      statsOfPlanToCache.copy(hints = HintInfo())
-    } else {
-      Statistics(sizeInBytes = sizeInBytesStats.value.longValue)
+  def cachedColumnBuffers: RDD[CachedBatch] = {
+    if (_cachedColumnBuffers == null) {
+      synchronized {
--- End diff --

`_cachedColumnBuffers` is `private[sql]`, so I'm not sure if this 
`synchronized` can be very effective.
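For illustration, a hypothetical caller: any code in the `sql` package can still touch the field directly and bypass the lock:
```
// Nothing prevents this elsewhere in org.apache.spark.sql, so the
// double-checked locking in cachedColumnBuffers can be subverted.
someCacheBuilder._cachedColumnBuffers = someOtherRdd
```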


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183322924
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -155,31 +125,76 @@ case class InMemoryRelation(
     cached.setName(
       tableName.map(n => s"In-memory table $n")
         .getOrElse(StringUtils.abbreviate(child.toString, 1024)))
-    _cachedColumnBuffers = cached
+    cached
+  }
+}
+
+object InMemoryRelation {
+
+  def apply(
+      useCompression: Boolean,
+      batchSize: Int,
+      storageLevel: StorageLevel,
+      child: SparkPlan,
+      tableName: Option[String],
+      logicalPlan: LogicalPlan): InMemoryRelation = {
+    val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, storageLevel, child, tableName)()
+    new InMemoryRelation(child.output, cacheBuilder)(
+      statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering)
+  }
+
+  def apply(cacheBuilder: CachedRDDBuilder, logicalPlan: LogicalPlan): InMemoryRelation = {
+    new InMemoryRelation(cacheBuilder.child.output, cacheBuilder)(
+      statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering)
+  }
+}
+
+case class InMemoryRelation(
+    output: Seq[Attribute],
+    @transient cacheBuilder: CachedRDDBuilder)(
+    statsOfPlanToCache: Statistics,
+    override val outputOrdering: Seq[SortOrder])
+  extends logical.LeafNode with MultiInstanceRelation {
+
+  override protected def innerChildren: Seq[SparkPlan] = Seq(child)
+
+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      cacheBuilder)(
+      statsOfPlanToCache,
+      outputOrdering)
+
+  override def producedAttributes: AttributeSet = outputSet
+
+  @transient val partitionStatistics = new PartitionStatistics(output)
+
+  val child: SparkPlan = cacheBuilder.child
--- End diff --

Do we need to expose `child: SparkPlan`? As it is a `logical.LeafNode`, 
it's a bit weird to have it.


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-22 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183288749
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -55,56 +42,38 @@ object InMemoryRelation {
 private[columnar]
 case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
 
-case class InMemoryRelation(
-    output: Seq[Attribute],
+case class CachedRDDBuilder(
     useCompression: Boolean,
     batchSize: Int,
     storageLevel: StorageLevel,
     @transient child: SparkPlan,
     tableName: Option[String])(
-    @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
-    statsOfPlanToCache: Statistics,
-    override val outputOrdering: Seq[SortOrder])
-  extends logical.LeafNode with MultiInstanceRelation {
-
-  override protected def innerChildren: Seq[SparkPlan] = Seq(child)
-
-  override def doCanonicalize(): logical.LogicalPlan =
-    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
-      storageLevel = StorageLevel.NONE,
-      child = child.canonicalized,
-      tableName = None)(
-      _cachedColumnBuffers,
-      sizeInBytesStats,
-      statsOfPlanToCache,
-      outputOrdering)
-
-  override def producedAttributes: AttributeSet = outputSet
+    @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) {
 
-  @transient val partitionStatistics = new PartitionStatistics(output)
+  val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator
 
-  override def computeStats(): Statistics = {
-    if (sizeInBytesStats.value == 0L) {
-      // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
-      // Note that we should drop the hint info here. We may cache a plan whose root node is a hint
-      // node. When we lookup the cache with a semantically same plan without hint info, the plan
-      // returned by cache lookup should not have hint info. If we lookup the cache with a
-      // semantically same plan with a different hint info, `CacheManager.useCachedData` will take
-      // care of it and retain the hint info in the lookup input plan.
-      statsOfPlanToCache.copy(hints = HintInfo())
-    } else {
-      Statistics(sizeInBytes = sizeInBytesStats.value.longValue)
+  def cachedColumnBuffers: RDD[CachedBatch] = {
+    if (_cachedColumnBuffers == null) {
+      synchronized {
+        if (_cachedColumnBuffers == null) {
+          _cachedColumnBuffers = buildBuffers()
+        }
+      }
     }
+    _cachedColumnBuffers
   }
 
-  // If the cached column buffers were not passed in, we calculate them in the constructor.
-  // As in Spark, the actual work of caching is lazy.
-  if (_cachedColumnBuffers == null) {
-    buildBuffers()
+  def clearCache(blocking: Boolean = true): Unit = {
+    if (_cachedColumnBuffers != null) {
+      synchronized {
+        if (_cachedColumnBuffers != null) {
+          _cachedColumnBuffers.unpersist(blocking)
--- End diff --

ok


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-22 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183288666
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -794,4 +794,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       }
     }
   }
+
+  private def isMaterialized(df: DataFrame): Boolean = {
+    val nodes = df.queryExecution.executedPlan.collect { case c: InMemoryTableScanExec => c }
+    assert(nodes.nonEmpty, "DataFrame is not cached\n" + df.queryExecution.analyzed)
+    nodes.forall(_.relation.cacheBuilder._cachedColumnBuffers != null)
+  }
+
+  test("SPARK-23880 table cache should be lazy and don't trigger any jobs") {
--- End diff --

ok


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183269323
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -794,4 +794,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       }
     }
   }
+
+  private def isMaterialized(df: DataFrame): Boolean = {
+    val nodes = df.queryExecution.executedPlan.collect { case c: InMemoryTableScanExec => c }
+    assert(nodes.nonEmpty, "DataFrame is not cached\n" + df.queryExecution.analyzed)
+    nodes.forall(_.relation.cacheBuilder._cachedColumnBuffers != null)
+  }
+
+  test("SPARK-23880 table cache should be lazy and don't trigger any jobs") {
--- End diff --

I feel it's clearer to create a listener and explicitly show that we don't trigger any jobs after calling `Dataset.cache`.


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183269227
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -794,4 +794,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       }
     }
   }
+
+  private def isMaterialized(df: DataFrame): Boolean = {
+    val nodes = df.queryExecution.executedPlan.collect { case c: InMemoryTableScanExec => c }
+    assert(nodes.nonEmpty, "DataFrame is not cached\n" + df.queryExecution.analyzed)
+    nodes.forall(_.relation.cacheBuilder._cachedColumnBuffers != null)
+  }
+
+  test("SPARK-23880 table cache should be lazy and don't trigger any jobs") {
--- End diff --

how does this test prove we don't trigger jobs?


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183268807
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -55,56 +42,38 @@ object InMemoryRelation {
 private[columnar]
 case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
 
-case class InMemoryRelation(
-    output: Seq[Attribute],
+case class CachedRDDBuilder(
     useCompression: Boolean,
     batchSize: Int,
     storageLevel: StorageLevel,
     @transient child: SparkPlan,
     tableName: Option[String])(
-    @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
-    statsOfPlanToCache: Statistics,
-    override val outputOrdering: Seq[SortOrder])
-  extends logical.LeafNode with MultiInstanceRelation {
-
-  override protected def innerChildren: Seq[SparkPlan] = Seq(child)
-
-  override def doCanonicalize(): logical.LogicalPlan =
-    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
-      storageLevel = StorageLevel.NONE,
-      child = child.canonicalized,
-      tableName = None)(
-      _cachedColumnBuffers,
-      sizeInBytesStats,
-      statsOfPlanToCache,
-      outputOrdering)
-
-  override def producedAttributes: AttributeSet = outputSet
+    @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) {
 
-  @transient val partitionStatistics = new PartitionStatistics(output)
+  val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator
 
-  override def computeStats(): Statistics = {
-    if (sizeInBytesStats.value == 0L) {
-      // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
-      // Note that we should drop the hint info here. We may cache a plan whose root node is a hint
-      // node. When we lookup the cache with a semantically same plan without hint info, the plan
-      // returned by cache lookup should not have hint info. If we lookup the cache with a
-      // semantically same plan with a different hint info, `CacheManager.useCachedData` will take
-      // care of it and retain the hint info in the lookup input plan.
-      statsOfPlanToCache.copy(hints = HintInfo())
-    } else {
-      Statistics(sizeInBytes = sizeInBytesStats.value.longValue)
+  def cachedColumnBuffers: RDD[CachedBatch] = {
+    if (_cachedColumnBuffers == null) {
+      synchronized {
+        if (_cachedColumnBuffers == null) {
+          _cachedColumnBuffers = buildBuffers()
+        }
+      }
     }
+    _cachedColumnBuffers
   }
 
-  // If the cached column buffers were not passed in, we calculate them in the constructor.
-  // As in Spark, the actual work of caching is lazy.
-  if (_cachedColumnBuffers == null) {
-    buildBuffers()
+  def clearCache(blocking: Boolean = true): Unit = {
+    if (_cachedColumnBuffers != null) {
+      synchronized {
+        if (_cachedColumnBuffers != null) {
+          _cachedColumnBuffers.unpersist(blocking)
--- End diff --

shall we also do `_cachedColumnBuffers = null` so that `unpersist` won't be called twice?
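That is, a sketch of `clearCache` with the change:
```
def clearCache(blocking: Boolean = true): Unit = {
  if (_cachedColumnBuffers != null) {
    synchronized {
      if (_cachedColumnBuffers != null) {
        _cachedColumnBuffers.unpersist(blocking)
        // Null out the reference so a second clearCache() is a no-op and a
        // later cachedColumnBuffers call rebuilds the RDD from scratch.
        _cachedColumnBuffers = null
      }
    }
  }
}
```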


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-19 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r182929830
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -119,26 +119,60 @@ class CacheManager extends Logging {
     while (it.hasNext) {
       val cd = it.next()
       if (cd.plan.find(_.sameResult(plan)).isDefined) {
-        cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+        cd.cachedRepresentation.clearCache(blocking)
         it.remove()
       }
     }
   }
 
+  /**
+   * Materialize the cache that refers to the given physical plan.
--- End diff --

Aha, I'll fix it that way. Thanks!


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r182660703
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -119,26 +119,60 @@ class CacheManager extends Logging {
     while (it.hasNext) {
       val cd = it.next()
       if (cd.plan.find(_.sameResult(plan)).isDefined) {
-        cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+        cd.cachedRepresentation.clearCache(blocking)
         it.remove()
       }
     }
   }
 
+  /**
+   * Materialize the cache that refers to the given physical plan.
--- End diff --

```
class CachedRDDBuilder(private var _cachedColumnBuffers: RDD[CachedBatch] = null) {
  def cachedColumnBuffers = {
    if (_cachedColumnBuffers == null) {
      synchronized {
        if (_cachedColumnBuffers == null) {
          _cachedColumnBuffers = buildBuffer()
        }
      }
    }
    _cachedColumnBuffers
  }
}

class InMemoryRelation(cacheBuilder: CachedRDDBuilder = new CachedRDDBuilder()) {
  // newInstance should keep the existing CachedRDDBuilder
  def newInstance()...
}
```

Then, in the physical plan and the cache manager, just call `relation.cacheBuilder.cachedColumnBuffers`.
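A hypothetical call site, for illustration:
```
// Inside InMemoryTableScanExec: materialization happens on the first
// execution of the scan, not when InMemoryRelation is constructed.
val buffers: RDD[CachedBatch] = relation.cacheBuilder.cachedColumnBuffers
```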


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-18 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r182626260
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -119,26 +119,60 @@ class CacheManager extends Logging {
     while (it.hasNext) {
       val cd = it.next()
       if (cd.plan.find(_.sameResult(plan)).isDefined) {
-        cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+        cd.cachedRepresentation.clearCache(blocking)
         it.remove()
       }
     }
   }
 
+  /**
+   * Materialize the cache that refers to the given physical plan.
--- End diff --

I checked the suggested approach, but some queries didn't work well; the query in the description was OK, but the query below wrongly cached two different RDDs (I checked the `Storage` tab in the web UI):
```
scala> sql("SET spark.sql.crossJoin.enabled=true")
scala> val df = spark.range(1L).cache()
scala> df.join(df).show
```
This is because the Analyzer copies the `InMemoryRelation` node (with `_cachedColumnBuffers` = null) via `newInstance`, and then each copy builds its own RDD. Thoughts?
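One possible fix (a sketch against the constructor from this PR): have `newInstance` keep the existing `CachedRDDBuilder` so that every copy shares the same buffer:
```
// Hypothetical newInstance: copies share the builder (and its RDD), so an
// Analyzer-duplicated relation cannot build a second cache.
override def newInstance(): this.type = {
  new InMemoryRelation(output.map(_.newInstance()), cacheBuilder)(
    statsOfPlanToCache, outputOrdering).asInstanceOf[this.type]
}
```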


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-18 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r182612846
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -119,26 +119,60 @@ class CacheManager extends Logging {
     while (it.hasNext) {
       val cd = it.next()
       if (cd.plan.find(_.sameResult(plan)).isDefined) {
-        cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+        cd.cachedRepresentation.clearCache(blocking)
         it.remove()
       }
     }
   }
 
+  /**
+   * Materialize the cache that refers to the given physical plan.
--- End diff --

I'll update today


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r181273485
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -119,26 +119,60 @@ class CacheManager extends Logging {
     while (it.hasNext) {
       val cd = it.next()
       if (cd.plan.find(_.sameResult(plan)).isDefined) {
-        cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+        cd.cachedRepresentation.clearCache(blocking)
         it.remove()
       }
     }
   }
 
+  /**
+   * Materialize the cache that refers to the given physical plan.
--- End diff --

Once it's materialized, it's still materialized after a copy.


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r181273417
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -119,26 +119,60 @@ class CacheManager extends Logging {
     while (it.hasNext) {
       val cd = it.next()
       if (cd.plan.find(_.sameResult(plan)).isDefined) {
-        cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+        cd.cachedRepresentation.clearCache(blocking)
         it.remove()
       }
     }
   }
 
+  /**
+   * Materialize the cache that refers to the given physical plan.
--- End diff --

something like
```
class InMemoryRelation(private var _cachedColumnBuffers: RDD[CachedBatch] = null) {
  def cachedColumnBuffers = {
    if (_cachedColumnBuffers == null) {
      synchronized {
        if (_cachedColumnBuffers == null) {
          _cachedColumnBuffers = buildBuffer()
        }
      }
    }
    _cachedColumnBuffers
  }
}

```


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-12 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r181122693
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -119,26 +119,60 @@ class CacheManager extends Logging {
     while (it.hasNext) {
       val cd = it.next()
       if (cd.plan.find(_.sameResult(plan)).isDefined) {
-        cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+        cd.cachedRepresentation.clearCache(blocking)
         it.remove()
       }
     }
   }
 
+  /**
+   * Materialize the cache that refers to the given physical plan.
--- End diff --

I thought that, since `InMemoryRelation` is sometimes copied within a tree, the lazy update of `_cachedColumnBuffers` would not always lead to the materialization of the corresponding cache entry in `CacheManager` (maybe...). If so, subsequent queries might repeat unnecessary materialization. That is why I thought we needed to update the entry in `CacheManager` directly.


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r181093091
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -119,26 +119,60 @@ class CacheManager extends Logging {
     while (it.hasNext) {
       val cd = it.next()
       if (cd.plan.find(_.sameResult(plan)).isDefined) {
-        cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+        cd.cachedRepresentation.clearCache(blocking)
         it.remove()
       }
     }
   }
 
+  /**
+   * Materialize the cache that refers to the given physical plan.
--- End diff --

Why do we need to involve `CacheManager`? Shall we just make the RDD creation lazy in `InMemoryRelation` and trigger the materialization in `InMemoryTableScanExec`?


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-11 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r180777891
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -119,26 +119,60 @@ class CacheManager extends Logging {
     while (it.hasNext) {
       val cd = it.next()
       if (cd.plan.find(_.sameResult(plan)).isDefined) {
-        cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+        cd.cachedRepresentation.clearCache(blocking)
         it.remove()
       }
     }
   }
 
+  /**
+   * Materialize the cache that refers to the given physical plan.
--- End diff --

The current approach of this PR is: `cache()` just registers an entry in `CacheManager` without building an `RDD`, and then `InMemoryTableScanExec` re-registers the entry to build (materialize) the `RDD` in `CacheManager`. So I added this function for `InMemoryTableScanExec` to re-register these entries in `CacheManager`. But I don't think this is the best approach, so I'd welcome any suggestions.


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r180717823
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -119,26 +119,60 @@ class CacheManager extends Logging {
     while (it.hasNext) {
       val cd = it.next()
       if (cd.plan.find(_.sameResult(plan)).isDefined) {
-        cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+        cd.cachedRepresentation.clearCache(blocking)
         it.remove()
       }
     }
   }
 
+  /**
+   * Materialize the cache that refers to the given physical plan.
--- End diff --

why do we do this?


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-09 Thread maropu
GitHub user maropu opened a pull request:

https://github.com/apache/spark/pull/21018

[SPARK-23880][SQL] Do not trigger any jobs for caching data

## What changes were proposed in this pull request?
This PR fixes the code so that `cache` does not trigger any jobs.
For example, in the current master, the operation below triggers an actual job:
```
val df = spark.range(100L)
  .filter('id > 1000)
  .orderBy('id.desc)
  .cache()
```
This triggers a job even though the cache should be lazy. The problem is that, when creating `InMemoryRelation`, we build the RDD, which calls `SparkPlan.execute` and may trigger jobs, such as a sampling job for a range partitioner or a broadcast job.

With this fix, the `RDD` is not built in the constructor of `InMemoryRelation`. Instead, `InMemoryTableScanExec` materializes the cache and updates the entry in `CacheManager`.
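For reference, the laziness can be observed with a listener (a sketch assuming Spark's test scope, since `listenerBus` is `private[spark]`; the timeout value is arbitrary):
```
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import spark.implicits._

var numJobs = 0
spark.sparkContext.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = numJobs += 1
})
val df = spark.range(100L).filter('id > 1000).orderBy('id.desc).cache()
// Drain pending listener events, then check that cache() started no job.
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
assert(numJobs == 0)
```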

## How was this patch tested?
Added tests in `CachedTableSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/maropu/spark SPARK-23880

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21018.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21018


commit 01d75d789c45f73bd999106dfc6f29cdc3050ce9
Author: Takeshi Yamamuro 
Date:   2018-04-09T09:30:10Z

Fix




---
