[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21018 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183907321 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala --- @@ -794,4 +795,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext } } } + + private def checkIfNoJobTriggered(f: => DataFrame): DataFrame = { +var numJobTrigered = 0 +val jobListener = new SparkListener { + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { +numJobTrigered += 1 + } +} +sparkContext.addSparkListener(jobListener) +try { + val df = f + assert(numJobTrigered === 0) + df +} finally { + sparkContext.removeSparkListener(jobListener) +} + } + + test("SPARK-23880 table cache should be lazy and don't trigger any jobs") { --- End diff -- oh, I'll recheck. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183903195 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala --- @@ -794,4 +795,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext } } } + + private def checkIfNoJobTriggered(f: => DataFrame): DataFrame = { +var numJobTrigered = 0 +val jobListener = new SparkListener { + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { +numJobTrigered += 1 + } +} +sparkContext.addSparkListener(jobListener) +try { + val df = f + assert(numJobTrigered === 0) + df +} finally { + sparkContext.removeSparkListener(jobListener) +} + } + + test("SPARK-23880 table cache should be lazy and don't trigger any jobs") { --- End diff -- Without the changes in this PR, this test can still pass. :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183638215 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala --- @@ -794,4 +795,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext } } } + + private def checkIfNoJobTriggered(f: => DataFrame): DataFrame = { +var numJobTrigered = 0 +val jobListener = new SparkListener { + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { +numJobTrigered += 1 + } +} +sparkContext.addSparkListener(jobListener) +try { + val df = f + assert(numJobTrigered === 0) --- End diff -- before this assert, we should make sure the event queue is empty, via `sparkContext.listenerBus.waitUntilEmpty` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183637688 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -154,32 +124,77 @@ case class InMemoryRelation( cached.setName( tableName.map(n => s"In-memory table $n") -.getOrElse(StringUtils.abbreviate(child.toString, 1024))) -_cachedColumnBuffers = cached +.getOrElse(StringUtils.abbreviate(cachedPlan.toString, 1024))) +cached + } +} + +object InMemoryRelation { + + def apply( + useCompression: Boolean, + batchSize: Int, + storageLevel: StorageLevel, + child: SparkPlan, + tableName: Option[String], + logicalPlan: LogicalPlan): InMemoryRelation = { +val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, storageLevel, child, tableName)() +new InMemoryRelation(child.output, cacheBuilder)( + statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering) + } + + def apply(cacheBuilder: CachedRDDBuilder, logicalPlan: LogicalPlan): InMemoryRelation = { +new InMemoryRelation(cacheBuilder.cachedPlan.output, cacheBuilder)( + statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering) + } +} + +case class InMemoryRelation( +output: Seq[Attribute], +@transient cacheBuilder: CachedRDDBuilder)( +statsOfPlanToCache: Statistics, +override val outputOrdering: Seq[SortOrder]) + extends logical.LeafNode with MultiInstanceRelation { + + override protected def innerChildren: Seq[SparkPlan] = Seq(cachedPlan) + + override def doCanonicalize(): logical.LogicalPlan = +copy(output = output.map(QueryPlan.normalizeExprId(_, cachedPlan.output)), + cacheBuilder)( + statsOfPlanToCache, + outputOrdering) + + override def producedAttributes: AttributeSet = outputSet + + @transient val partitionStatistics = new PartitionStatistics(output) + + val cachedPlan: SparkPlan = cacheBuilder.cachedPlan --- End diff -- this should be `def`, or it will be serialized. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183614108 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -55,56 +42,39 @@ object InMemoryRelation { private[columnar] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) -case class InMemoryRelation( -output: Seq[Attribute], +case class CachedRDDBuilder( useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, @transient child: SparkPlan, tableName: Option[String])( -@transient var _cachedColumnBuffers: RDD[CachedBatch] = null, -val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator, -statsOfPlanToCache: Statistics, -override val outputOrdering: Seq[SortOrder]) - extends logical.LeafNode with MultiInstanceRelation { - - override protected def innerChildren: Seq[SparkPlan] = Seq(child) - - override def doCanonicalize(): logical.LogicalPlan = -copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)), - storageLevel = StorageLevel.NONE, - child = child.canonicalized, - tableName = None)( - _cachedColumnBuffers, - sizeInBytesStats, - statsOfPlanToCache, - outputOrdering) - - override def producedAttributes: AttributeSet = outputSet +@transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) { - @transient val partitionStatistics = new PartitionStatistics(output) + val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator - override def computeStats(): Statistics = { -if (sizeInBytesStats.value == 0L) { - // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache. - // Note that we should drop the hint info here. We may cache a plan whose root node is a hint - // node. When we lookup the cache with a semantically same plan without hint info, the plan - // returned by cache lookup should not have hint info. 
If we lookup the cache with a - // semantically same plan with a different hint info, `CacheManager.useCachedData` will take - // care of it and retain the hint info in the lookup input plan. - statsOfPlanToCache.copy(hints = HintInfo()) -} else { - Statistics(sizeInBytes = sizeInBytesStats.value.longValue) + def cachedColumnBuffers: RDD[CachedBatch] = { +if (_cachedColumnBuffers == null) { + synchronized { --- End diff -- In this PR, without `synchronized`, I found that concurrent queries wrongly built four RDDs for a single cache: ``` val cachedDf = spark.range(100).selectExpr("id AS k", "id AS v").cache for (i <- 0 to 3) { val thread = new Thread { override def run { // Start a job in each thread val df = cachedDf.filter('k > 5).groupBy().sum("v") df.collect } } thread.start } ``` Either way, I think we should make `_cachedColumnBuffers` private, so I fixed it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183395070 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -55,56 +42,39 @@ object InMemoryRelation { private[columnar] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) -case class InMemoryRelation( -output: Seq[Attribute], +case class CachedRDDBuilder( useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, @transient child: SparkPlan, tableName: Option[String])( -@transient var _cachedColumnBuffers: RDD[CachedBatch] = null, -val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator, -statsOfPlanToCache: Statistics, -override val outputOrdering: Seq[SortOrder]) - extends logical.LeafNode with MultiInstanceRelation { - - override protected def innerChildren: Seq[SparkPlan] = Seq(child) - - override def doCanonicalize(): logical.LogicalPlan = -copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)), - storageLevel = StorageLevel.NONE, - child = child.canonicalized, - tableName = None)( - _cachedColumnBuffers, - sizeInBytesStats, - statsOfPlanToCache, - outputOrdering) - - override def producedAttributes: AttributeSet = outputSet +@transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) { - @transient val partitionStatistics = new PartitionStatistics(output) + val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator - override def computeStats(): Statistics = { -if (sizeInBytesStats.value == 0L) { - // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache. - // Note that we should drop the hint info here. We may cache a plan whose root node is a hint - // node. When we lookup the cache with a semantically same plan without hint info, the plan - // returned by cache lookup should not have hint info. 
If we lookup the cache with a - // semantically same plan with a different hint info, `CacheManager.useCachedData` will take - // care of it and retain the hint info in the lookup input plan. - statsOfPlanToCache.copy(hints = HintInfo()) -} else { - Statistics(sizeInBytes = sizeInBytesStats.value.longValue) + def cachedColumnBuffers: RDD[CachedBatch] = { +if (_cachedColumnBuffers == null) { + synchronized { --- End diff -- Oh, my bad. OK, I'll update it soon. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183367771 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -55,56 +42,39 @@ object InMemoryRelation { private[columnar] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) -case class InMemoryRelation( -output: Seq[Attribute], +case class CachedRDDBuilder( useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, @transient child: SparkPlan, tableName: Option[String])( -@transient var _cachedColumnBuffers: RDD[CachedBatch] = null, -val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator, -statsOfPlanToCache: Statistics, -override val outputOrdering: Seq[SortOrder]) - extends logical.LeafNode with MultiInstanceRelation { - - override protected def innerChildren: Seq[SparkPlan] = Seq(child) - - override def doCanonicalize(): logical.LogicalPlan = -copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)), - storageLevel = StorageLevel.NONE, - child = child.canonicalized, - tableName = None)( - _cachedColumnBuffers, - sizeInBytesStats, - statsOfPlanToCache, - outputOrdering) - - override def producedAttributes: AttributeSet = outputSet +@transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) { - @transient val partitionStatistics = new PartitionStatistics(output) + val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator - override def computeStats(): Statistics = { -if (sizeInBytesStats.value == 0L) { - // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache. - // Note that we should drop the hint info here. We may cache a plan whose root node is a hint - // node. When we lookup the cache with a semantically same plan without hint info, the plan - // returned by cache lookup should not have hint info. 
If we lookup the cache with a - // semantically same plan with a different hint info, `CacheManager.useCachedData` will take - // care of it and retain the hint info in the lookup input plan. - statsOfPlanToCache.copy(hints = HintInfo()) -} else { - Statistics(sizeInBytes = sizeInBytesStats.value.longValue) + def cachedColumnBuffers: RDD[CachedBatch] = { +if (_cachedColumnBuffers == null) { + synchronized { --- End diff -- We should either not care about thread-safety at all, or do it right. Please prove that `CachedRDDBuilder` will never be accessed by multiple threads, or make `_cachedColumnBuffers` private. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183362225 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -155,31 +125,76 @@ case class InMemoryRelation( cached.setName( tableName.map(n => s"In-memory table $n") .getOrElse(StringUtils.abbreviate(child.toString, 1024))) -_cachedColumnBuffers = cached +cached + } +} + +object InMemoryRelation { + + def apply( + useCompression: Boolean, + batchSize: Int, + storageLevel: StorageLevel, + child: SparkPlan, + tableName: Option[String], + logicalPlan: LogicalPlan): InMemoryRelation = { +val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, storageLevel, child, tableName)() +new InMemoryRelation(child.output, cacheBuilder)( + statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering) + } + + def apply(cacheBuilder: CachedRDDBuilder, logicalPlan: LogicalPlan): InMemoryRelation = { +new InMemoryRelation(cacheBuilder.child.output, cacheBuilder)( + statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering) + } +} + +case class InMemoryRelation( +output: Seq[Attribute], +@transient cacheBuilder: CachedRDDBuilder)( +statsOfPlanToCache: Statistics, +override val outputOrdering: Seq[SortOrder]) + extends logical.LeafNode with MultiInstanceRelation { + + override protected def innerChildren: Seq[SparkPlan] = Seq(child) + + override def doCanonicalize(): logical.LogicalPlan = +copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)), + cacheBuilder)( + statsOfPlanToCache, + outputOrdering) + + override def producedAttributes: AttributeSet = outputSet + + @transient val partitionStatistics = new PartitionStatistics(output) + + val child: SparkPlan = cacheBuilder.child --- End diff -- Since `InMemoryTableScanExec` and the other places reference this variable, I kept this public. But, ya, I feel the name is a little weird. 
So, I renamed `child` to `cachedPlan`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183362208 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -55,56 +42,39 @@ object InMemoryRelation { private[columnar] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) -case class InMemoryRelation( -output: Seq[Attribute], +case class CachedRDDBuilder( useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, @transient child: SparkPlan, tableName: Option[String])( -@transient var _cachedColumnBuffers: RDD[CachedBatch] = null, -val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator, -statsOfPlanToCache: Statistics, -override val outputOrdering: Seq[SortOrder]) - extends logical.LeafNode with MultiInstanceRelation { - - override protected def innerChildren: Seq[SparkPlan] = Seq(child) - - override def doCanonicalize(): logical.LogicalPlan = -copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)), - storageLevel = StorageLevel.NONE, - child = child.canonicalized, - tableName = None)( - _cachedColumnBuffers, - sizeInBytesStats, - statsOfPlanToCache, - outputOrdering) - - override def producedAttributes: AttributeSet = outputSet +@transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) { - @transient val partitionStatistics = new PartitionStatistics(output) + val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator - override def computeStats(): Statistics = { -if (sizeInBytesStats.value == 0L) { - // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache. - // Note that we should drop the hint info here. We may cache a plan whose root node is a hint - // node. When we lookup the cache with a semantically same plan without hint info, the plan - // returned by cache lookup should not have hint info. 
If we lookup the cache with a - // semantically same plan with a different hint info, `CacheManager.useCachedData` will take - // care of it and retain the hint info in the lookup input plan. - statsOfPlanToCache.copy(hints = HintInfo()) -} else { - Statistics(sizeInBytes = sizeInBytesStats.value.longValue) + def cachedColumnBuffers: RDD[CachedBatch] = { +if (_cachedColumnBuffers == null) { + synchronized { --- End diff -- I feel thread contention is low here, so I prefer simpler code. That said, I welcome suggestions for more efficient and simpler code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183325713 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -55,56 +42,39 @@ object InMemoryRelation { private[columnar] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) -case class InMemoryRelation( -output: Seq[Attribute], +case class CachedRDDBuilder( useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, @transient child: SparkPlan, tableName: Option[String])( -@transient var _cachedColumnBuffers: RDD[CachedBatch] = null, -val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator, -statsOfPlanToCache: Statistics, -override val outputOrdering: Seq[SortOrder]) - extends logical.LeafNode with MultiInstanceRelation { - - override protected def innerChildren: Seq[SparkPlan] = Seq(child) - - override def doCanonicalize(): logical.LogicalPlan = -copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)), - storageLevel = StorageLevel.NONE, - child = child.canonicalized, - tableName = None)( - _cachedColumnBuffers, - sizeInBytesStats, - statsOfPlanToCache, - outputOrdering) - - override def producedAttributes: AttributeSet = outputSet +@transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) { - @transient val partitionStatistics = new PartitionStatistics(output) + val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator - override def computeStats(): Statistics = { -if (sizeInBytesStats.value == 0L) { - // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache. - // Note that we should drop the hint info here. We may cache a plan whose root node is a hint - // node. When we lookup the cache with a semantically same plan without hint info, the plan - // returned by cache lookup should not have hint info. 
If we lookup the cache with a - // semantically same plan with a different hint info, `CacheManager.useCachedData` will take - // care of it and retain the hint info in the lookup input plan. - statsOfPlanToCache.copy(hints = HintInfo()) -} else { - Statistics(sizeInBytes = sizeInBytesStats.value.longValue) + def cachedColumnBuffers: RDD[CachedBatch] = { +if (_cachedColumnBuffers == null) { + synchronized { --- End diff -- `_cachedColumnBuffers` is `private[sql]`, so I'm not sure if this `synchronized` can be very effective. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183322924 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -155,31 +125,76 @@ case class InMemoryRelation( cached.setName( tableName.map(n => s"In-memory table $n") .getOrElse(StringUtils.abbreviate(child.toString, 1024))) -_cachedColumnBuffers = cached +cached + } +} + +object InMemoryRelation { + + def apply( + useCompression: Boolean, + batchSize: Int, + storageLevel: StorageLevel, + child: SparkPlan, + tableName: Option[String], + logicalPlan: LogicalPlan): InMemoryRelation = { +val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, storageLevel, child, tableName)() +new InMemoryRelation(child.output, cacheBuilder)( + statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering) + } + + def apply(cacheBuilder: CachedRDDBuilder, logicalPlan: LogicalPlan): InMemoryRelation = { +new InMemoryRelation(cacheBuilder.child.output, cacheBuilder)( + statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering) + } +} + +case class InMemoryRelation( +output: Seq[Attribute], +@transient cacheBuilder: CachedRDDBuilder)( +statsOfPlanToCache: Statistics, +override val outputOrdering: Seq[SortOrder]) + extends logical.LeafNode with MultiInstanceRelation { + + override protected def innerChildren: Seq[SparkPlan] = Seq(child) + + override def doCanonicalize(): logical.LogicalPlan = +copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)), + cacheBuilder)( + statsOfPlanToCache, + outputOrdering) + + override def producedAttributes: AttributeSet = outputSet + + @transient val partitionStatistics = new PartitionStatistics(output) + + val child: SparkPlan = cacheBuilder.child --- End diff -- Do we need to expose `child: SparkPlan`? As it is a `logical.LeafNode`, it's a bit weird to have it. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183288749 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -55,56 +42,38 @@ object InMemoryRelation { private[columnar] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) -case class InMemoryRelation( -output: Seq[Attribute], +case class CachedRDDBuilder( useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, @transient child: SparkPlan, tableName: Option[String])( -@transient var _cachedColumnBuffers: RDD[CachedBatch] = null, -val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator, -statsOfPlanToCache: Statistics, -override val outputOrdering: Seq[SortOrder]) - extends logical.LeafNode with MultiInstanceRelation { - - override protected def innerChildren: Seq[SparkPlan] = Seq(child) - - override def doCanonicalize(): logical.LogicalPlan = -copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)), - storageLevel = StorageLevel.NONE, - child = child.canonicalized, - tableName = None)( - _cachedColumnBuffers, - sizeInBytesStats, - statsOfPlanToCache, - outputOrdering) - - override def producedAttributes: AttributeSet = outputSet +@transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) { - @transient val partitionStatistics = new PartitionStatistics(output) + val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator - override def computeStats(): Statistics = { -if (sizeInBytesStats.value == 0L) { - // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache. - // Note that we should drop the hint info here. We may cache a plan whose root node is a hint - // node. When we lookup the cache with a semantically same plan without hint info, the plan - // returned by cache lookup should not have hint info. 
If we lookup the cache with a - // semantically same plan with a different hint info, `CacheManager.useCachedData` will take - // care of it and retain the hint info in the lookup input plan. - statsOfPlanToCache.copy(hints = HintInfo()) -} else { - Statistics(sizeInBytes = sizeInBytesStats.value.longValue) + def cachedColumnBuffers: RDD[CachedBatch] = { +if (_cachedColumnBuffers == null) { + synchronized { +if (_cachedColumnBuffers == null) { + _cachedColumnBuffers = buildBuffers() +} + } } +_cachedColumnBuffers } - // If the cached column buffers were not passed in, we calculate them in the constructor. - // As in Spark, the actual work of caching is lazy. - if (_cachedColumnBuffers == null) { -buildBuffers() + def clearCache(blocking: Boolean = true): Unit = { +if (_cachedColumnBuffers != null) { + synchronized { +if (_cachedColumnBuffers != null) { + _cachedColumnBuffers.unpersist(blocking) --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183288666 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala --- @@ -794,4 +794,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext } } } + + private def isMaterialized(df: DataFrame): Boolean = { +val nodes = df.queryExecution.executedPlan.collect { case c: InMemoryTableScanExec => c } +assert(nodes.nonEmpty, "DataFrame is not cached\n" + df.queryExecution.analyzed) +nodes.forall(_.relation.cacheBuilder._cachedColumnBuffers != null) + } + + test("SPARK-23880 table cache should be lazy and don't trigger any jobs") { --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183269323 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala --- @@ -794,4 +794,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext } } } + + private def isMaterialized(df: DataFrame): Boolean = { +val nodes = df.queryExecution.executedPlan.collect { case c: InMemoryTableScanExec => c } +assert(nodes.nonEmpty, "DataFrame is not cached\n" + df.queryExecution.analyzed) +nodes.forall(_.relation.cacheBuilder._cachedColumnBuffers != null) + } + + test("SPARK-23880 table cache should be lazy and don't trigger any jobs") { --- End diff -- I feel it's clearer to create a listener and explicitly show that we don't trigger any jobs after calling `Dataset.cache`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183269227 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala --- @@ -794,4 +794,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext } } } + + private def isMaterialized(df: DataFrame): Boolean = { +val nodes = df.queryExecution.executedPlan.collect { case c: InMemoryTableScanExec => c } +assert(nodes.nonEmpty, "DataFrame is not cached\n" + df.queryExecution.analyzed) +nodes.forall(_.relation.cacheBuilder._cachedColumnBuffers != null) + } + + test("SPARK-23880 table cache should be lazy and don't trigger any jobs") { --- End diff -- how does this test prove we don't trigger jobs? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183268807 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -55,56 +42,38 @@ object InMemoryRelation { private[columnar] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) -case class InMemoryRelation( -output: Seq[Attribute], +case class CachedRDDBuilder( useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, @transient child: SparkPlan, tableName: Option[String])( -@transient var _cachedColumnBuffers: RDD[CachedBatch] = null, -val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator, -statsOfPlanToCache: Statistics, -override val outputOrdering: Seq[SortOrder]) - extends logical.LeafNode with MultiInstanceRelation { - - override protected def innerChildren: Seq[SparkPlan] = Seq(child) - - override def doCanonicalize(): logical.LogicalPlan = -copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)), - storageLevel = StorageLevel.NONE, - child = child.canonicalized, - tableName = None)( - _cachedColumnBuffers, - sizeInBytesStats, - statsOfPlanToCache, - outputOrdering) - - override def producedAttributes: AttributeSet = outputSet +@transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) { - @transient val partitionStatistics = new PartitionStatistics(output) + val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator - override def computeStats(): Statistics = { -if (sizeInBytesStats.value == 0L) { - // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache. - // Note that we should drop the hint info here. We may cache a plan whose root node is a hint - // node. When we lookup the cache with a semantically same plan without hint info, the plan - // returned by cache lookup should not have hint info. 
If we lookup the cache with a - // semantically same plan with a different hint info, `CacheManager.useCachedData` will take - // care of it and retain the hint info in the lookup input plan. - statsOfPlanToCache.copy(hints = HintInfo()) -} else { - Statistics(sizeInBytes = sizeInBytesStats.value.longValue) + def cachedColumnBuffers: RDD[CachedBatch] = { +if (_cachedColumnBuffers == null) { + synchronized { +if (_cachedColumnBuffers == null) { + _cachedColumnBuffers = buildBuffers() +} + } } +_cachedColumnBuffers } - // If the cached column buffers were not passed in, we calculate them in the constructor. - // As in Spark, the actual work of caching is lazy. - if (_cachedColumnBuffers == null) { -buildBuffers() + def clearCache(blocking: Boolean = true): Unit = { +if (_cachedColumnBuffers != null) { + synchronized { +if (_cachedColumnBuffers != null) { + _cachedColumnBuffers.unpersist(blocking) --- End diff -- shall we also do `_cachedColumnBuffers = null` so that `unpersist` won't be called twice? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r182929830 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -119,26 +119,60 @@ class CacheManager extends Logging { while (it.hasNext) { val cd = it.next() if (cd.plan.find(_.sameResult(plan)).isDefined) { -cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking) +cd.cachedRepresentation.clearCache(blocking) it.remove() } } } + /** + * Materialize the cache that refers to the given physical plan. --- End diff -- aha, I'll fix that way. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r182660703 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -119,26 +119,60 @@ class CacheManager extends Logging { while (it.hasNext) { val cd = it.next() if (cd.plan.find(_.sameResult(plan)).isDefined) { -cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking) +cd.cachedRepresentation.clearCache(blocking) it.remove() } } } + /** + * Materialize the cache that refers to the given physical plan. --- End diff -- ``` class CachedRDDBuilder(private var _cachedColumnBuffers: RDD[CachedBatch] = null) { def cachedColumnBuffers = { if (_cachedColumnBuffers == null) { synchronized { if (_cachedColumnBuffers == null) { _cachedColumnBuffers = buildBuffer() } } } _cachedColumnBuffers } } class InMemoryRelation(cacheBuilder: CachedRDDBuilder = new CachedRDDBuilder()) { // newInstance should keep the existing CachedRDDBuilder def newInstance()... } ``` then in the physical plan and cache manager, just call `relation.cacheBuilder.cachedColumnBuffers` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r182626260 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -119,26 +119,60 @@ class CacheManager extends Logging { while (it.hasNext) { val cd = it.next() if (cd.plan.find(_.sameResult(plan)).isDefined) { -cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking) +cd.cachedRepresentation.clearCache(blocking) it.remove() } } } + /** + * Materialize the cache that refers to the given physical plan. --- End diff -- I checked the suggested approach, but some queries didn't work well; the query in the description was OK, but the query below wrongly cached two different RDDs (I checked the `Storage` tab in the web UI); ``` scala> sql("SET spark.sql.crossJoin.enabled=true") scala> val df = spark.range(1L).cache() scala> df.join(df).show ``` This is because the Analyzer copies an `InMemoryRelation` (`_cachedColumnBuffers` = null) node via `newInstance`, and then each copy builds its own RDD. Thoughts? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r182612846 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -119,26 +119,60 @@ class CacheManager extends Logging { while (it.hasNext) { val cd = it.next() if (cd.plan.find(_.sameResult(plan)).isDefined) { -cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking) +cd.cachedRepresentation.clearCache(blocking) it.remove() } } } + /** + * Materialize the cache that refers to the given physical plan. --- End diff -- I'll update today --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r181273485 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -119,26 +119,60 @@ class CacheManager extends Logging { while (it.hasNext) { val cd = it.next() if (cd.plan.find(_.sameResult(plan)).isDefined) { -cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking) +cd.cachedRepresentation.clearCache(blocking) it.remove() } } } + /** + * Materialize the cache that refers to the given physical plan. --- End diff -- once it's materialized, it's still materialized after copy --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r181273417 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -119,26 +119,60 @@ class CacheManager extends Logging { while (it.hasNext) { val cd = it.next() if (cd.plan.find(_.sameResult(plan)).isDefined) { -cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking) +cd.cachedRepresentation.clearCache(blocking) it.remove() } } } + /** + * Materialize the cache that refers to the given physical plan. --- End diff -- something like ``` class InMemoryRelation(private var _cachedColumnBuffers: RDD[CachedBatch] = null) { def cachedColumnBuffers = { if (_cachedColumnBuffers == null) { synchronized { if (_cachedColumnBuffers == null) { _cachedColumnBuffers = buildBuffer() } } } _cachedColumnBuffers } } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r181122693 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -119,26 +119,60 @@ class CacheManager extends Logging { while (it.hasNext) { val cd = it.next() if (cd.plan.find(_.sameResult(plan)).isDefined) { -cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking) +cd.cachedRepresentation.clearCache(blocking) it.remove() } } } + /** + * Materialize the cache that refers to the given physical plan. --- End diff -- I thought that, since `InMemoryRelation` is sometimes copied within a tree, a lazy update of `_cachedColumnBuffers` would not always lead to the materialization of the corresponding cache entry in `CacheManager` (maybe...). If so, subsequent queries might repeatedly trigger unnecessary materialization. Therefore, I thought we needed to update the entry in `CacheManager` directly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r181093091 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -119,26 +119,60 @@ class CacheManager extends Logging { while (it.hasNext) { val cd = it.next() if (cd.plan.find(_.sameResult(plan)).isDefined) { -cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking) +cd.cachedRepresentation.clearCache(blocking) it.remove() } } } + /** + * Materialize the cache that refers to the given physical plan. --- End diff -- Why do we need to involve `CacheManager`? Shall we just make the creation of RDD lazy in `InMemoryRelation` and trigger the materialization in `InMemoryTableScanExec`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r180777891 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -119,26 +119,60 @@ class CacheManager extends Logging { while (it.hasNext) { val cd = it.next() if (cd.plan.find(_.sameResult(plan)).isDefined) { -cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking) +cd.cachedRepresentation.clearCache(blocking) it.remove() } } } + /** + * Materialize the cache that refers to the given physical plan. --- End diff -- The current approach of this PR is as follows: `cache()` just registers an entry in `CacheManager` without building an `RDD`, and then `InMemoryTableScanExec` re-registers the entry to build (materialize) an `RDD` in `CacheManager`. So, I added this function so that `InMemoryTableScanExec` can re-register these entries in `CacheManager`. But I don't think this is the best approach, so I'd welcome any suggestions. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r180717823 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -119,26 +119,60 @@ class CacheManager extends Logging { while (it.hasNext) { val cd = it.next() if (cd.plan.find(_.sameResult(plan)).isDefined) { -cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking) +cd.cachedRepresentation.clearCache(blocking) it.remove() } } } + /** + * Materialize the cache that refers to the given physical plan. --- End diff -- why do we do this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
GitHub user maropu opened a pull request: https://github.com/apache/spark/pull/21018 [SPARK-23880][SQL] Do not trigger any jobs for caching data ## What changes were proposed in this pull request? This PR fixes the code so that `cache` does not trigger any jobs. For example, in the current master, the operation below triggers an actual job; ``` val df = spark.range(100L) .filter('id > 1000) .orderBy('id.desc) .cache() ``` This triggers a job even though caching should be lazy. The problem is that, when creating `InMemoryRelation`, we build the RDD, which calls `SparkPlan.execute` and may trigger jobs, such as the sampling job for the range partitioner or a broadcast job. With this fix, the constructor of `InMemoryRelation` no longer builds an `RDD`; instead, `InMemoryTableScanExec` materializes the cache and updates the entry in `CacheManager`. ## How was this patch tested? Added tests in `CachedTableSuite`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/maropu/spark SPARK-23880 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21018.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21018 commit 01d75d789c45f73bd999106dfc6f29cdc3050ce9 Author: Takeshi Yamamuro Date: 2018-04-09T09:30:10Z Fix --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org