[ https://issues.apache.org/jira/browse/SPARK-48380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zheng Shao updated SPARK-48380:
-------------------------------
Description: AutoBatchedPickler assumes that row sizes are more or less uniform. That is of course not always true. It needs a more conservative algorithm; or, more simply, we should introduce an option to disable auto-batching and pickle one row at a time.

The stacktrace:

```
Py4JJavaError: An error occurred while calling o562.saveAsTable.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1811 in stage 8.0 failed 4 times, most recent failure: Lost task 1811.3 in stage 8.0 (TID 2782) (10.251.129.187 executor 70): java.lang.IllegalArgumentException: Cannot grow BufferHolder by size 578595584 because the size after growing exceeds size limitation 2147483632
	at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:71)
	at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.grow(UnsafeWriter.java:66)
	at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:201)
	at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.writeArray(InterpretedUnsafeProjection.scala:322)
	at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$16(InterpretedUnsafeProjection.scala:200)
	at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$16$adapted(InterpretedUnsafeProjection.scala:198)
	at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$22(InterpretedUnsafeProjection.scala:288)
	at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$22$adapted(InterpretedUnsafeProjection.scala:286)
	at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateStructWriter$2(InterpretedUnsafeProjection.scala:123)
	at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateStructWriter$2$adapted(InterpretedUnsafeProjection.scala:120)
	at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.$anonfun$writer$3(InterpretedUnsafeProjection.scala:67)
	at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.$anonfun$writer$3$adapted(InterpretedUnsafeProjection.scala:65)
	at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:90)
	at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:36)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:93)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:83)
	at org.apache.spark.api.python.PythonRDD$.writeNextElementToStream(PythonRDD.scala:474)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeNextInputToStream(PythonRunner.scala:885)
	at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:813)
	at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:733)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:381)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:912)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:904)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:506)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:240)
	at org.apache.spark.sql.execution.SortExec$$anon$2.sortedIterator(SortExec.scala:134)
	at org.apache.spark.sql.execution.SortExec$$anon$2.hasNext(SortExec.scala:148)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:539)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:117)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:933)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:933)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
```

It's caused by the code below. Consider a partition with 10K almost-empty rows followed by 10K rows of ~1MB each: the batch size doubles while the small rows are consumed, so the first batch that reaches the large rows already holds thousands of them.
```scala
private[spark] class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
  private val pickle = new Pickler(/* useMemo = */ true,
    /* valueCompare = */ false)
  private var batch = 1
  private val buffer = new mutable.ArrayBuffer[Any]

  override def hasNext: Boolean = iter.hasNext

  override def next(): Array[Byte] = {
    while (iter.hasNext && buffer.length < batch) {
      buffer += iter.next()
    }
    val bytes = pickle.dumps(buffer.toArray)
    val size = bytes.length
    // let 1M < size < 10M
    if (size < 1024 * 1024) {
      batch *= 2
    } else if (size > 1024 * 1024 * 10 && batch > 1) {
      batch /= 2
    }
    buffer.clear()
    bytes
  }
}
```
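To make the failure mode concrete, here is a minimal, self-contained simulation of the doubling heuristic above, run on the 10K-small / 10K-large scenario from the description (plain Scala with no Spark or Pickler dependency; as an illustrative assumption, the pickled batch size is approximated by the sum of the raw row sizes):

```scala
object BatchSizeSimulation {
  def main(args: Array[String]): Unit = {
    // 10K almost-empty rows (~16 bytes each) followed by 10K rows of 1 MiB each.
    val rows = Iterator.fill(10000)(16L) ++ Iterator.fill(10000)(1L << 20)
    var batch = 1
    while (rows.hasNext) {
      // Fill one batch exactly as AutoBatchedPickler's next() does.
      var taken = 0
      var bytes = 0L
      while (rows.hasNext && taken < batch) {
        bytes += rows.next()
        taken += 1
      }
      println(f"batch=$batch%6d rowsTaken=$taken%6d approxBytes=$bytes%,14d")
      // The same doubling/halving heuristic as the code above.
      if (bytes < 1024 * 1024) batch *= 2
      else if (bytes > 1024 * 1024 * 10 && batch > 1) batch /= 2
    }
  }
}
```

The doubling consumes the small rows in batches of 1, 2, 4, ..., 4096 (8,191 rows in total), so the first batch that touches the 1 MiB rows is batch = 8192: 1,809 small rows plus 6,383 large ones, roughly 6.2 GiB pickled. Even after the heuristic halves the batch, the next few batches still exceed the 2,147,483,632-byte cap that BufferHolder enforces in the stack trace above.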
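The description asks for either a more conservative algorithm or an option to pickle one row at a time. One possible shape for the conservative variant, purely as a sketch (this class, its `estimateSize` hook, and the default budget are assumptions for illustration, not an existing Spark API): cut each batch by a running size estimate before pickling, instead of sizing the next batch from the previous batch's pickled output.

```scala
import scala.collection.mutable

// Sketch of a size-budgeted batcher: no batch can overshoot `maxBatchBytes`
// by more than one row, no matter how skewed the row-size distribution is.
// `estimateSize` is a hypothetical per-row size estimate and `pickleAll`
// stands in for Pickler.dumps.
class SizeCappedBatcher[T](
    iter: Iterator[T],
    estimateSize: T => Long,
    pickleAll: Array[Any] => Array[Byte],
    maxBatchBytes: Long = 64L * 1024 * 1024) extends Iterator[Array[Byte]] {

  override def hasNext: Boolean = iter.hasNext

  override def next(): Array[Byte] = {
    val buffer = new mutable.ArrayBuffer[Any]
    var estimated = 0L
    // Always take at least one row so the iterator makes progress, then
    // stop as soon as the running estimate reaches the budget.
    do {
      val row = iter.next()
      buffer += row
      estimated += estimateSize(row)
    } while (iter.hasNext && estimated < maxBatchBytes)
    pickleAll(buffer.toArray)
  }
}
```

Setting `maxBatchBytes` to 1 degenerates to one row per batch, which is the disable-auto-batch option proposed above.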
> AutoBatchedPickler caused Unsafe allocate to fail due to 2GB limit
> ------------------------------------------------------------------
>
>                 Key: SPARK-48380
>                 URL: https://issues.apache.org/jira/browse/SPARK-48380
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.1
>            Reporter: Zheng Shao
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org