spark git commit: [SPARK-10466] [SQL] UnsafeRow SerDe exception with data spill
Repository: spark Updated Branches: refs/heads/master 49da38e5f -> e04811137 [SPARK-10466] [SQL] UnsafeRow SerDe exception with data spill Data Spill with UnsafeRow causes assert failure. ``` java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:165) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75) at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) ``` To reproduce that with code (thanks andrewor14): ```scala bin/spark-shell --master local --conf spark.shuffle.memoryFraction=0.005 --conf spark.shuffle.sort.bypassMergeThreshold=0 sc.parallelize(1 to 2 * 1000 * 1000, 10) .map { i => (i, i) }.toDF("a", "b").groupBy("b").avg().count() ``` Author: Cheng HaoCloses #8635 from chenghao-intel/unsafe_spill. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0481113 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0481113 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0481113 Branch: refs/heads/master Commit: e04811137680f937669cdcc78771227aeb7cd849 Parents: 49da38e Author: Cheng Hao Authored: Thu Sep 10 11:48:43 2015 -0700 Committer: Andrew Or Committed: Thu Sep 10 11:48:43 2015 -0700 -- .../spark/util/collection/ExternalSorter.scala | 6 ++ .../sql/execution/UnsafeRowSerializer.scala | 2 +- .../execution/UnsafeRowSerializerSuite.scala| 64 ++-- 3 files changed, 67 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e0481113/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 19287ed..138c05d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -188,6 +188,12 @@ private[spark] class ExternalSorter[K, V, C]( private val spills = new ArrayBuffer[SpilledFile] + /** + * Number of files this sorter has spilled so far. + * Exposed for testing. + */ + private[spark] def numSpills: Int = spills.size + override def insertAll(records: Iterator[Product2[K, V]]): Unit = { // TODO: stop combining if we find that the reduction factor isn't high val shouldCombine = aggregator.isDefined http://git-wip-us.apache.org/repos/asf/spark/blob/e0481113/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala index 5c18558..e060c06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala @@ -72,7 +72,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst override def writeKey[T: ClassTag](key: T): SerializationStream = { // The key is only needed on the map side when
spark git commit: [SPARK-10466] [SQL] UnsafeRow SerDe exception with data spill
Repository: spark Updated Branches: refs/heads/branch-1.5 5e06d41a4 -> bc70043c8 [SPARK-10466] [SQL] UnsafeRow SerDe exception with data spill Data Spill with UnsafeRow causes assert failure. ``` java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:165) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75) at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) ``` To reproduce that with code (thanks andrewor14): ```scala bin/spark-shell --master local --conf spark.shuffle.memoryFraction=0.005 --conf spark.shuffle.sort.bypassMergeThreshold=0 sc.parallelize(1 to 2 * 1000 * 1000, 10) .map { i => (i, i) }.toDF("a", "b").groupBy("b").avg().count() ``` Author: Cheng HaoCloses #8635 from chenghao-intel/unsafe_spill. (cherry picked from commit e04811137680f937669cdcc78771227aeb7cd849) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc70043c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc70043c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc70043c Branch: refs/heads/branch-1.5 Commit: bc70043c8ebdc985ae4a02092b2750c22460d657 Parents: 5e06d41 Author: Cheng Hao Authored: Thu Sep 10 11:48:43 2015 -0700 Committer: Andrew Or Committed: Thu Sep 10 11:48:50 2015 -0700 -- .../spark/util/collection/ExternalSorter.scala | 6 ++ .../sql/execution/UnsafeRowSerializer.scala | 2 +- .../execution/UnsafeRowSerializerSuite.scala| 64 ++-- 3 files changed, 67 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bc70043c/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 19287ed..138c05d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -188,6 +188,12 @@ private[spark] class ExternalSorter[K, V, C]( private val spills = new ArrayBuffer[SpilledFile] + /** + * Number of files this sorter has spilled so far. + * Exposed for testing. + */ + private[spark] def numSpills: Int = spills.size + override def insertAll(records: Iterator[Product2[K, V]]): Unit = { // TODO: stop combining if we find that the reduction factor isn't high val shouldCombine = aggregator.isDefined http://git-wip-us.apache.org/repos/asf/spark/blob/bc70043c/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala index 5c18558..e060c06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala @@ -72,7 +72,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst