[jira] [Commented] (SPARK-28482) Data incomplete when using pandas udf in Python 3

2019-08-22 Thread jiangyu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16913879#comment-16913879
 ] 

jiangyu commented on SPARK-28482:
-

Hi [~bryanc], I have tested toPandas() and it is okay. The row count is correct 
and no exception is thrown.

I originally used df_result.rdd.foreachPartition(trigger_func) to trigger the 
pandas udf under Python 2.7, and everything was fine. After changing to Python 
3.6, this method no longer seemed stable, so I will switch to toPandas(). 
Thank you.
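
For reference, a minimal sketch of the toPandas() check (assuming the df_result built in the reproduce steps later in this thread, with spark.sql.execution.arrow.enabled=true):
{code:java}
# Sketch only: collect the UDF result on the driver instead of using
# foreachPartition; df_result comes from the reproduce steps in this issue.
pdf = df_result.toPandas()
print(len(pdf))  # the row count should match the number of input rows
{code}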


[jira] [Comment Edited] (SPARK-28482) Data incomplete when using pandas udf in Python 3

2019-08-22 Thread jiangyu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16913183#comment-16913183
 ] 

jiangyu edited comment on SPARK-28482 at 8/22/19 9:37 AM:
--

Hi [~bryanc], maybe you should produce more data, for example 100,000 rows, and 
read 10,000 rows per iteration. The number of rows is not right; it is smaller 
than expected.

I have investigated this issue this week. I found the row count is correct 
when Arrow reads from the socket, so in serializers.py I revised the 
dump_stream method to write to a local stream instead:
{code:java}
def dump_stream(self, iterator, stream):
    """
    Make ArrowRecordBatches from Pandas Series and serialize. Input is a single
    series or a list of series accompanied by an optional pyarrow type to
    coerce the data to.
    """
    import pyarrow as pa
    writer = None
    # Debug change: write the Arrow stream to a local file instead of the socket.
    local_stream = pa.output_stream('/tmp/output')
    try:
        for series in iterator:
            batch = _create_batch(series, self._timezone)
            if writer is None:
                # write_int(SpecialLengths.START_ARROW_STREAM, stream)
                # writer = pa.RecordBatchStreamWriter(stream, batch.schema)
                write_int(SpecialLengths.START_ARROW_STREAM, local_stream)
                writer = pa.RecordBatchStreamWriter(local_stream, batch.schema)
            writer.write_batch(batch)
    finally:
        if writer is not None:
            writer.close()
{code}
 

The row count is correct, and no exception is thrown.

Then I changed daemon.py and increased the buffer size of outfile (the default, 
shown below, is 65536).
{code:java}
def worker(sock, authenticated):
    """
    Called by a worker process after the fork().
    """
    signal.signal(SIGHUP, SIG_DFL)
    signal.signal(SIGCHLD, SIG_DFL)
    signal.signal(SIGTERM, SIG_DFL)
    # restore the handler for SIGINT,
    # it's useful for debugging (show the stacktrace before exit)
    signal.signal(SIGINT, signal.default_int_handler)

    # Read the socket using fdopen instead of socket.makefile() because the latter
    # seems to be very slow; note that we need to dup() the file descriptor because
    # otherwise writes also cause a seek that makes us miss data on the read side.
    infile = os.fdopen(os.dup(sock.fileno()), "rb", 65536)
    outfile = os.fdopen(os.dup(sock.fileno()), "wb", 65536)
{code}
And everything is OK. I don't know whether it is safe to increase the buffer 
size that much, but it really helps us.
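
For what it's worth, a small sketch (my assumption, not code from Spark or pyarrow) of how the rows written to the local debug file above could be counted back; the modified dump_stream first writes a 4-byte START_ARROW_STREAM marker with write_int, which has to be skipped before handing the file to pyarrow:
{code:java}
import pyarrow as pa

with open('/tmp/output', 'rb') as f:
    f.read(4)  # skip the START_ARROW_STREAM integer written by write_int
    reader = pa.ipc.open_stream(f)  # pa.open_stream(f) on older pyarrow releases
    total = sum(batch.num_rows for batch in reader)

print(total)  # should equal the number of rows sent to the worker
{code}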




[jira] [Commented] (SPARK-28482) Data incomplete when using pandas udf in pyspark

2019-07-23 Thread jiangyu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16891638#comment-16891638
 ] 

jiangyu commented on SPARK-28482:
-

Hi [~dongjoon], I have tested this in Spark 2.3.3, 2.4.2 and 2.4.3 and got the 
same error. And you are right, Python 2.x is OK, but in Python 3.x the data is 
incomplete, which means pandas udf cannot be used in these environments.


[jira] [Updated] (SPARK-28482) Data incomplete when using pandas udf in pyspark

2019-07-23 Thread jiangyu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiangyu updated SPARK-28482:

Description: 
Hi,

Since Spark 2.3.x, pandas udf has been introduced, with Arrow as the ser/des 
method for udf data. However, an issue arises with Python >= 3.5.
We use pandas udf to process batches of data, but we find the data is 
incomplete in Python 3.x. At first I thought our processing logic might be 
wrong, so I changed the code to a very simple version, and it has the same 
problem. After investigating for a week, I find it is related to pyarrow.
  
 *Reproduce procedure:*

1. Prepare data
 The data has seven columns, a, b, c, d, e, f and g; the data type is Integer:
 a,b,c,d,e,f,g
 1,2,3,4,5,6,7
 1,2,3,4,5,6,7
 1,2,3,4,5,6,7
 1,2,3,4,5,6,7
 Produce 100,000 rows, name the file test.csv and upload it to HDFS, then load 
it and repartition it to 1 partition.
  
{code:java}
from pyspark.sql.functions import col

df = spark.read.format('csv').option("header", "true").load('/test.csv')
df = df.select(*(col(c).cast("int").alias(c) for c in df.columns))
df = df.repartition(1)
spark_context = SparkContext.getOrCreate(){code}
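
For completeness, a minimal sketch (not the original script; names are illustrative) that could generate such a test.csv locally before uploading it to HDFS:
{code:java}
# Write 100,000 identical rows with the seven integer columns a..g.
with open('test.csv', 'w') as f:
    f.write('a,b,c,d,e,f,g\n')
    for _ in range(100000):
        f.write('1,2,3,4,5,6,7\n')
{code}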
 
 2. Register the pandas udf
  
{code:java}
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType

def add_func(a, b, c, d, e, f, g):
    print('iterator one time')
    return a

add = pandas_udf(add_func, returnType=IntegerType())
df_result = df.select(add(col("a"), col("b"), col("c"), col("d"), col("e"), col("f"), col("g"))){code}
 
 3. Apply the pandas udf
  
{code:java}
def trigger_func(iterator):
    yield iterator

df_result.rdd.foreachPartition(trigger_func){code}
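
As an aside (not part of the original reproduction), a trigger that explicitly consumes the partition iterator might look like the sketch below; consume_partition is a hypothetical name:
{code:java}
def consume_partition(iterator):
    # Force full iteration of the partition so every Arrow batch is pulled.
    count = sum(1 for _ in iterator)
    print('rows in partition: {}'.format(count))

df_result.rdd.foreachPartition(consume_partition)
{code}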
 
 4. Execute it in pyspark (local or yarn)
 Run it with spark.sql.execution.arrow.maxRecordsPerBatch=10. As mentioned 
before, the total row number is 100, so it should print "iterator one time" 
10 times.
 (1) Python 2.7 environment:
  
{code:java}
PYSPARK_PYTHON=/usr/lib/conda/envs/py2.7/bin/python pyspark \
  --conf spark.sql.execution.arrow.maxRecordsPerBatch=10 \
  --conf spark.executor.pyspark.memory=2g \
  --conf spark.sql.execution.arrow.enabled=true \
  --executor-cores 1{code}
 
 !py2.7.png!
 The result is correct: "iterator one time" is printed 10 times.

(2) Python 3.5 or 3.6 environment:
{code:java}
PYSPARK_PYTHON=/usr/lib/conda/envs/python3.6/bin/python pyspark \
  --conf spark.sql.execution.arrow.maxRecordsPerBatch=10 \
  --conf spark.executor.pyspark.memory=2g \
  --conf spark.sql.execution.arrow.enabled=true \
  --executor-cores{code}
 

!py3.6.png!

The data is incomplete. The exception shown is printed by the JVM side of 
Spark; we added that logging ourselves, and I will explain it below.
h3. *Investigation*

The “process done” message is printed by a line we added in worker.py.
 !worker.png!
 In order to capture the exception, we changed the Spark code in 
core/src/main/scala/org/apache/spark/util/Utils.scala, adding the following 
lines to print the exception.
  

 
{code:java}
@@ -1362,6 +1362,8 @@ private[spark] object Utils extends Logging {
       case t: Throwable =>
         // Purposefully not using NonFatal, because even fatal exceptions
         // we don't want to have our finallyBlock suppress
+        logInfo(t.getLocalizedMessage)
+        t.printStackTrace()
         originalThrowable = t
         throw originalThrowable
     } finally {{code}
 

 
 It seems PySpark receives the data from the JVM, but pyarrow reads it 
incompletely. The pyarrow side thinks the data is finished and shuts down the 
socket. At the same time, the JVM side is still writing to the same socket and 
gets a socket-closed exception.
 The pyarrow part is in ipc.pxi:
  
{code:java}
cdef class _RecordBatchReader:
    cdef:
        shared_ptr[CRecordBatchReader] reader
        shared_ptr[InputStream] in_stream

    cdef readonly:
        Schema schema

    def __cinit__(self):
        pass

    def _open(self, source):
        get_input_stream(source, &self.in_stream)
        with nogil:
            check_status(CRecordBatchStreamReader.Open(
                self.in_stream.get(), &self.reader))

        self.schema = pyarrow_wrap_schema(self.reader.get().schema())

    def __iter__(self):
        while True:
            yield self.read_next_batch()

    def get_next_batch(self):
        import warnings
        warnings.warn('Please use read_next_batch instead of '
                      'get_next_batch', FutureWarning)
        return self.read_next_batch()

    def read_next_batch(self):
        """
        Read next RecordBatch from the stream. Raises StopIteration at end of
        stream
        """
        cdef shared_ptr[CRecordBatch] batch

        with nogil:
            check_status(self.reader.get().ReadNext(&batch))

        if batch.get() == NULL:
            raise StopIteration

        return pyarrow_wrap_batch(batch){code}
 

The read_next_batch function gets a NULL batch and concludes the iterator is 
over.
h3. *RESULT*

Our environment is Spark 2.4.3; we have tried pyarrow 0.10.0 and 0.14.0, and 
Python 2.7, 3.5 and 3.6.
 When using Python 2.7, everything is fine. But after changing to Python 3.5 or 
3.6, the data is wrong.
 The column count is critical to triggering this bug: with fewer than 5 columns 
it probably will not happen, but with a larger number of columns, for example 7 
or more, it happens every time.
 So we wonder whether there is some conflict between Python 3.x and the pyarrow 
version?
 I have put the code and data as attachments.
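
One way to rule out an executor-side environment mismatch is a quick check like the sketch below (a hypothetical helper, not part of the reproduction), which reports the Python and pyarrow versions the executors actually use:
{code:java}
def report_versions(_):
    # Imported inside the function so the check runs on the executors.
    import sys
    import pyarrow
    yield 'python {} / pyarrow {}'.format(sys.version.split()[0], pyarrow.__version__)

print(df.rdd.mapPartitions(report_versions).distinct().collect())
{code}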

[jira] [Commented] (SPARK-28482) Data incomplete when using pandas udf in pyspark

2019-07-23 Thread jiangyu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16890793#comment-16890793
 ] 

jiangyu commented on SPARK-28482:
-

Also submitted this to the Arrow community: 
https://issues.apache.org/jira/browse/ARROW-6011



[jira] [Updated] (SPARK-28482) Data incomplete when using pandas udf in pyspark

2019-07-23 Thread jiangyu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiangyu updated SPARK-28482:

Attachment: test.py
test.csv


[jira] [Updated] (SPARK-28482) Data incomplete when using pandas udf in pyspark

2019-07-23 Thread jiangyu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiangyu updated SPARK-28482:

Attachment: py3.6.png
worker.png
py2.7.png


[jira] [Created] (SPARK-28482) Data incomplete when using pandas udf in pyspark

2019-07-23 Thread jiangyu (JIRA)
jiangyu created SPARK-28482:
---

 Summary: Data incomplete when using pandas udf in pyspark
 Key: SPARK-28482
 URL: https://issues.apache.org/jira/browse/SPARK-28482
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.4.3
 Environment: centos 7.4   

pyarrow 0.10.0 0.14.0

python 2.7 3.5 3.6
Reporter: jiangyu

