[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/13596

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/13596#discussion_r66701855

Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

```
@@ -513,4 +541,23 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     spark.catalog.uncacheTable("t2")
   }
 }
+
+  test("[SPARK-15870] DataFrame can't execute after uncacheTable") {
+    val selectStar = sql("SELECT * FROM testData WHERE key = 1")
+    selectStar.createOrReplaceTempView("selectStar")
+
+    spark.catalog.cacheTable("selectStar")
+    assert(
```

End diff: this assert is unnecessary; we are testing the behaviour of caching and uncaching the same DataFrame, not validating the correctness of caching.
[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/13596#discussion_r66701821

Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

```
@@ -513,4 +541,23 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     spark.catalog.uncacheTable("t2")
   }
 }
+
+  test("[SPARK-15870] DataFrame can't execute after uncacheTable") {
```

End diff: nit: remove the `[]` around the JIRA ticket number
[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/13596#discussion_r66701808

Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

```
@@ -333,17 +335,43 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     sql("SELECT * FROM t1").count()
     sql("SELECT * FROM t2").count()

+    val toBeCleanedAccIds = new HashSet[Long]
+
     val accId1 = spark.table("t1").queryExecution.withCachedData.collect {
       case i: InMemoryRelation => i.batchStats.id
     }.head
+    toBeCleanedAccIds += accId1

     val accId2 = spark.table("t1").queryExecution.withCachedData.collect {
       case i: InMemoryRelation => i.batchStats.id
     }.head
+    toBeCleanedAccIds += accId2
+
+    val cleanerListener = new CleanerListener {
+      def rddCleaned(rddId: Int): Unit = {
+      }
```

End diff: nit: can you put `{}` on one line?
[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13596#discussion_r66700916

Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

```
@@ -321,7 +321,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     assert(spark.sharedState.cacheManager.isEmpty)
   }

-  test("Clear accumulators when uncacheTable to prevent memory leaking") {
+  // This test would be flaky.
+  ignore("Ensure accumulators to be cleared after GC when uncacheTable") {
```

End diff: Thank you for the pointer. Let me check it and I'll update the test.
[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/13596#discussion_r66700295

Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

```
@@ -321,7 +321,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     assert(spark.sharedState.cacheManager.isEmpty)
   }

-  test("Clear accumulators when uncacheTable to prevent memory leaking") {
+  // This test would be flaky.
+  ignore("Ensure accumulators to be cleared after GC when uncacheTable") {
```

End diff: how about we attach a listener to `ContextCleaner`, and watch the `accumCleaned` event? An example is: https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala#L406-L417
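The suggestion above boils down to registering a listener before triggering cleanup, then blocking on a latch until every expected `accumCleaned` event has arrived, instead of polling after a `System.gc()` and hoping the timing works out. Below is a minimal, self-contained sketch of that listener-plus-latch pattern; `ToyCleaner`, `CleanupListener`, and `awaitAllCleaned` are hypothetical stand-ins for Spark's `ContextCleaner` machinery, not its actual API:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable

// Hypothetical stand-in for Spark's CleanerListener: only the event we care about.
trait CleanupListener {
  def accumCleaned(accId: Long): Unit
}

// Hypothetical stand-in for ContextCleaner: notifies listeners when an id is cleaned.
class ToyCleaner {
  private var listeners = List.empty[CleanupListener]
  def attachListener(l: CleanupListener): Unit = listeners ::= l
  // In Spark this fires asynchronously, after the accumulator is GC'ed.
  def clean(accId: Long): Unit = listeners.foreach(_.accumCleaned(accId))
}

object DeterministicCleanupTest {
  // Returns true iff every id in toBeCleaned is reported cleaned within the timeout.
  def awaitAllCleaned(cleaner: ToyCleaner,
                      toBeCleaned: mutable.HashSet[Long],
                      trigger: () => Unit,
                      timeoutSec: Long = 5): Boolean = {
    val latch = new CountDownLatch(toBeCleaned.size)
    // Attach the listener BEFORE triggering cleanup, so no event can be missed.
    cleaner.attachListener(new CleanupListener {
      def accumCleaned(accId: Long): Unit = toBeCleaned.synchronized {
        if (toBeCleaned.remove(accId)) latch.countDown()
      }
    })
    trigger() // in the real test: uncacheTable(...) followed by System.gc()
    latch.await(timeoutSec, TimeUnit.SECONDS)
  }

  def main(args: Array[String]): Unit = {
    val cleaner = new ToyCleaner
    val ids = mutable.HashSet(1L, 2L)
    // Simulate cleanup arriving asynchronously on another thread.
    val ok = awaitAllCleaned(cleaner, ids,
      () => new Thread(() => { cleaner.clean(1L); cleaner.clean(2L) }).start())
    println(s"all cleaned: $ok")
  }
}
```

The key design point, mirrored from the linked `ContextCleanerSuite` approach, is that the test waits on a bounded latch rather than asserting immediately after a GC hint, which is what made the original test flaky.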
[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/13596#discussion_r66700211

Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

```
@@ -321,7 +321,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     assert(spark.sharedState.cacheManager.isEmpty)
   }

-  test("Clear accumulators when uncacheTable to prevent memory leaking") {
+  // This test would be flaky.
+  ignore("Ensure accumulators to be cleared after GC when uncacheTable") {
```

End diff: This is the only risky part of this PR; I'll think about how to deterministically test it.
[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/13596#discussion_r66700185

Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

```
@@ -105,7 +105,7 @@ private[sql] class CacheManager extends Logging {
     val planToCache = query.queryExecution.analyzed
     val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
     require(dataIndex >= 0, s"Table $query is not cached.")
-    cachedData(dataIndex).cachedRepresentation.uncache(blocking)
+    cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
```

End diff: yea, the null setting looks useless, this change LGTM
[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13596#discussion_r66698444

Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

```
@@ -105,7 +105,7 @@ private[sql] class CacheManager extends Logging {
     val planToCache = query.queryExecution.analyzed
     val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
     require(dataIndex >= 0, s"Table $query is not cached.")
-    cachedData(dataIndex).cachedRepresentation.uncache(blocking)
+    cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
```

End diff: Yes, that's right. But I noticed that the original `InMemoryRelation` instance whose `_cachedColumnBuffers` gets set to `null` is not the same instance that will be executed by the `DataFrame`, because it was copied by `withOutput` when `CacheManager` replaced the logical plan for the `DataFrame`. So we don't need to set it to `null`, and the original one will be collected by GC soon.
[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/13596#discussion_r66689439

Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

```
@@ -105,7 +105,7 @@ private[sql] class CacheManager extends Logging {
     val planToCache = query.queryExecution.analyzed
     val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
     require(dataIndex >= 0, s"Table $query is not cached.")
-    cachedData(dataIndex).cachedRepresentation.uncache(blocking)
+    cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
```

End diff: this is slightly different from the previous version, as we also set `_cachedColumnBuffers` to `null` in `uncache`
[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...
GitHub user ueshin opened a pull request: https://github.com/apache/spark/pull/13596

[SPARK-15870][SQL] DataFrame can't execute after uncacheTable.

## What changes were proposed in this pull request?

If a cached `DataFrame` is executed more than once and then uncached via `uncacheTable`, like the following:

```
val selectStar = sql("SELECT * FROM testData WHERE key = 1")
selectStar.createOrReplaceTempView("selectStar")

spark.catalog.cacheTable("selectStar")
checkAnswer(
  selectStar,
  Seq(Row(1, "1")))

spark.catalog.uncacheTable("selectStar")
checkAnswer(
  selectStar,
  Seq(Row(1, "1")))
```

then the uncached `DataFrame` can't execute because of a `Task not serializable` exception like:

```
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2038)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1912)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:884)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:883)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
    ...
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
    at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:153)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    ...
```

Note that a `DataFrame` uncached with `DataFrame.unpersist()` still works, but one uncached with `spark.catalog.uncacheTable` does not.

This PR reverts a part of cf38fe0 so that the `batchStats` accumulator is not unregistered; it does not need to be unregistered here because `ContextCleaner` will do that after the accumulator is collected by GC.

## How was this patch tested?

Added a test to check that a `DataFrame` can execute after `uncacheTable`, plus the other existing tests. I marked the test that checks whether the accumulator was cleared as `ignore` because it would be flaky.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ueshin/apache-spark issues/SPARK-15870

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/13596.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #13596

commit 379b1dc90978fd2e3465b3cc240033943dbefd4c
Author: Takuya UESHIN
Date: 2016-06-10T08:43:37Z

    Add a test to check if DataFrame can execute after uncacheTable.
commit e844a7ea0995e0be17aa96a4381e9bae90b75c76
Author: Takuya UESHIN
Date: 2016-06-10T08:46:00Z

    Revert a part of cf38fe0 not to unregister batchStats accumulator.

commit 56082d99f63594ca838ebf22131695f4458238e4
Author: Takuya UESHIN
Date: 2016-06-10T08:52:56Z

    Ignore a flaky test.