[GitHub] spark pull request #14952: [SPARK-17110] Fix StreamCorruptionException in Bl...

2016-09-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/14952


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14952: [SPARK-17110] Fix StreamCorruptionException in Bl...

2016-09-06 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14952#discussion_r77699493
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -520,10 +520,11 @@ private[spark] class BlockManager(
    *
    * This does not acquire a lock on this block in this JVM.
    */
-  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
+    val ct = implicitly[ClassTag[T]]
     getRemoteBytes(blockId).map { data =>
       val values =
-        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
+        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
--- End diff --

In this case, the problem is that the type parameter was inferred as `Any`.
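
The inference failure can be illustrated with a minimal sketch (the names `pickSerializer` and `withTag` are hypothetical stand-ins, not Spark's actual API):

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for the serializer choice the real
// SerializerManager makes based on the ClassTag it receives.
def pickSerializer[T](implicit ct: ClassTag[T]): String =
  if (ct.runtimeClass == classOf[Array[Byte]]) "Kryo" else "Java"

// A generic caller threads through whatever tag it was given.
def withTag[T: ClassTag]: String = pickSerializer[T]

// Before the fix, the non-generic getRemoteValues effectively fixed T to Any,
// so Java serialization was chosen even for an RDD[Array[Byte]] block:
val beforeFix = withTag[Any]         // "Java" -- the bug's symptom
val afterFix  = withTag[Array[Byte]] // "Kryo" -- tag threaded correctly
```

The compiler happily supplies `ClassTag.Any` when nothing in the call site constrains `T`, which is why the bug was silent.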





[GitHub] spark pull request #14952: [SPARK-17110] Fix StreamCorruptionException in Bl...

2016-09-06 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14952#discussion_r77697794
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -520,10 +520,11 @@ private[spark] class BlockManager(
    *
    * This does not acquire a lock on this block in this JVM.
    */
-  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
+    val ct = implicitly[ClassTag[T]]
     getRemoteBytes(blockId).map { data =>
       val values =
-        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
+        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
--- End diff --

How do you forget to pass a correct ClassTag when the compiler is enforcing 
its presence via the context bound?





[GitHub] spark pull request #14952: [SPARK-17110] Fix StreamCorruptionException in Bl...

2016-09-04 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14952#discussion_r77459578
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -520,10 +520,11 @@ private[spark] class BlockManager(
    *
    * This does not acquire a lock on this block in this JVM.
    */
-  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
+    val ct = implicitly[ClassTag[T]]
     getRemoteBytes(blockId).map { data =>
       val values =
-        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
+        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
--- End diff --

It seems easy to accidentally forget to pass a correct ClassTag, since this has happened twice already.





[GitHub] spark pull request #14952: [SPARK-17110] Fix StreamCorruptionException in Bl...

2016-09-04 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14952#discussion_r77459444
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -520,10 +520,11 @@ private[spark] class BlockManager(
    *
    * This does not acquire a lock on this block in this JVM.
    */
-  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
+    val ct = implicitly[ClassTag[T]]
     getRemoteBytes(blockId).map { data =>
       val values =
-        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
+        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
--- End diff --

I'm not saying this should definitely be done one way or the other, but I'm curious why you prefer the extra code and more verbose API that come with making the ClassTag an explicit parameter.





[GitHub] spark pull request #14952: [SPARK-17110] Fix StreamCorruptionException in Bl...

2016-09-03 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14952#discussion_r77442140
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -520,10 +520,11 @@ private[spark] class BlockManager(
    *
    * This does not acquire a lock on this block in this JVM.
    */
-  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
+    val ct = implicitly[ClassTag[T]]
     getRemoteBytes(blockId).map { data =>
       val values =
-        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
+        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
--- End diff --

Is it possible for `dataDeserializeStream` to require a ClassTag to be explicitly passed?
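
For context, the two API shapes being weighed look roughly like this (simplified hypothetical signatures, not the real `dataDeserializeStream`):

```scala
import scala.reflect.ClassTag

// 1) Context bound: the compiler supplies the tag implicitly. Concise, but a
//    call site with no type argument can silently resolve to ClassTag.Any.
def deserializeImplicit[T: ClassTag](bytes: Array[Byte]): String =
  implicitly[ClassTag[T]].toString

// 2) Explicit parameter: more verbose, but every call site must name a tag,
//    so an unintended tag cannot sneak in through inference.
def deserializeExplicit[T](bytes: Array[Byte], ct: ClassTag[T]): String =
  ct.toString

val viaImplicit = deserializeImplicit[String](Array.empty)
val viaExplicit = deserializeExplicit(Array.empty, ClassTag(classOf[String]))
// Both resolve to the same tag for T = String; they differ only in how
// visible the tag is at the call site.
```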





[GitHub] spark pull request #14952: [SPARK-17110] Fix StreamCorruptionException in Bl...

2016-09-03 Thread JoshRosen
GitHub user JoshRosen opened a pull request:

https://github.com/apache/spark/pull/14952

[SPARK-17110] Fix StreamCorruptionException in BlockManager.getRemoteValues()

## What changes were proposed in this pull request?

This patch fixes a `java.io.StreamCorruptedException` error affecting remote reads of cached values when certain data types are used. The problem stems from #11801 / SPARK-13990, a patch to have Spark automatically pick the "best" serializer when caching RDDs. If PySpark cached a PythonRDD, it would be stored as an `RDD[Array[Byte]]` and the automatic serializer selection would pick KryoSerializer for replication and block transfer. However, the `getRemoteValues()` / `getRemoteBytes()` code path did not pass the proper class tags needed to select the same serializer during deserialization, so the Java serializer was used instead of Kryo, leading to the `StreamCorruptedException`.

We already fixed a similar bug in #14311, which dealt with the same issue in block replication. Prior to that patch, it seems we had no tests ensuring that block replication actually succeeded. Likewise, prior to this fix we had no tests performing remote reads of cached data, which is why this bug was able to remain latent for so long.

This patch addresses the bug by modifying `BlockManager`'s `get()` and `getRemoteValues()` methods to accept ClassTags, allowing the proper class tag to be threaded through the `getOrElseUpdate` code path (which is used by `rdd.iterator`).
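
The shape of the fix can be sketched as follows (a simplified, hypothetical mock of the call chain, not the real `BlockManager`):

```scala
import scala.reflect.ClassTag

// Simplified sketch: the element ClassTag is threaded from the public entry
// point all the way down to deserialization, instead of being re-inferred
// (and thereby widened to Any) partway down the chain.
class SketchBlockManager {
  private def dataDeserialize[T: ClassTag](bytes: Array[Byte]): String =
    implicitly[ClassTag[T]].toString // stand-in: real code selects a serializer here

  private def getRemoteValues[T: ClassTag](bytes: Array[Byte]): String =
    dataDeserialize[T](bytes) // tag passed through explicitly, never inferred

  def get[T: ClassTag](bytes: Array[Byte]): String =
    getRemoteValues[T](bytes)
}

val bm = new SketchBlockManager
val tag = bm.get[Array[Byte]](Array.empty)
```

Each method in the chain carries the `T: ClassTag` context bound, so the tag supplied by the caller of `get` reaches the deserializer unchanged.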

## How was this patch tested?

Extended the caching tests in `DistributedSuite` to exercise the 
`getRemoteValues` path, plus manual testing to verify that the PySpark bug 
reproduction in SPARK-17110 is fixed.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JoshRosen/spark SPARK-17110

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14952.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14952


commit 470380e48a9bf574ee6cfc2700bd044b70276cd8
Author: Josh Rosen 
Date:   2016-09-03T17:26:52Z

Add regression test.

commit 9eb75f57bbb7ee0c555bbdd26cf4187ee0ad3671
Author: Josh Rosen 
Date:   2016-09-03T17:31:43Z

Fix bug by threading proper ClassTag



