[ 
https://issues.apache.org/jira/browse/SPARK-48380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zheng Shao updated SPARK-48380:
-------------------------------
    Description: 



The stacktrace:

{{```}}
{{Py4JJavaError: An error occurred while calling o562.saveAsTable.}}
{{: org.apache.spark.SparkException: Job aborted due to stage failure: Task 
1811 in stage 8.0 failed 4 times, most recent failure: Lost task 1811.3 in 
stage 8.0 (TID 2782) (10.251.129.187 executor 70): 
java.lang.IllegalArgumentException: Cannot grow BufferHolder by size 578595584 
because the size after growing exceeds size limitation 2147483632}}
{{    at 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:71)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.grow(UnsafeWriter.java:66)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:201)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.writeArray(InterpretedUnsafeProjection.scala:322)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$16(InterpretedUnsafeProjection.scala:200)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$16$adapted(InterpretedUnsafeProjection.scala:198)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$22(InterpretedUnsafeProjection.scala:288)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$22$adapted(InterpretedUnsafeProjection.scala:286)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateStructWriter$2(InterpretedUnsafeProjection.scala:123)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateStructWriter$2$adapted(InterpretedUnsafeProjection.scala:120)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.$anonfun$writer$3(InterpretedUnsafeProjection.scala:67)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.$anonfun$writer$3$adapted(InterpretedUnsafeProjection.scala:65)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:90)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:36)}}
{{    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)}}
{{    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)}}
{{    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)}}
{{    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)}}
{{    at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:93)}}
{{    at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:83)}}
{{    at 
org.apache.spark.api.python.PythonRDD$.writeNextElementToStream(PythonRDD.scala:474)}}
{{    at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeNextInputToStream(PythonRunner.scala:885)}}
{{    at 
org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:813)}}
{{    at 
org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:733)}}
{{    at 
java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)}}
{{    at 
java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)}}
{{    at java.base/java.io.DataInputStream.readInt(DataInputStream.java:381)}}
{{    at 
org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:912)}}
{{    at 
org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:904)}}
{{    at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:506)}}
{{    at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)}}
{{    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)}}
{{    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)}}
{{    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)}}
{{    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)}}
{{    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)}}
{{    at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:240)}}
{{    at 
org.apache.spark.sql.execution.SortExec$$anon$2.sortedIterator(SortExec.scala:134)}}
{{    at 
org.apache.spark.sql.execution.SortExec$$anon$2.hasNext(SortExec.scala:148)}}
{{    at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:539)}}
{{    at 
org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:117)}}
{{    at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:933)}}
{{    at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:933)}}
{{    at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)}}
{{    at 
org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)}}
{{    at 
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)}}
{{    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)}}
{{    at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)}}
{{    at 
org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)}}
{{```}}

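It's caused by the batching logic in AutoBatchedPickler (code below). Suppose 
the input is 10K almost-empty rows followed by 10K rows of 1MB each. While the 
almost-empty rows are being pickled, every batch is smaller than 1MB, so 
`batch` doubles on each call (1, 2, 4, ...) and grows to several thousand rows. 
The next call to next() then tries to buffer thousands of 1MB rows in a single 
batch, i.e. several GB, which is beyond the 2GB limit reported in the 
stacktrace.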

```
private[spark] class AutoBatchedPickler(iter: Iterator[Any])
    extends Iterator[Array[Byte]] {
  private val pickle = new Pickler(/* useMemo = */ true,
    /* valueCompare = */ false)
  private var batch = 1
  private val buffer = new mutable.ArrayBuffer[Any]

  override def hasNext: Boolean = iter.hasNext

  override def next(): Array[Byte] = {
    while (iter.hasNext && buffer.length < batch) {
      buffer += iter.next()
    }
    val bytes = pickle.dumps(buffer.toArray)
    val size = bytes.length
    // let  1M < size < 10M
    if (size < 1024 * 1024) {
      batch *= 2
    } else if (size > 1024 * 1024 * 10 && batch > 1) {
      batch /= 2
    }
    buffer.clear()
    bytes
  }
}
```

  was:
The stacktrace:


{{```}}
{{Py4JJavaError: An error occurred while calling o562.saveAsTable.}}
{{: org.apache.spark.SparkException: Job aborted due to stage failure: Task 
1811 in stage 8.0 failed 4 times, most recent failure: Lost task 1811.3 in 
stage 8.0 (TID 2782) (10.251.129.187 executor 70): 
java.lang.IllegalArgumentException: Cannot grow BufferHolder by size 578595584 
because the size after growing exceeds size limitation 2147483632}}
{{    at 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:71)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.grow(UnsafeWriter.java:66)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:201)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.writeArray(InterpretedUnsafeProjection.scala:322)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$16(InterpretedUnsafeProjection.scala:200)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$16$adapted(InterpretedUnsafeProjection.scala:198)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$22(InterpretedUnsafeProjection.scala:288)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$22$adapted(InterpretedUnsafeProjection.scala:286)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateStructWriter$2(InterpretedUnsafeProjection.scala:123)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateStructWriter$2$adapted(InterpretedUnsafeProjection.scala:120)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.$anonfun$writer$3(InterpretedUnsafeProjection.scala:67)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.$anonfun$writer$3$adapted(InterpretedUnsafeProjection.scala:65)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:90)}}
{{    at 
org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:36)}}
{{    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)}}
{{    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)}}
{{    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)}}
{{    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)}}
{{    at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:93)}}
{{    at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:83)}}
{{    at 
org.apache.spark.api.python.PythonRDD$.writeNextElementToStream(PythonRDD.scala:474)}}
{{    at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeNextInputToStream(PythonRunner.scala:885)}}
{{    at 
org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:813)}}
{{    at 
org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:733)}}
{{    at 
java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)}}
{{    at 
java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)}}
{{    at java.base/java.io.DataInputStream.readInt(DataInputStream.java:381)}}
{{    at 
org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:912)}}
{{    at 
org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:904)}}
{{    at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:506)}}
{{    at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)}}
{{    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)}}
{{    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)}}
{{    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)}}
{{    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)}}
{{    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)}}
{{    at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:240)}}
{{    at 
org.apache.spark.sql.execution.SortExec$$anon$2.sortedIterator(SortExec.scala:134)}}
{{    at 
org.apache.spark.sql.execution.SortExec$$anon$2.hasNext(SortExec.scala:148)}}
{{    at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:539)}}
{{    at 
org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:117)}}
{{    at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:933)}}
{{    at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:933)}}
{{    at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)}}
{{    at 
org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)}}
{{    at 
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)}}
{{    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)}}
{{    at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)}}
{{    at 
org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)}}
{{```}}

It's caused by the following code: Let's say 10K almost-empty rows, and then 
10K rows with 1MB each.

{{{}```{}}}{{{}private[spark] class AutoBatchedPickler(iter: Iterator[Any]) 
extends Iterator[Array[Byte]] {{}}}
{{    private val pickle = new Pickler(/* useMemo = */ true,}}
{{      /* valueCompare = */ false)}}
{{    private var batch = 1}}
{{    private val buffer = new mutable.ArrayBuffer[Any]}}{{    override def 
hasNext: Boolean = iter.hasNext}}{{    override def next(): Array[Byte] = {}}
{{{}      while (iter.hasNext && buffer.length < batch){}}}{{{}{         buffer 
+= iter.next()       }{}}}{{      val bytes = pickle.dumps(buffer.toArray)}}
{{      val size = bytes.length}}
{{      // let  1M < size < 10M}}
{{{}      if (size < 1024 * 1024){}}}{{{}{         batch *= 2       
}{}}}{{{}else if (size > 1024 * 1024 * 10 && batch > 1){}}}{{{}{         batch 
/= 2       }{}}}{{      buffer.clear()}}
{{      bytes}}
{{    }}}
{{{}  }{}}}{{{}```{}}}


> AutoBatchedPickler caused Unsafe allocate to fail due to 2GB limit
> ------------------------------------------------------------------
>
>                 Key: SPARK-48380
>                 URL: https://issues.apache.org/jira/browse/SPARK-48380
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.1
>            Reporter: Zheng Shao
>            Priority: Major
>
> The stacktrace:
> {{```}}
> {{Py4JJavaError: An error occurred while calling o562.saveAsTable.}}
> {{: org.apache.spark.SparkException: Job aborted due to stage failure: Task 
> 1811 in stage 8.0 failed 4 times, most recent failure: Lost task 1811.3 in 
> stage 8.0 (TID 2782) (10.251.129.187 executor 70): 
> java.lang.IllegalArgumentException: Cannot grow BufferHolder by size 
> 578595584 because the size after growing exceeds size limitation 2147483632}}
> {{    at 
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:71)}}
> {{    at 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.grow(UnsafeWriter.java:66)}}
> {{    at 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:201)}}
> {{    at 
> org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.writeArray(InterpretedUnsafeProjection.scala:322)}}
> {{    at 
> org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$16(InterpretedUnsafeProjection.scala:200)}}
> {{    at 
> org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$16$adapted(InterpretedUnsafeProjection.scala:198)}}
> {{    at 
> org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$22(InterpretedUnsafeProjection.scala:288)}}
> {{    at 
> org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$22$adapted(InterpretedUnsafeProjection.scala:286)}}
> {{    at 
> org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateStructWriter$2(InterpretedUnsafeProjection.scala:123)}}
> {{    at 
> org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateStructWriter$2$adapted(InterpretedUnsafeProjection.scala:120)}}
> {{    at 
> org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.$anonfun$writer$3(InterpretedUnsafeProjection.scala:67)}}
> {{    at 
> org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.$anonfun$writer$3$adapted(InterpretedUnsafeProjection.scala:65)}}
> {{    at 
> org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:90)}}
> {{    at 
> org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:36)}}
> {{    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)}}
> {{    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)}}
> {{    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)}}
> {{    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)}}
> {{    at 
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:93)}}
> {{    at 
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:83)}}
> {{    at 
> org.apache.spark.api.python.PythonRDD$.writeNextElementToStream(PythonRDD.scala:474)}}
> {{    at 
> org.apache.spark.api.python.PythonRunner$$anon$2.writeNextInputToStream(PythonRunner.scala:885)}}
> {{    at 
> org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:813)}}
> {{    at 
> org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:733)}}
> {{    at 
> java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)}}
> {{    at 
> java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)}}
> {{    at java.base/java.io.DataInputStream.readInt(DataInputStream.java:381)}}
> {{    at 
> org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:912)}}
> {{    at 
> org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:904)}}
> {{    at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:506)}}
> {{    at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)}}
> {{    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)}}
> {{    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)}}
> {{    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)}}
> {{    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)}}
> {{    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)}}
> {{    at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:240)}}
> {{    at 
> org.apache.spark.sql.execution.SortExec$$anon$2.sortedIterator(SortExec.scala:134)}}
> {{    at 
> org.apache.spark.sql.execution.SortExec$$anon$2.hasNext(SortExec.scala:148)}}
> {{    at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:539)}}
> {{    at 
> org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:117)}}
> {{    at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:933)}}
> {{    at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:933)}}
> {{    at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)}}
> {{    at 
> org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)}}
> {{    at 
> com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)}}
> {{    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)}}
> {{    at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)}}
> {{    at 
> org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)}}
> {{```}}
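> It's caused by the batching logic in AutoBatchedPickler (code below). Suppose 
> the input is 10K almost-empty rows followed by 10K rows of 1MB each. While 
> the almost-empty rows are being pickled, every batch is smaller than 1MB, so 
> `batch` doubles on each call (1, 2, 4, ...) and grows to several thousand 
> rows. The next call to next() then tries to buffer thousands of 1MB rows in 
> a single batch, i.e. several GB, which is beyond the 2GB limit reported in 
> the stacktrace.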
> ```
> private[spark] class AutoBatchedPickler(iter: Iterator[Any])
>     extends Iterator[Array[Byte]] {
>   private val pickle = new Pickler(/* useMemo = */ true,
>     /* valueCompare = */ false)
>   private var batch = 1
>   private val buffer = new mutable.ArrayBuffer[Any]
>
>   override def hasNext: Boolean = iter.hasNext
>
>   override def next(): Array[Byte] = {
>     while (iter.hasNext && buffer.length < batch) {
>       buffer += iter.next()
>     }
>     val bytes = pickle.dumps(buffer.toArray)
>     val size = bytes.length
>     // let  1M < size < 10M
>     if (size < 1024 * 1024) {
>       batch *= 2
>     } else if (size > 1024 * 1024 * 10 && batch > 1) {
>       batch /= 2
>     }
>     buffer.clear()
>     bytes
>   }
> }
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to