[ https://issues.apache.org/jira/browse/SPARK-48380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-48380:
-----------------------------------
    Labels: pull-request-available  (was: )

> AutoBatchedPickler caused Unsafe allocate to fail due to 2GB limit
> ------------------------------------------------------------------
>
>                 Key: SPARK-48380
>                 URL: https://issues.apache.org/jira/browse/SPARK-48380
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.1
>            Reporter: Zheng Shao
>            Priority: Major
>              Labels: pull-request-available
>
> AutoBatchedPickler assumes that the row sizes are more or less uniform.
> That's of course not true all the time.
> It needs to find a more conservative algorithm.
> Or simply, we should introduce an option to disable AutoBatch and pickle one
> row at a time.
>
> The stacktrace:
> ```
> Py4JJavaError: An error occurred while calling o562.saveAsTable.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1811 in stage 8.0 failed 4 times, most recent failure: Lost task 1811.3 in stage 8.0 (TID 2782) (10.251.129.187 executor 70): java.lang.IllegalArgumentException: Cannot grow BufferHolder by size 578595584 because the size after growing exceeds size limitation 2147483632
>   at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:71)
>   at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.grow(UnsafeWriter.java:66)
>   at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:201)
>   at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.writeArray(InterpretedUnsafeProjection.scala:322)
>   at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$16(InterpretedUnsafeProjection.scala:200)
>   at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$16$adapted(InterpretedUnsafeProjection.scala:198)
>   at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$22(InterpretedUnsafeProjection.scala:288)
>   at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$22$adapted(InterpretedUnsafeProjection.scala:286)
>   at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateStructWriter$2(InterpretedUnsafeProjection.scala:123)
>   at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateStructWriter$2$adapted(InterpretedUnsafeProjection.scala:120)
>   at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.$anonfun$writer$3(InterpretedUnsafeProjection.scala:67)
>   at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.$anonfun$writer$3$adapted(InterpretedUnsafeProjection.scala:65)
>   at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:90)
>   at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:36)
>   at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
>   at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
>   at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
>   at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
>   at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:93)
>   at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:83)
>   at org.apache.spark.api.python.PythonRDD$.writeNextElementToStream(PythonRDD.scala:474)
>   at org.apache.spark.api.python.PythonRunner$$anon$2.writeNextInputToStream(PythonRunner.scala:885)
>   at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:813)
>   at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:733)
>   at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
>   at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
>   at java.base/java.io.DataInputStream.readInt(DataInputStream.java:381)
>   at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:912)
>   at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:904)
>   at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:506)
>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:240)
>   at org.apache.spark.sql.execution.SortExec$$anon$2.sortedIterator(SortExec.scala:134)
>   at org.apache.spark.sql.execution.SortExec$$anon$2.hasNext(SortExec.scala:148)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:539)
>   at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:117)
>   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:933)
>   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:933)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>   at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
>   at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
>   at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
> ```
>
> It's caused by the following code: Let's say 10K almost-empty rows, and then
> 10K rows with 1MB each.
>
>   /**
>    * Choose batch size based on size of objects
>    */
>   private[spark] class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
>     private val pickle = new Pickler(/* useMemo = */ true,
>       /* valueCompare = */ false)
>     private var batch = 1
>     private val buffer = new mutable.ArrayBuffer[Any]
>
>     override def hasNext: Boolean = iter.hasNext
>
>     override def next(): Array[Byte] = {
>       while (iter.hasNext && buffer.length < batch) {
>         buffer += iter.next()
>       }
>       val bytes = pickle.dumps(buffer.toArray)
>       val size = bytes.length
>       // let 1M < size < 10M
>       if (size < 1024 * 1024) {
>         batch *= 2
>       } else if (size > 1024 * 1024 * 10 && batch > 1) {
>         batch /= 2
>       }
>       buffer.clear()
>       bytes
>     }
>   }

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
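
To make the reporter's scenario concrete, the following is a minimal sketch that replays the doubling/halving rule from the quoted AutoBatchedPickler code over byte counts alone, without any real pickling. The object name AutoBatchSimulation, the simulate helper, and the 16-byte size assumed for an "almost-empty" row are all hypothetical and chosen for illustration; pickle framing overhead is ignored. It is not Spark code and does not reproduce the BufferHolder failure itself, only the way the batch size can overshoot.

```scala
object AutoBatchSimulation {
  val MiB: Long = 1024L * 1024L

  /**
   * Replays the batch-sizing rule over a stream of per-row sizes (in bytes).
   * Returns one (rowCount, totalBytes) pair per emitted batch.
   * Sizes are summed as Long so that oversized batches are visible
   * instead of overflowing an Int.
   */
  def simulate(rowSizes: Iterator[Long]): Vector[(Int, Long)] = {
    var batch = 1
    val out = Vector.newBuilder[(Int, Long)]
    while (rowSizes.hasNext) {
      var rows = 0
      var bytes = 0L
      // Same fill loop as the quoted next(): take up to `batch` rows.
      while (rowSizes.hasNext && rows < batch) {
        bytes += rowSizes.next()
        rows += 1
      }
      out += ((rows, bytes))
      // Same adjustment as the quoted code: aim for 1M < size < 10M.
      if (bytes < MiB) batch *= 2
      else if (bytes > 10 * MiB && batch > 1) batch /= 2
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    // Assumed workload from the report: 10K almost-empty rows, then 10K rows of 1MB.
    val sizes = Iterator.fill(10000)(16L) ++ Iterator.fill(10000)(MiB)
    val batches = simulate(sizes)
    val (rows, bytes) = batches.maxBy(_._2)
    println(s"largest batch: $rows rows, $bytes bytes")
    println(s"exceeds Int.MaxValue bytes: ${bytes > Int.MaxValue}")
  }
}
```

Under these assumed sizes, the small rows keep every early batch under 1M, so the batch size keeps doubling; the first batch that reaches the large rows is already thousands of rows wide, and the rule only halves the batch size after that oversized batch has been collected.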