[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...

2016-06-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/13596


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...

2016-06-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13596#discussion_r66701855
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -513,4 +541,23 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       spark.catalog.uncacheTable("t2")
     }
   }
+
+  test("[SPARK-15870] DataFrame can't execute after uncacheTable") {
+    val selectStar = sql("SELECT * FROM testData WHERE key = 1")
+    selectStar.createOrReplaceTempView("selectStar")
+
+    spark.catalog.cacheTable("selectStar")
+    assert(
--- End diff --

This assert is unnecessary: we are testing the behaviour of caching and then uncaching the same DataFrame, not validating the correctness of caching.





[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...

2016-06-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13596#discussion_r66701821
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -513,4 +541,23 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       spark.catalog.uncacheTable("t2")
     }
   }
+
+  test("[SPARK-15870] DataFrame can't execute after uncacheTable") {
--- End diff --

nit: remove the `[]` around the JIRA ticket number





[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...

2016-06-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13596#discussion_r66701808
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -333,17 +335,43 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     sql("SELECT * FROM t1").count()
     sql("SELECT * FROM t2").count()

+    val toBeCleanedAccIds = new HashSet[Long]
+
     val accId1 = spark.table("t1").queryExecution.withCachedData.collect {
       case i: InMemoryRelation => i.batchStats.id
     }.head
+    toBeCleanedAccIds += accId1

     val accId2 = spark.table("t1").queryExecution.withCachedData.collect {
       case i: InMemoryRelation => i.batchStats.id
     }.head
+    toBeCleanedAccIds += accId2
+
+    val cleanerListener = new CleanerListener {
+      def rddCleaned(rddId: Int): Unit = {
+      }
--- End diff --

nit: can you put `{}` on one line?





[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...

2016-06-10 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13596#discussion_r66700916
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -321,7 +321,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     assert(spark.sharedState.cacheManager.isEmpty)
   }

-  test("Clear accumulators when uncacheTable to prevent memory leaking") {
+  // This test would be flaky.
+  ignore("Ensure accumulators to be cleared after GC when uncacheTable") {
--- End diff --

Thank you for the pointer.
Let me check it and I'll update the test.





[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...

2016-06-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13596#discussion_r66700295
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -321,7 +321,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     assert(spark.sharedState.cacheManager.isEmpty)
   }

-  test("Clear accumulators when uncacheTable to prevent memory leaking") {
+  // This test would be flaky.
+  ignore("Ensure accumulators to be cleared after GC when uncacheTable") {
--- End diff --

How about attaching a listener to `ContextCleaner` and watching for the `accumCleaned` event? An example is:
https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala#L406-L417
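A minimal plain-Scala sketch of this suggestion, assuming no Spark dependency: rather than asserting once right after a GC, the test records `accumCleaned` callbacks and polls with a timeout until the expected accumulator IDs have been reported. `AccumCleanupListener`, `RecordingListener` and `AwaitCleanup.await` are hypothetical names; in an actual Spark test the listener would implement Spark's `CleanerListener` and be attached to the `ContextCleaner`, as in the linked suite.

```scala
import java.util.concurrent.TimeUnit
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for a cleaner listener; only the accumulator callback is modelled.
trait AccumCleanupListener {
  def accumCleaned(accId: Long): Unit
}

// Records cleaned accumulator ids in a thread-safe map, because the callbacks
// may arrive on a different thread than the one running the test.
final class RecordingListener extends AccumCleanupListener {
  private val cleaned = TrieMap.empty[Long, Unit]
  override def accumCleaned(accId: Long): Unit = { cleaned.put(accId, ()); () }
  def hasCleaned(ids: Set[Long]): Boolean = ids.forall(cleaned.contains)
}

object AwaitCleanup {
  // Re-evaluates `condition` every `intervalMs` until it holds or `timeoutMs` elapses.
  // Returns the last value of `condition`.
  def await(timeoutMs: Long, intervalMs: Long = 10L)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    var ok = condition
    while (!ok && System.nanoTime() < deadline) {
      Thread.sleep(intervalMs)
      ok = condition
    }
    ok
  }
}
```

For example, if a thread standing in for the cleaner calls `listener.accumCleaned(1L)` a few milliseconds later, `AwaitCleanup.await(5000L)(listener.hasCleaned(Set(1L)))` returns `true` once the callback arrives, and the bounded timeout keeps a genuinely leaking test from hanging. The wait only makes the assertion tolerant of *when* cleanup happens; the test still depends on the GC actually collecting the objects.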





[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...

2016-06-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13596#discussion_r66700211
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -321,7 +321,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     assert(spark.sharedState.cacheManager.isEmpty)
   }

-  test("Clear accumulators when uncacheTable to prevent memory leaking") {
+  // This test would be flaky.
+  ignore("Ensure accumulators to be cleared after GC when uncacheTable") {
--- End diff --

This is the only risky part of this PR; I'll think about how to test it deterministically.





[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...

2016-06-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13596#discussion_r66700185
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -105,7 +105,7 @@ private[sql] class CacheManager extends Logging {
     val planToCache = query.queryExecution.analyzed
     val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
     require(dataIndex >= 0, s"Table $query is not cached.")
-    cachedData(dataIndex).cachedRepresentation.uncache(blocking)
+    cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
--- End diff --

Yeah, setting it to null looks unnecessary; this change LGTM.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...

2016-06-10 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13596#discussion_r66698444
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -105,7 +105,7 @@ private[sql] class CacheManager extends Logging {
     val planToCache = query.queryExecution.analyzed
     val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
     require(dataIndex >= 0, s"Table $query is not cached.")
-    cachedData(dataIndex).cachedRepresentation.uncache(blocking)
+    cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
--- End diff --

Yes, that's right.
But I noticed that the original `InMemoryRelation` instance, whose `_cachedColumnBuffers` gets set to `null`, is not the same instance that the `DataFrame` executes: it was copied by `withOutput` when `CacheManager` replaced the logical plan for the `DataFrame`.
So we don't need to set it to `null`; the original instance will be garbage-collected soon anyway.
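To make the copying point concrete, here is a minimal plain-Scala model (no Spark). It assumes, as described above, that `withOutput` builds a new relation object and hands it the existing buffer reference, and, as an additional assumption for illustration, that it also passes along the same statistics object. `Relation`, `Buffers` and `Stats` are hypothetical stand-ins, not Spark classes.

```scala
// Hypothetical stand-ins: Buffers plays the role of the cached column buffers,
// Stats the role of the batch-statistics accumulator.
final class Buffers(val id: Int)
final class Stats(val id: Long)

// A relation whose cached buffers sit in a mutable field of a second parameter list.
final class Relation(val output: Seq[String])(var cachedBuffers: Buffers, val stats: Stats) {
  // Returns a NEW instance that reuses the same buffers reference and the same stats object.
  def withOutput(newOutput: Seq[String]): Relation =
    new Relation(newOutput)(cachedBuffers, stats)

  // Models only the null-assignment part of the old uncache: this instance drops its reference.
  def clearBuffers(): Unit = { cachedBuffers = null }
}
```

Calling `clearBuffers()` on the original only changes that object's field: a copy made earlier by `withOutput` still points at the same `Buffers`, so the assignment frees nothing while the copy is alive. Anything shared by reference, such as `stats` in this model, is seen identically through both objects.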





[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...

2016-06-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13596#discussion_r66689439
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -105,7 +105,7 @@ private[sql] class CacheManager extends Logging {
     val planToCache = query.queryExecution.analyzed
     val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
     require(dataIndex >= 0, s"Table $query is not cached.")
-    cachedData(dataIndex).cachedRepresentation.uncache(blocking)
+    cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
--- End diff --

This is slightly different from the previous version, since `uncache` also sets `_cachedColumnBuffers` to null.





[GitHub] spark pull request #13596: [SPARK-15870][SQL] DataFrame can't execute after ...

2016-06-10 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/13596

[SPARK-15870][SQL] DataFrame can't execute after uncacheTable.

## What changes were proposed in this pull request?

If a cached `DataFrame` is executed more than once and then `uncacheTable` is called, as in the following:

```
val selectStar = sql("SELECT * FROM testData WHERE key = 1")
selectStar.createOrReplaceTempView("selectStar")

spark.catalog.cacheTable("selectStar")
checkAnswer(
  selectStar,
  Seq(Row(1, "1")))

spark.catalog.uncacheTable("selectStar")
checkAnswer(
  selectStar,
  Seq(Row(1, "1")))
```

then the uncached `DataFrame` can no longer execute and fails with a `Task not serializable` exception like this:

```
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2038)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1912)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:884)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
at org.apache.spark.rdd.RDD.collect(RDD.scala:883)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
...
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:153)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
...
```

Note that uncaching the `DataFrame` with `DataFrame.unpersist()` works, but uncaching it with `spark.catalog.uncacheTable` doesn't.

This PR reverts part of cf38fe0 so that the `batchStats` accumulator is no longer unregistered here. It does not need to be unregistered at this point, because `ContextCleaner` will do so after the accumulator is garbage-collected.
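The failure and the fix can be modelled without Spark in a few lines. This sketch assumes a driver-side registry of live accumulator IDs and performs the same "must be registered" check that, according to the stack trace, `AccumulatorV2.writeReplace` performs during task serialization; here it is an ordinary method call rather than Java serialization. `AccRegistry`, `CachedRelation` and its methods are hypothetical.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for the driver-side registry of live accumulator ids.
object AccRegistry {
  private val live = TrieMap.empty[Long, Unit]
  def register(id: Long): Unit = { live.put(id, ()); () }
  def remove(id: Long): Unit = { live.remove(id); () }
  def isRegistered(id: Long): Boolean = live.contains(id)
}

// Hypothetical model of a cached relation: a stats accumulator id plus a "persisted" flag.
final class CachedRelation(val statsId: Long) {
  private var persisted = true
  AccRegistry.register(statsId)

  // Mirrors the registration check done when the accumulator is shipped with a task.
  def execute(): String = {
    if (!AccRegistry.isRegistered(statsId)) {
      throw new UnsupportedOperationException(
        "Accumulator must be registered before send to executor")
    }
    if (persisted) "read from cache" else "recomputed"
  }

  // Behaviour introduced by cf38fe0 (per the PR description): unpersist AND unregister.
  def uncacheAndUnregister(): Unit = {
    persisted = false
    AccRegistry.remove(statsId)
  }

  // Behaviour with this PR: only unpersist; unregistering is left to GC-driven cleanup.
  def unpersistOnly(): Unit = {
    persisted = false
  }
}
```

With `unpersistOnly()`, a later execution simply recomputes the data; with `uncacheAndUnregister()`, it throws the same message seen in the trace above. Deferring unregistration to the cleaner trades a short delay in releasing the accumulator for a `DataFrame` that keeps working.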

## How was this patch tested?

Added a test to check that a DataFrame can execute after `uncacheTable`, and ran the other existing tests.
However, I marked the test that checks whether the accumulator is cleared as `ignore`, because it would be flaky.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-15870

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/13596.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #13596


commit 379b1dc90978fd2e3465b3cc240033943dbefd4c
Author: Takuya UESHIN 
Date:   2016-06-10T08:43:37Z

Add a test to check if DataFrame can execute after uncacheTable.

commit e844a7ea0995e0be17aa96a4381e9bae90b75c76
Author: Takuya UESHIN 
Date:   2016-06-10T08:46:00Z

Revert a part of cf38fe0 not to unregister batchStats accumulator.

commit 56082d99f63594ca838ebf22131695f4458238e4
Author: Takuya UESHIN 
Date:   2016-06-10T08:52:56Z

Ignore a flaky test.



