jiangyu created ARROW-6011:
------------------------------

             Summary: Data incomplete when using pyarrow in pyspark in python 3.x
                 Key: ARROW-6011
                 URL: https://issues.apache.org/jira/browse/ARROW-6011
             Project: Apache Arrow
          Issue Type: Bug
          Components: Java, Python
    Affects Versions: 0.14.0, 0.10.0
         Environment: CentOS 7.4, pyarrow 0.10.0 / 0.14.0, python 2.7 / 3.5 / 3.6
            Reporter: jiangyu
         Attachments: image-2019-07-23-16-06-49-889.png

Hi,
 
In Spark 2.3, pandas UDFs were added to PySpark, with pyarrow as the default serialization and deserialization method. It is a great feature and we use it a lot.
But when we changed the default Python version from 2.7 to 3.5 or 3.6 (using conda as the Python environment manager), we ran into a fatal problem.
We use pandas UDFs to process batches of data, and we found that the data is incomplete. At first I thought the processing logic might be wrong, so I reduced the code to a very simple case, and it showed the same problem. After investigating for a week, I found it is related to pyarrow.
 
h3. How to reproduce it

1. Generate data
First generate a very simple data set. It has seven columns, a, b, c, d, e, f and g; every row is identical and the data type is Integer:
a,b,c,d,e,f,g
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
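
A minimal sketch of one way to generate such a file (an assumption, since the report does not include the generation script):

import csv

# Write 1,000,000 identical rows; upload afterwards with e.g. `hdfs dfs -put test.csv /test.csv`.
with open('test.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['a', 'b', 'c', 'd', 'e', 'f', 'g'])
    for _ in range(1000000):
        writer.writerow([1, 2, 3, 4, 5, 6, 7])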
We produce 1,000,000 such rows, name the file test.csv and upload it to HDFS, then load it and repartition it to a single partition:
 
from pyspark import SparkContext
from pyspark.sql.functions import col

df = spark.read.format('csv').option("header", "true").load('/test.csv')
df = df.select(*(col(c).cast("int").alias(c) for c in df.columns))
df = df.repartition(1)
spark_context = SparkContext.getOrCreate()
 
2. Register a pandas UDF
Define a very simple pandas UDF and register it. The function just prints "iterator one time" and returns its first argument unchanged:
 
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import IntegerType

def add_func(a, b, c, d, e, f, g):
    # called once per Arrow record batch received by the worker
    print('iterator one time')
    return a

add = pandas_udf(add_func, returnType=IntegerType())
df_result = df.select(add(col("a"), col("b"), col("c"), col("d"), col("e"), col("f"), col("g")))
 
3. Trigger a Spark action

def trigger_func(iterator):
    yield iterator

df_result.rdd.foreachPartition(trigger_func)
 
4. Execute it in pyspark (local or yarn)
We set spark.sql.execution.arrow.maxRecordsPerBatch=100000 and the data has 1,000,000 rows, so "iterator one time" should be printed 10 times (1,000,000 / 100,000 = 10 batches).
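
The same Arrow options can also be set when building the session; a small sketch, equivalent to the --conf flags used below (the memory and executor-core settings are omitted here):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.execution.arrow.enabled", "true")
         .config("spark.sql.execution.arrow.maxRecordsPerBatch", "100000")
         .getOrCreate())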
(1) Result with the Python 2.7 environment:
 
PYSPARK_PYTHON=/usr/lib/conda/envs/py2.7/bin/python pyspark \
  --conf spark.sql.execution.arrow.maxRecordsPerBatch=100000 \
  --conf spark.executor.pyspark.memory=2g \
  --conf spark.sql.execution.arrow.enabled=true \
  --executor-cores 1
 
!image-2019-07-23-16-06-49-889.png!  
The result is correct: "iterator one time" is printed 10 times.

(2) Then switch to the Python 3.6 environment, with the same code:
PYSPARK_PYTHON=/usr/lib/conda/envs/python3.6/bin/python pyspark \
  --conf spark.sql.execution.arrow.maxRecordsPerBatch=100000 \
  --conf spark.executor.pyspark.memory=2g \
  --conf spark.sql.execution.arrow.enabled=true \
  --executor-cores 1

[screenshot of the Python 3.6 run output]
The data is incomplete.
The exception shown there is printed by Spark code that we added ourselves; I will explain it later.
 
 
h3. Investigation
So I added some logging to trace it. The "process done" message is added in worker.py.
[screenshot of the worker log output]
To get the exception, we also changed the Spark code in core/src/main/scala/org/apache/spark/util/Utils.scala, adding the following lines to print the exception:
 
@@ -1362,6 +1362,8 @@ private[spark] object Utils extends Logging {
       case t: Throwable =>
         // Purposefully not using NonFatal, because even fatal exceptions
         // we don't want to have our finallyBlock suppress
+        logInfo(t.getLocalizedMessage)
+        t.printStackTrace()
         originalThrowable = t
         throw originalThrowable
     } finally {
 
It seems PySpark gets the data from the JVM, but pyarrow reads it incompletely: the pyarrow side thinks the stream is finished and shuts the socket down, while the JVM side is still writing to the same socket and therefore hits a socket-closed exception.
The pyarrow part is in ipc.pxi (a Python-level sketch of the same consumption pattern follows the excerpt):
 
cdef class _RecordBatchReader:
    cdef:
        shared_ptr[CRecordBatchReader] reader
        shared_ptr[InputStream] in_stream

    cdef readonly:
        Schema schema

    def __cinit__(self):
        pass

    def _open(self, source):
        get_input_stream(source, &self.in_stream)
        with nogil:
            check_status(CRecordBatchStreamReader.Open(
                self.in_stream.get(), &self.reader))

        self.schema = pyarrow_wrap_schema(self.reader.get().schema())

    def __iter__(self):
        while True:
            yield self.read_next_batch()

    def get_next_batch(self):
        import warnings
        warnings.warn('Please use read_next_batch instead of '
                      'get_next_batch', FutureWarning)
        return self.read_next_batch()

    def read_next_batch(self):
        """
        Read next RecordBatch from the stream. Raises StopIteration at end of
        stream
        """
        cdef shared_ptr[CRecordBatch] batch

        with nogil:
            check_status(self.reader.get().ReadNext(&batch))

        if batch.get() == NULL:
            raise StopIteration

        return pyarrow_wrap_batch(batch)

When read_next_batch gets a NULL batch, it raises StopIteration, so the Python side believes the stream is complete.
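
Here is a minimal standalone sketch of that consumption pattern, using the public pyarrow API as of 0.14 (this is an illustration, not the PySpark worker code itself). In the buggy runs, ReadNext returns NULL before all batches written by the JVM have been read, so a loop like this simply ends early instead of raising an error:

import io
import pyarrow as pa

# Build a small in-memory stream with 10 record batches.
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3], type=pa.int32())], ['a'])
sink = io.BytesIO()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
for _ in range(10):
    writer.write_batch(batch)
writer.close()  # writes the end-of-stream marker

# Consume it the way worker.py does: iterate until StopIteration.
reader = pa.RecordBatchStreamReader(io.BytesIO(sink.getvalue()))
print(sum(1 for _ in reader))  # prints 10; in the buggy runs fewer batches arrive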
 
h3. RESULT
Our environment is Spark 2.4.3; we have tried pyarrow 0.10.0 and 0.14.0 with Python 2.7, 3.5 and 3.6.
With Python 2.7 everything is fine, but with Python 3.5 or 3.6 the data is wrong.
The number of columns is critical for triggering this bug: with fewer than 5 columns it probably will not happen, but with 7 or more columns it happens every time.
So we wonder whether there is some conflict between Python 3.x and the pyarrow version. I have attached the code and data.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
