[ https://issues.apache.org/jira/browse/SPARK-32601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176771#comment-17176771 ]
Rohit Mishra commented on SPARK-32601:
--------------------------------------

[~tahmad], thanks for raising the bug. Please refrain from setting the Fix Version field, as it is reserved for committers. Thanks.

Issue in converting an RDD of Arrow RecordBatches in v3.0.0
-----------------------------------------------------------

                Key: SPARK-32601
                URL: https://issues.apache.org/jira/browse/SPARK-32601
            Project: Spark
         Issue Type: Bug
         Components: PySpark
   Affects Versions: 3.0.0
           Reporter: Tanveer
           Priority: Major
            Fix For: 2.3.4

The following simple code snippet for converting an RDD of Arrow RecordBatches works perfectly in Spark v2.3.4.

{code:java}
from pyspark.sql import SparkSession
import pyspark
import pyarrow as pa
from pyspark.serializers import ArrowSerializer


def _arrow_record_batch_dumps(rb):
    # Fix for interoperability between pyarrow >= 0.15 and Spark's Arrow version.
    # The streaming message protocol has changed; remove this setting when upgrading Spark.
    import os
    os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'

    return bytearray(ArrowSerializer().dumps(rb))


def rb_return(ardd):
    data = [
        pa.array(range(5), type='int16'),
        pa.array([-10, -5, 0, None, 10], type='int32')
    ]
    schema = pa.schema([pa.field('c0', pa.int16()),
                        pa.field('c1', pa.int32())],
                       metadata={b'foo': b'bar'})
    return pa.RecordBatch.from_arrays(data, schema=schema)


if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("Python Arrow-in-Spark example") \
        .getOrCreate()

    # Enable Arrow-based columnar data transfers
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    sc = spark.sparkContext

    ardd = spark.sparkContext.parallelize([0, 1, 2], 3)
    ardd = ardd.map(rb_return)

    from pyspark.sql.types import from_arrow_schema
    from pyspark.sql.dataframe import DataFrame
    from pyspark.serializers import ArrowSerializer, PickleSerializer, AutoBatchedSerializer

    # Filter out and cache arrow record batches
    ardd = ardd.filter(lambda x: isinstance(x, pa.RecordBatch)).cache()
    ardd = ardd.map(_arrow_record_batch_dumps)

    schema = pa.schema([pa.field('c0', pa.int16()),
                        pa.field('c1', pa.int32())],
                       metadata={b'foo': b'bar'})
    schema = from_arrow_schema(schema)

    jrdd = ardd._to_java_object_rdd()
    jdf = spark._jvm.PythonSQLUtils.arrowPayloadToDataFrame(jrdd, schema.json(), spark._wrapped._jsqlContext)
    df = DataFrame(jdf, spark._wrapped)
    df._schema = schema

    df.show()
{code}

But after updating to Spark v3.0.0, the same functionality, with only arrowPayloadToDataFrame() changed to toDataFrame(), no longer works.
{code:java}
from pyspark.sql import SparkSession
import pyspark
import pyarrow as pa
#from pyspark.serializers import ArrowSerializer


def dumps(batch):
    import pyarrow as pa
    import io
    sink = io.BytesIO()
    writer = pa.RecordBatchFileWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()
    return sink.getvalue()


def _arrow_record_batch_dumps(rb):
    # Fix for interoperability between pyarrow >= 0.15 and Spark's Arrow version.
    # The streaming message protocol has changed; remove this setting when upgrading Spark.
    #import os
    #os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'

    #return bytearray(ArrowSerializer().dumps(rb))
    return bytearray(dumps(rb))


def rb_return(ardd):
    data = [
        pa.array(range(5), type='int16'),
        pa.array([-10, -5, 0, None, 10], type='int32')
    ]
    schema = pa.schema([pa.field('c0', pa.int16()),
                        pa.field('c1', pa.int32())],
                       metadata={b'foo': b'bar'})
    return pa.RecordBatch.from_arrays(data, schema=schema)


if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("Python Arrow-in-Spark example") \
        .getOrCreate()

    # Enable Arrow-based columnar data transfers
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    sc = spark.sparkContext

    ardd = spark.sparkContext.parallelize([0, 1, 2], 3)
    ardd = ardd.map(rb_return)

    from pyspark.sql.pandas.types import from_arrow_schema
    from pyspark.sql.dataframe import DataFrame

    # Filter out and cache arrow record batches
    ardd = ardd.filter(lambda x: isinstance(x, pa.RecordBatch)).cache()
    ardd = ardd.map(_arrow_record_batch_dumps)

    schema = pa.schema([pa.field('c0', pa.int16()),
                        pa.field('c1', pa.int32())],
                       metadata={b'foo': b'bar'})
    schema = from_arrow_schema(schema)

    jrdd = ardd._to_java_object_rdd()
    #jdf = spark._jvm.PythonSQLUtils.arrowPayloadToDataFrame(jrdd, schema.json(), spark._wrapped._jsqlContext)
    jdf = spark._jvm.PythonSQLUtils.toDataFrame(jrdd, schema.json(), spark._wrapped._jsqlContext)
    df = DataFrame(jdf, spark._wrapped)
    df._schema = schema

    df.show()
{code}
First, it fails with a Java heap space error:

{code:java}
20/08/12 12:18:48 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:669)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:336)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$.loadBatch(ArrowConverters.scala:189)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.nextBatch(ArrowConverters.scala:165)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.<init>(ArrowConverters.scala:144)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$.fromBatchIterator(ArrowConverters.scala:143)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$.$anonfun$toDataFrame$1(ArrowConverters.scala:203)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$$Lambda$1806/1325557847.apply(Unknown Source)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
    at org.apache.spark.rdd.RDD$$Lambda$1805/889467051.apply(Unknown Source)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1773/1728811302.apply(Unknown Source)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/08/12 12:18:48 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 0,5,main]
java.lang.OutOfMemoryError: Java heap space
{code}

And when run with --driver-memory 4g, even for this very small amount of data, it gives:

{code:java}
20/08/12 12:22:18 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.IOException: Unexpected end of stream trying to read message.
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:671)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:336)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$.loadBatch(ArrowConverters.scala:189)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.nextBatch(ArrowConverters.scala:165)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.<init>(ArrowConverters.scala:144)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$.fromBatchIterator(ArrowConverters.scala:143)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$.$anonfun$toDataFrame$1(ArrowConverters.scala:203)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/08/12 12:22:18 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, int2.bullx, executor driver): java.io.IOException: Unexpected end of stream trying to read message.
{code}
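One possible cause worth checking (a sketch only, not a verified fix): the Spark 3.0 snippet above serializes each RecordBatch with pa.RecordBatchFileWriter, i.e. in the Arrow *file* format, which begins with the ARROW1 magic bytes, while both stack traces show the JVM side (ArrowConverters.loadBatch calling MessageSerializer.readMessage) reading each byte array as a single encapsulated Arrow IPC message. Misinterpreting the file header as a message length would be consistent with both the huge heap allocation and the "Unexpected end of stream" error. Assuming PythonSQLUtils.toDataFrame() expects one encapsulated record-batch message per RDD element (which is what the old ArrowSerializer produced in 2.x), a dumps function along these lines might work with pyarrow >= 0.15:

{code:java}
import pyarrow as pa

def _arrow_record_batch_dumps(rb):
    # Sketch of a possible workaround (unverified assumption about what
    # PythonSQLUtils.toDataFrame expects): serialize the RecordBatch as a
    # single encapsulated Arrow IPC message rather than as a complete Arrow
    # file. RecordBatch.serialize() returns a pyarrow Buffer holding exactly
    # that message.
    # Spark 3.0.0 bundles Arrow Java 0.15.1, so the post-0.15 IPC format
    # should be readable on the JVM side and ARROW_PRE_0_15_IPC_FORMAT
    # should no longer be needed.
    return bytearray(rb.serialize())
{code}

If this assumption holds, the rest of the 3.0 snippet would stay the same; only the dumps()/RecordBatchFileWriter pair changes.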