[ https://issues.apache.org/jira/browse/SPARK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuanjian Li updated SPARK-26549:
--------------------------------
    Description: 
During [the follow-up work|https://github.com/apache/spark/pull/23435#issuecomment-451079886] on the PySpark worker reuse scenario, we found that worker reuse takes no effect for Python3, while it works properly for Python2 and PyPy.
This happens because, when the Python worker checks for the end of the stream under Python3, it reads an unexpected value of -1, which refers to END_OF_DATA_SECTION. See the code in worker.py:
{code:python}
# check end of stream
if read_int(infile) == SpecialLengths.END_OF_STREAM:
    write_int(SpecialLengths.END_OF_STREAM, outfile)
else:
    # write a different value to tell JVM to not reuse this worker
    write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
    sys.exit(-1)
{code}
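
For reference, here is a minimal sketch of the protocol pieces involved, modeled on pyspark/serializers.py (the constant values and helper implementations shown are assumptions based on the Spark source at the time of writing): the stream between the JVM and the Python worker interleaves framed data with negative "length" markers.
{code:python}
import struct

class SpecialLengths(object):
    # Negative "length" markers on the JVM <-> Python worker socket
    # (values assumed from pyspark/serializers.py).
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
    NULL = -5

def read_int(stream):
    # Big-endian 4-byte signed int, matching the JVM side.
    buf = stream.read(4)
    if not buf:
        raise EOFError
    return struct.unpack("!i", buf)[0]

def write_int(value, stream):
    stream.write(struct.pack("!i", value))
{code}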
The code works well for Python2 and PyPy because END_OF_DATA_SECTION has already been consumed while loading the iterator from the socket stream; see the code in FramedSerializer:

{code:python}
def load_stream(self, stream):
    while True:
        try:
            yield self._read_with_length(stream)
        except EOFError:
            return

...

def _read_with_length(self, stream):
    length = read_int(stream)
    if length == SpecialLengths.END_OF_DATA_SECTION:
        raise EOFError  # END_OF_DATA_SECTION raises EOFError here; caught in load_stream
    elif length == SpecialLengths.NULL:
        return None
    obj = stream.read(length)
    if len(obj) < length:
        raise EOFError
    return self.loads(obj)
{code}
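
To make the mismatch concrete, below is a hypothetical, self-contained reproduction using the helpers sketched above. It deliberately simplifies the stream (the real protocol also carries timing data and accumulator updates between these markers): if the deserializer does not consume the END_OF_DATA_SECTION marker, as on the Python3 path, the worker's final end-of-stream check reads -1 instead of END_OF_STREAM (-4), writes END_OF_DATA_SECTION back, and exits, so the JVM does not reuse the worker.
{code:python}
import io

# What the JVM writes after the last data record (simplified):
# END_OF_DATA_SECTION, then END_OF_STREAM.
infile = io.BytesIO()
write_int(SpecialLengths.END_OF_DATA_SECTION, infile)
write_int(SpecialLengths.END_OF_STREAM, infile)

# Python2/PyPy path: _read_with_length reads -1 and raises EOFError,
# so the marker is consumed before the worker's final check ...
infile.seek(0)
assert read_int(infile) == SpecialLengths.END_OF_DATA_SECTION
# ... and the next read sees END_OF_STREAM, so the worker is reused.
assert read_int(infile) == SpecialLengths.END_OF_STREAM

# Python3 path (the bug): the -1 marker is not consumed during
# deserialization, so the end-of-stream check reads -1 first and
# falls into the "tell JVM to not reuse this worker" branch.
infile.seek(0)
assert read_int(infile) == SpecialLengths.END_OF_DATA_SECTION  # unexpected -1
{code}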



> PySpark worker reuse takes no effect for Python3
> ------------------------------------------------
>
>                 Key: SPARK-26549
>                 URL: https://issues.apache.org/jira/browse/SPARK-26549
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.0
>            Reporter: Yuanjian Li
>            Priority: Major


