Repository: spark Updated Branches: refs/heads/master 9eacc5e43 -> 9b57cd8d5
[SPARK-21133][CORE] Fix HighlyCompressedMapStatus#writeExternal throws NPE ## What changes were proposed in this pull request? Fix HighlyCompressedMapStatus#writeExternal NPE: ``` 17/06/18 15:00:27 ERROR Utils: Exception encountered java.lang.NullPointerException at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167) at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619) at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562) at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 17/06/18 15:00:27 ERROR MapOutputTrackerMaster: java.lang.NullPointerException java.io.IOException: java.lang.NullPointerException at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1310) at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167) at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619) at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562) at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) ... 17 more 17/06/18 15:00:27 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.17.47.20:50188 17/06/18 15:00:27 ERROR Utils: Exception encountered java.lang.NullPointerException at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167) at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619) at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562) at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ``` ## How was this patch tested? manual tests Author: Yuming Wang <wgy...@gmail.com> Closes #18343 from wangyum/SPARK-21133. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b57cd8d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b57cd8d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b57cd8d Branch: refs/heads/master Commit: 9b57cd8d5c594731a7b3c90ce59bcddb05193d79 Parents: 9eacc5e Author: Yuming Wang <wgy...@gmail.com> Authored: Tue Jun 20 09:22:30 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Tue Jun 20 09:22:30 2017 +0800 ---------------------------------------------------------------------- .../org/apache/spark/scheduler/MapStatus.scala | 2 +- .../apache/spark/serializer/KryoSerializer.scala | 1 + .../apache/spark/scheduler/MapStatusSuite.scala | 18 ++++++++++++++++-- 3 files changed, 18 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9b57cd8d/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 048e0d0..5e45b37 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -141,7 +141,7 @@ private[spark] class HighlyCompressedMapStatus private ( private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, private[this] var avgSize: Long, - @transient private var hugeBlockSizes: Map[Int, Byte]) + private var hugeBlockSizes: Map[Int, Byte]) extends MapStatus with Externalizable { // loc could be null when the default constructor is called during deserialization http://git-wip-us.apache.org/repos/asf/spark/blob/9b57cd8d/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index e15166d..4f03e54 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -175,6 +175,7 @@ class KryoSerializer(conf: SparkConf) kryo.register(None.getClass) kryo.register(Nil.getClass) kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon")) + kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$")) kryo.register(classOf[ArrayBuffer[Any]]) kryo.setClassLoader(classLoader) http://git-wip-us.apache.org/repos/asf/spark/blob/9b57cd8d/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala index 3ec37f6..e612013 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala @@ -24,9 +24,9 @@ import scala.util.Random import org.mockito.Mockito._ import org.roaringbitmap.RoaringBitmap -import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite} import org.apache.spark.internal.config -import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.storage.BlockManagerId class MapStatusSuite extends SparkFunSuite { @@ -154,4 +154,18 @@ class MapStatusSuite extends SparkFunSuite { case part => assert(status2.getSizeForBlock(part) >= sizes(part)) } } + + test("SPARK-21133 HighlyCompressedMapStatus#writeExternal throws NPE") { + val conf = new SparkConf() + .set("spark.serializer", classOf[KryoSerializer].getName) + .setMaster("local") + .setAppName("SPARK-21133") + val sc = new SparkContext(conf) + try { + val count = sc.parallelize(0 until 3000, 10).repartition(2001).collect().length + assert(count === 3000) + } finally { + sc.stop() + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org