[GitHub] spark pull request #14952: [SPARK-17110] Fix StreamCorruptionException in Bl...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/14952

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/14952#discussion_r77699493

Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala

    @@ -520,10 +520,11 @@ private[spark] class BlockManager(
       *
       * This does not acquire a lock on this block in this JVM.
       */
    -  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
    +  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    +    val ct = implicitly[ClassTag[T]]
         getRemoteBytes(blockId).map { data =>
           val values =
    -        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
    +        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)

End diff: In this case, the problem is that the type parameter was inferred as `Any`.
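The inference failure Josh describes can be reproduced outside Spark. The following is a minimal standalone sketch (names are illustrative, not Spark's API): a context-bound method always compiles, but the tag it receives matches the *static* type the compiler sees at the call site, so a value that has flowed through an `Any`-typed path silently yields `ClassTag.Any`:

```scala
import scala.reflect.ClassTag

object InferenceDemo {
  // Context-bound method: always compiles, but the implicit tag matches
  // the static type the compiler infers at each call site.
  def tagOf[T: ClassTag](value: T): ClassTag[T] = implicitly[ClassTag[T]]

  def main(args: Array[String]): Unit = {
    val bytes: Array[Byte] = Array(1, 2, 3)
    // Static type is Array[Byte]: the precise tag is inferred.
    println(tagOf(bytes) == ClassTag.Any)   // false

    // Once the value flows through an Any-typed path, the precise type
    // is gone and T is silently inferred as Any:
    val erased: Any = bytes
    println(tagOf(erased) == ClassTag.Any)  // true
  }
}
```

This is why the compiler never complained on the old `getRemoteValues` path: an `Any` tag satisfies the context bound just as well as the right one.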
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/14952#discussion_r77697794

Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala

    @@ -520,10 +520,11 @@ private[spark] class BlockManager(
       *
       * This does not acquire a lock on this block in this JVM.
       */
    -  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
    +  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    +    val ct = implicitly[ClassTag[T]]
         getRemoteBytes(blockId).map { data =>
           val values =
    -        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
    +        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)

End diff: How do you forget to pass a correct ClassTag when the compiler is enforcing its presence via the context bound?
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14952#discussion_r77459578

Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala

    @@ -520,10 +520,11 @@ private[spark] class BlockManager(
       *
       * This does not acquire a lock on this block in this JVM.
       */
    -  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
    +  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    +    val ct = implicitly[ClassTag[T]]
         getRemoteBytes(blockId).map { data =>
           val values =
    -        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
    +        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)

End diff: It seems like it is easy to accidentally forget to pass a correct ClassTag, since this has happened twice already.
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/14952#discussion_r77459444

Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala

    @@ -520,10 +520,11 @@ private[spark] class BlockManager(
       *
       * This does not acquire a lock on this block in this JVM.
       */
    -  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
    +  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    +    val ct = implicitly[ClassTag[T]]
         getRemoteBytes(blockId).map { data =>
           val values =
    -        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
    +        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)

End diff: I'm not saying this should definitely be done one way or the other, but I'm curious why you have a preference for the extra code and more verbose API that come with making the ClassTag an explicit parameter.
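Both sides of this exchange can be seen in a small standalone sketch. The `dataDeserialize` below is a hypothetical stand-in, not Spark's real `dataDeserializeStream` signature; it simply returns the tag it would have used, so the silent failure mode is observable:

```scala
import scala.reflect.ClassTag

object TagThreading {
  // Hypothetical stand-in for a tag-driven deserializer: returns the
  // tag it would use to pick a serializer, alongside an empty iterator.
  def dataDeserialize[T: ClassTag](bytes: Array[Byte]): (Iterator[T], ClassTag[T]) =
    (Iterator.empty, implicitly[ClassTag[T]])

  def main(args: Array[String]): Unit = {
    val bytes = Array[Byte](1, 2, 3)

    // With a context bound there is nothing to "forget" at the call
    // site: if the expected type is Iterator[Any], T quietly becomes
    // Any and the code still compiles -- with the wrong tag.
    val inferred: (Iterator[Any], ClassTag[Any]) = dataDeserialize(bytes)
    println(inferred._2 == ClassTag.Any)  // true

    // Naming the element type explicitly preserves the correct tag:
    val explicit = dataDeserialize[Array[Byte]](bytes)
    println(explicit._2 == ClassTag.Any)  // false
  }
}
```

The trade-off under discussion: an explicit `ClassTag` parameter adds call-site verbosity, but it makes an accidental `ClassTag.Any` visible during review instead of being filled in silently by inference.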
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14952#discussion_r77442140

Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala

    @@ -520,10 +520,11 @@ private[spark] class BlockManager(
       *
       * This does not acquire a lock on this block in this JVM.
       */
    -  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
    +  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    +    val ct = implicitly[ClassTag[T]]
         getRemoteBytes(blockId).map { data =>
           val values =
    -        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
    +        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)

End diff: Is it possible for dataDeserializeStream to require a ClassTag to be explicitly passed?
GitHub user JoshRosen opened a pull request: https://github.com/apache/spark/pull/14952

[SPARK-17110] Fix StreamCorruptionException in BlockManager.getRemoteValues()

## What changes were proposed in this pull request?

This patch fixes a `java.io.StreamCorruptedException` error affecting remote reads of cached values when certain data types are used. The problem stems from #11801 / SPARK-13990, a patch to have Spark automatically pick the "best" serializer when caching RDDs. If PySpark cached a PythonRDD, then this would be cached as an `RDD[Array[Byte]]` and the automatic serializer selection would pick KryoSerializer for replication and block transfer. However, the `getRemoteValues()` / `getRemoteBytes()` code path did not pass proper class tags to enable the same serializer to be used during deserialization, causing Java serialization to be used instead of Kryo and leading to the `StreamCorruptedException`.

We already fixed a similar bug in #14311, which dealt with similar issues in block replication. Prior to that patch, it seems that we had no tests to ensure that block replication actually succeeded. Similarly, prior to this bug fix it looks like we had no tests that performed remote reads of cached data, which is why this bug was able to remain latent for so long.

This patch addresses the bug by modifying `BlockManager`'s `get()` and `getRemoteValues()` methods to accept ClassTags, allowing the proper class tag to be threaded through the `getOrElseUpdate` code path (which is used by `rdd.iterator`).

## How was this patch tested?

Extended the caching tests in `DistributedSuite` to exercise the `getRemoteValues` path, plus manual testing to verify that the PySpark bug reproduction in SPARK-17110 is fixed.
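The failure mode the patch fixes can be sketched in isolation. `chooseSerializer` below is a simplified, hypothetical stand-in for the automatic serializer selection introduced by SPARK-13990 (the real logic lives in `SerializerManager`); the point is only that the choice is driven by a ClassTag, so the write side and the read side must see the same tag:

```scala
import scala.reflect.ClassTag

object SerializerChoice {
  sealed trait Serializer
  case object Kryo extends Serializer
  case object JavaSer extends Serializer

  // Simplified stand-in for auto-selection: a known simple type such as
  // Array[Byte] can use Kryo; anything else falls back to Java.
  def chooseSerializer(ct: ClassTag[_]): Serializer =
    if (ct == ClassTag(classOf[Array[Byte]])) Kryo else JavaSer

  def main(args: Array[String]): Unit = {
    // Write side (caching an RDD[Array[Byte]]): precise tag, Kryo chosen.
    val writeSide = chooseSerializer(ClassTag(classOf[Array[Byte]]))
    // Read side before the fix: no tag threaded through, so Any is seen
    // and the Java deserializer is applied to a Kryo-encoded stream --
    // which is what surfaces as java.io.StreamCorruptedException.
    val readSide = chooseSerializer(ClassTag.Any)
    println(writeSide == readSide)  // false
  }
}
```

Threading the caller's ClassTag through `get()` / `getRemoteValues()`, as this patch does, makes both sides of the sketch receive the same tag and hence the same serializer.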
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark SPARK-17110

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14952.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #14952

commit 470380e48a9bf574ee6cfc2700bd044b70276cd8
Author: Josh Rosen
Date: 2016-09-03T17:26:52Z

    Add regression test.

commit 9eb75f57bbb7ee0c555bbdd26cf4187ee0ad3671
Author: Josh Rosen
Date: 2016-09-03T17:31:43Z

    Fix bug by threading proper ClassTag