[ https://issues.apache.org/jira/browse/SPARK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-26549.
----------------------------------
    Resolution: Fixed
    Fix Version/s: 3.0.0

Issue resolved by pull request 23470
[https://github.com/apache/spark/pull/23470]

> PySpark worker reuse takes no effect for parallelize xrange
> ------------------------------------------------------------
>
>                 Key: SPARK-26549
>                 URL: https://issues.apache.org/jira/browse/SPARK-26549
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.0
>            Reporter: Yuanjian Li
>            Assignee: Yuanjian Li
>            Priority: Major
>             Fix For: 3.0.0
>
>
> During [the follow-up work|https://github.com/apache/spark/pull/23435#issuecomment-451079886] on the PySpark worker reuse scenario, we found that worker reuse takes no effect for `sc.parallelize(xrange(...))`.
> This happens because the specialized rdd.parallelize logic for xrange (SPARK-4398) generates the data with xrange itself and never consumes the passed-in iterator. The unconsumed iterator breaks the end-of-stream check in the Python worker, which ultimately prevents worker reuse from taking effect.
> The relevant code blocks and more details are listed below.
> The current specialized logic for xrange ignores the passed-in iterator; see context.py:
> {code:python}
> if isinstance(c, xrange):
>     ...
>     def f(split, iterator):
>         return xrange(getStart(split), getStart(split + 1), step)
>     ...
>     return self.parallelize([], numSlices).mapPartitionsWithIndex(f)
> {code}
> The end-of-stream check then reads the unexpected value -1, which refers to END_OF_DATA_SECTION. See the code in worker.py:
> {code:python}
> # check end of stream
> if read_int(infile) == SpecialLengths.END_OF_STREAM:
>     write_int(SpecialLengths.END_OF_STREAM, outfile)
> else:
>     # write a different value to tell JVM to not reuse this worker
>     write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
>     sys.exit(-1)
> {code}
> The code works well for parallelize(range) because END_OF_DATA_SECTION is handled while loading the iterator from the socket stream; see the code in FramedSerializer:
> {code:python}
> def load_stream(self, stream):
>     while True:
>         try:
>             yield self._read_with_length(stream)
>         except EOFError:
>             return
> ...
> def _read_with_length(self, stream):
>     length = read_int(stream)
>     if length == SpecialLengths.END_OF_DATA_SECTION:
>         # END_OF_DATA_SECTION raises EOFError here; caught in load_stream
>         raise EOFError
>     elif length == SpecialLengths.NULL:
>         return None
>     obj = stream.read(length)
>     if len(obj) < length:
>         raise EOFError
>     return self.loads(obj)
> {code}
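
For anyone who wants to observe the symptom, here is a minimal reproduction sketch. It assumes a local SparkContext `sc` on Python 2 (where xrange exists) with the default spark.python.worker.reuse=true; the PID comparison is an illustration added here, not part of the original report:

{code:python}
import os

rdd = sc.parallelize(xrange(100), 2)

# With worker reuse working, the second job runs in the same worker
# processes as the first, so the PID sets should match.
pids_first = set(rdd.map(lambda x: os.getpid()).collect())
pids_second = set(rdd.map(lambda x: os.getpid()).collect())
print(pids_first == pids_second)  # False while this bug is present
{code}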
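
To make the framing concrete, the self-contained sketch below simulates the stream as the JVM writes it. The marker values mirror PySpark's SpecialLengths (END_OF_DATA_SECTION = -1 as noted above; the END_OF_STREAM value is an assumption for this sketch):

{code:python}
import io
import struct

END_OF_DATA_SECTION = -1   # value the worker reads unexpectedly
END_OF_STREAM = -4         # assumed marker value for this sketch

def write_int(value, stream):
    stream.write(struct.pack("!i", value))

def read_int(stream):
    return struct.unpack("!i", stream.read(4))[0]

# Stream layout after the task data: data-section terminator, then
# the end-of-stream marker.
buf = io.BytesIO()
write_int(END_OF_DATA_SECTION, buf)
write_int(END_OF_STREAM, buf)
buf.seek(0)

# xrange case: the iterator is never consumed, so the "check end of
# stream" read sees -1 first and the reuse handshake fails.
print(read_int(buf) == END_OF_STREAM)  # False
# Had load_stream been entered, it would already have consumed the -1,
# and the check would see END_OF_STREAM as expected.
print(read_int(buf) == END_OF_STREAM)  # True
{code}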
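
The fix in pull request 23470 restores the handshake by routing control through load_stream. A minimal sketch of that direction (getStart, step and numSlices as in the context.py snippet above; not necessarily the exact merged code):

{code:python}
def f(split, iterator):
    # The parent RDD is parallelize([], numSlices), so the iterator is
    # empty; draining it still enters FramedSerializer.load_stream, which
    # consumes END_OF_DATA_SECTION by raising and catching EOFError.
    assert len(list(iterator)) == 0
    return xrange(getStart(split), getStart(split + 1), step)
{code}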