[ 
https://issues.apache.org/jira/browse/SPARK-28482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16896132#comment-16896132
 ] 

Li Jin commented on SPARK-28482:
--------------------------------

Hi [~jiangyu1211], thank you for the bug report.
 
>From the description, it looks like an issue either in Spark or Arrow. Since 
>it's not clear where the problem is, there needs to be a person leading the 
>investigation. It seems that you have done pretty good investigation so far I 
>think it'd be great if you can lead the investigation under the help of both 
>communities. 
 
To the problem itself, it seems like the issue happens with in the Arrow stream 
format. Since the JVM write the complete Arrow stream format to the socket, one 
thing I suggest you do is to write the Arrow stream format data to a file and 
try reading it from pyarrow to see if you can reproduce it.

> Data incomplete when using pandas udf in Python 3
> -------------------------------------------------
>
>                 Key: SPARK-28482
>                 URL: https://issues.apache.org/jira/browse/SPARK-28482
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.3, 2.4.3
>         Environment: centos 7.4   
> pyarrow 0.10.0 0.14.0
> python 2.7 3.5 3.6
>            Reporter: jiangyu
>            Priority: Major
>         Attachments: py2.7.png, py3.6.png, test.csv, test.py, worker.png
>
>
> Hi,
>   
>  Since Spark 2.3.x, pandas udf has been introduced as default ser/des method 
> when using udf. However, an issue raises with python >= 3.5.x version.
>  We use pandas udf to process batches of data, but we find the data is 
> incomplete in python 3.x. At first , i think the process logical maybe wrong, 
> so i change the code to very simple one and it has the same problem.After 
> investigate for a week, i find it is related to pyarrow.   
>   
>  *Reproduce procedure:*
> 1. prepare data
>  The data have seven column, a、b、c、d、e、f and g, data type is Integer
>  a,b,c,d,e,f,g
>  1,2,3,4,5,6,7
>  1,2,3,4,5,6,7
>  1,2,3,4,5,6,7
>  1,2,3,4,5,6,7
>   produce 100,000 rows and name the file test.csv ,upload to hdfs, then load 
> it , and repartition it to 1 partition.
>   
> {code:java}
> df=spark.read.format('csv').option("header","true").load('/test.csv')
> df=df.select(*(col(c).cast("int").alias(c) for c in df.columns))
> df=df.repartition(1)
> spark_context = SparkContext.getOrCreate() {code}
>  
>  2.register pandas udf
>   
> {code:java}
> def add_func(a,b,c,d,e,f,g):
>     print('iterator one time')
>     return a
> add = pandas_udf(add_func, returnType=IntegerType())
> df_result=df.select(add(col("a"),col("b"),col("c"),col("d"),col("e"),col("f"),col("g"))){code}
>  
>  3.apply pandas udf
>   
> {code:java}
> def trigger_func(iterator):
>       yield iterator
> df_result.rdd.foreachPartition(trigger_func){code}
>  
>  4.execute it in pyspark (local or yarn)
>  run it with conf spark.sql.execution.arrow.maxRecordsPerBatch=100000. As 
> mentioned before the total row number is 1000000, it should print "iterator 
> one time " 10 times.
>  (1)Python 2.7 envs:
>   
> {code:java}
> PYSPARK_PYTHON=/usr/lib/conda/envs/py2.7/bin/python pyspark --conf 
> spark.sql.execution.arrow.maxRecordsPerBatch=100000 --conf 
> spark.executor.pyspark.memory=2g --conf 
> spark.sql.execution.arrow.enabled=true --executor-cores 1{code}
>  
>  !py2.7.png!   
>  The result is right, 10 times of print.
>  
>  
> (2)Python 3.5 or 3.6 envs:
> {code:java}
> PYSPARK_PYTHON=/usr/lib/conda/envs/python3.6/bin/python pyspark --conf 
> spark.sql.execution.arrow.maxRecordsPerBatch=100000 --conf 
> spark.executor.pyspark.memory=2g --conf 
> spark.sql.execution.arrow.enabled=true --executor-cores{code}
>  
> !py3.6.png!
> The data is incomplete. Exception is print by jvm spark which have been added 
> by us , I will explain it later.
>   
>   
> h3. *Investigation*
> The “process done” is added in the worker.py.
>  !worker.png!
>  In order to get the exception,  change the spark code, the code is under 
> core/src/main/scala/org/apache/spark/util/Utils.scala , and add this code to 
> print the exception.
>   
>  
> {code:java}
> @@ -1362,6 +1362,8 @@ private[spark] object Utils extends Logging {
>  case t: Throwable =>
>  // Purposefully not using NonFatal, because even fatal exceptions
>  // we don't want to have our finallyBlock suppress
> + logInfo(t.getLocalizedMessage)
> + t.printStackTrace()
>  originalThrowable = t
>  throw originalThrowable
>  } finally {{code}
>  
>  
>  It seems the pyspark get the data from jvm , but pyarrow get the data 
> incomplete. Pyarrow side think the data is finished, then shutdown the 
> socket. At the same time, the jvm side still writes to the same socket , but 
> get socket close exception.
>  The pyarrow part is in ipc.pxi:
>   
> {code:java}
> cdef class _RecordBatchReader:
>  cdef:
>  shared_ptr[CRecordBatchReader] reader
>  shared_ptr[InputStream] in_stream
> cdef readonly:
>  Schema schema
> def _cinit_(self):
>  pass
> def _open(self, source):
>  get_input_stream(source, &self.in_stream)
>  with nogil:
>  check_status(CRecordBatchStreamReader.Open(
>  self.in_stream.get(), &self.reader))
> self.schema = pyarrow_wrap_schema(self.reader.get().schema())
> def _iter_(self):
>  while True:
>  yield self.read_next_batch()
> def get_next_batch(self):
>  import warnings
>  warnings.warn('Please use read_next_batch instead of '
>  'get_next_batch', FutureWarning)
>  return self.read_next_batch()
> def read_next_batch(self):
>  """
>  Read next RecordBatch from the stream. Raises StopIteration at end of
>  stream
>  """
>  cdef shared_ptr[CRecordBatch] batch
> with nogil:
>  check_status(self.reader.get().ReadNext(&batch))
> if batch.get() == NULL:
>  raise StopIteration
>  return pyarrow_wrap_batch(batch){code}
>  
> read_next_batch function get NULL, think the iterator is over.
>   
> h3. *RESULT*
> Our environment is spark 2.4.3, we have tried pyarrow version 0.10.0 and 
> 0.14.0 , python version is python 2.7, python 3.5, python 3.6.
>  When using python 2.7, everything is fine. But when change to python 
> 3.5,3,6, the data is wrong.
>  The column number is critical to trigger this bug, if column number is less 
> than 5 , this bug probably will not happen. But If the column number is big , 
> for example 7 or above, it happens every time.
>  So we wonder if there is some conflict between python 3.x and pyarrow 
> version? 
>  I have put the code and data as attachment.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to