[ https://issues.apache.org/jira/browse/SPARK-28482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16913183#comment-16913183 ]
jiangyu edited comment on SPARK-28482 at 8/22/19 9:37 AM:
----------------------------------------------------------

Hi [~bryanc], maybe you should produce more data, e.g. 100,000 rows, and read 10,000 rows in every iteration. The number of rows received is not right; it is smaller than expected. I have investigated this issue this week and found that the row count is still correct when Arrow reads from the socket. So in serializers.py I revised the dump_stream method to write to a local stream instead of the socket stream:

{code:java}
def dump_stream(self, iterator, stream):
    """
    Make ArrowRecordBatches from Pandas Series and serialize. Input is a single series or
    a list of series accompanied by an optional pyarrow type to coerce the data to.
    """
    import pyarrow as pa
    writer = None
    local_stream = pa.output_stream('/tmp/output')
    try:
        for series in iterator:
            batch = _create_batch(series, self._timezone)
            if writer is None:
                # write_int(SpecialLengths.START_ARROW_STREAM, stream)
                # writer = pa.RecordBatchStreamWriter(stream, batch.schema)
                write_int(SpecialLengths.START_ARROW_STREAM, local_stream)
                writer = pa.RecordBatchStreamWriter(local_stream, batch.schema)
            writer.write_batch(batch)
    finally:
        if writer is not None:
            writer.close()
{code}

With this change the row count is correct and no exception is thrown. Then I changed daemon.py and increased the buffer size of outfile from 65536 to 655360000:

{code:java}
def worker(sock, authenticated):
    """
    Called by a worker process after the fork().
    """
    signal.signal(SIGHUP, SIG_DFL)
    signal.signal(SIGCHLD, SIG_DFL)
    signal.signal(SIGTERM, SIG_DFL)
    # restore the handler for SIGINT,
    # it's useful for debugging (show the stacktrace before exit)
    signal.signal(SIGINT, signal.default_int_handler)

    # Read the socket using fdopen instead of socket.makefile() because the latter
    # seems to be very slow; note that we need to dup() the file descriptor because
    # otherwise writes also cause a seek that makes us miss data on the read side.
    infile = os.fdopen(os.dup(sock.fileno()), "rb", 65536)
    outfile = os.fdopen(os.dup(sock.fileno()), "wb", 655360000)
{code}

After that everything is OK. I don't know whether it is safe to increase the buffer size that much, but it really helps us.
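To double-check the dumped file, here is a small verification sketch (my own, not part of any Spark code). It assumes the modified dump_stream above wrote the 4-byte SpecialLengths.START_ARROW_STREAM marker into /tmp/output before the Arrow stream data, so it skips those 4 bytes and then counts the batches and rows that actually reached the file:

{code:python}
# Verification sketch (assumption: /tmp/output = 4-byte START_ARROW_STREAM marker
# followed by a plain Arrow IPC stream, as produced by the modified dump_stream above).
import pyarrow as pa

with open('/tmp/output', 'rb') as f:
    f.read(4)  # skip the 4-byte marker written by write_int()
    reader = pa.RecordBatchStreamReader(f)
    batches = 0
    rows = 0
    for batch in reader:
        batches += 1
        rows += batch.num_rows

print('batches: %d, rows: %d' % (batches, rows))
{code}

If the count here matches what the JVM side sent, the loss happens between the JVM socket and the worker rather than in the Arrow serialization itself.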
> Data incomplete when using pandas udf in Python 3
> -------------------------------------------------
>
>                 Key: SPARK-28482
>                 URL: https://issues.apache.org/jira/browse/SPARK-28482
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.3, 2.4.3
>         Environment: CentOS 7.4
> pyarrow 0.10.0 / 0.14.0
> Python 2.7 / 3.5 / 3.6
>            Reporter: jiangyu
>            Priority: Major
>         Attachments: py2.7.png, py3.6.png, test.csv, test.py, worker.png
>
> Hi,
>
> Since Spark 2.3.x, pandas udf has been introduced as the default ser/des method when using a udf. However, an issue arises with Python >= 3.5.
> We use pandas udf to process batches of data, but we find the data is incomplete in Python 3.x. At first I thought the processing logic might be wrong, so I changed the code to a very simple one, and it had the same problem. After investigating for a week, I found it is related to pyarrow.
>
> *Reproduce procedure:*
> 1. prepare data
> The data has seven columns, a, b, c, d, e, f and g; the data type is Integer:
> a,b,c,d,e,f,g
> 1,2,3,4,5,6,7
> 1,2,3,4,5,6,7
> 1,2,3,4,5,6,7
> 1,2,3,4,5,6,7
> Produce 1,000,000 rows, name the file test.csv and upload it to HDFS, then load it and repartition it to 1 partition.
>
> {code:java}
> df=spark.read.format('csv').option("header","true").load('/test.csv')
> df=df.select(*(col(c).cast("int").alias(c) for c in df.columns))
> df=df.repartition(1)
> spark_context = SparkContext.getOrCreate()
> {code}
>
> 2. register pandas udf
>
> {code:java}
> def add_func(a,b,c,d,e,f,g):
>     print('iterator one time')
>     return a
>
> add = pandas_udf(add_func, returnType=IntegerType())
> df_result=df.select(add(col("a"),col("b"),col("c"),col("d"),col("e"),col("f"),col("g")))
> {code}
>
> 3. apply pandas udf
>
> {code:java}
> def trigger_func(iterator):
>     yield iterator
>
> df_result.rdd.foreachPartition(trigger_func)
> {code}
>
> 4. execute it in pyspark (local or yarn)
> Run it with conf spark.sql.execution.arrow.maxRecordsPerBatch=100000. As mentioned before, the total row number is 1,000,000, so it should print "iterator one time" 10 times.
> (1) Python 2.7 env:
>
> {code:java}
> PYSPARK_PYTHON=/usr/lib/conda/envs/py2.7/bin/python pyspark --conf spark.sql.execution.arrow.maxRecordsPerBatch=100000 --conf spark.executor.pyspark.memory=2g --conf spark.sql.execution.arrow.enabled=true --executor-cores 1
> {code}
>
> !py2.7.png!
> The result is right: "iterator one time" is printed 10 times.
>
> (2) Python 3.5 or 3.6 env:
>
> {code:java}
> PYSPARK_PYTHON=/usr/lib/conda/envs/python3.6/bin/python pyspark --conf spark.sql.execution.arrow.maxRecordsPerBatch=100000 --conf spark.executor.pyspark.memory=2g --conf spark.sql.execution.arrow.enabled=true --executor-cores
> {code}
>
> !py3.6.png!
> The data is incomplete. The exception is printed by the JVM side of Spark with code we added; I will explain it later.
>
>
> h3. *Investigation*
> The "process done" message is added in worker.py.
> !worker.png!
> In order to get the exception, change the Spark code under core/src/main/scala/org/apache/spark/util/Utils.scala and add this code to print the exception:
>
> {code:java}
> @@ -1362,6 +1362,8 @@ private[spark] object Utils extends Logging {
>        case t: Throwable =>
>          // Purposefully not using NonFatal, because even fatal exceptions
>          // we don't want to have our finallyBlock suppress
> +        logInfo(t.getLocalizedMessage)
> +        t.printStackTrace()
>          originalThrowable = t
>          throw originalThrowable
>      } finally {
> {code}
>
> It seems PySpark gets the data from the JVM, but pyarrow receives it incomplete. The pyarrow side thinks the data is finished and shuts down the socket. At the same time, the JVM side is still writing to the same socket and gets a socket-closed exception.
> The pyarrow part is in ipc.pxi:
>
> {code:java}
> cdef class _RecordBatchReader:
>     cdef:
>         shared_ptr[CRecordBatchReader] reader
>         shared_ptr[InputStream] in_stream
>
>     cdef readonly:
>         Schema schema
>
>     def __cinit__(self):
>         pass
>
>     def _open(self, source):
>         get_input_stream(source, &self.in_stream)
>         with nogil:
>             check_status(CRecordBatchStreamReader.Open(
>                 self.in_stream.get(), &self.reader))
>         self.schema = pyarrow_wrap_schema(self.reader.get().schema())
>
>     def __iter__(self):
>         while True:
>             yield self.read_next_batch()
>
>     def get_next_batch(self):
>         import warnings
>         warnings.warn('Please use read_next_batch instead of '
>                       'get_next_batch', FutureWarning)
>         return self.read_next_batch()
>
>     def read_next_batch(self):
>         """
>         Read next RecordBatch from the stream. Raises StopIteration at end of stream.
>         """
>         cdef shared_ptr[CRecordBatch] batch
>         with nogil:
>             check_status(self.reader.get().ReadNext(&batch))
>
>         if batch.get() == NULL:
>             raise StopIteration
>
>         return pyarrow_wrap_batch(batch)
> {code}
>
> The read_next_batch function gets NULL and concludes the iteration is over.
>
> h3. *RESULT*
> Our environment is Spark 2.4.3. We have tried pyarrow 0.10.0 and 0.14.0, with Python 2.7, 3.5 and 3.6.
> When using Python 2.7 everything is fine, but when we change to Python 3.5 or 3.6 the data is wrong.
> The column count is critical to triggering this bug: if there are fewer than 5 columns the bug probably will not happen, but if the column count is large, for example 7 or above, it happens every time.
> So we wonder if there is some conflict between Python 3.x and the pyarrow version?
> I have put the code and data in the attachments.
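For reference, here is a standalone sketch (independent of Spark; nothing in it comes from the attached test.py) that round-trips the same shape of data through an Arrow IPC stream purely in memory: 1,000,000 rows with seven int columns, written in 100,000-row batches. Reading it back with pyarrow alone yields 10 batches, which is the count the pandas_udf job above should also observe:

{code:python}
# Standalone round-trip of 1,000,000 rows x 7 int columns through an Arrow IPC stream,
# in 100,000-row batches; the socket between the JVM and the Python worker is not involved.
import pandas as pd
import pyarrow as pa

rows, batch_size = 1000000, 100000
pdf = pd.DataFrame({c: [i + 1] * rows for i, c in enumerate('abcdefg')})

sink = pa.BufferOutputStream()
writer = None
for start in range(0, rows, batch_size):
    batch = pa.RecordBatch.from_pandas(pdf.iloc[start:start + batch_size],
                                       preserve_index=False)
    if writer is None:
        writer = pa.RecordBatchStreamWriter(sink, batch.schema)
    writer.write_batch(batch)
writer.close()

reader = pa.RecordBatchStreamReader(pa.BufferReader(sink.getvalue()))
batches = list(reader)
print('batches: %d, rows: %d' % (len(batches), sum(b.num_rows for b in batches)))
# expected output: batches: 10, rows: 1000000
{code}

If this prints 10 batches under both Python 2.7 and Python 3.6, the missing data is more likely lost on the socket path between the JVM and the Python worker than inside pyarrow's batch serialization.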