This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new dbbba80  [SPARK-26549][PYSPARK] Fix for python worker reuse take no effect for parallelize lazy iterable range

dbbba80 is described below

commit dbbba80b3cb319b147dcf82a69963eee662e289f
Author: Yuanjian Li <xyliyuanj...@gmail.com>
AuthorDate: Wed Jan 9 11:55:12 2019 +0800

    [SPARK-26549][PYSPARK] Fix for python worker reuse take no effect for parallelize lazy iterable range

    ## What changes were proposed in this pull request?

    During the follow-up work (#23435) for the PySpark worker reuse scenario, we found that worker reuse takes no effect for `sc.parallelize(xrange(...))`. This happens because the specialized `rdd.parallelize` logic for `xrange` (introduced in #3264) generates data from a lazy iterable range and never consumes the passed-in iterator. That breaks the end-of-stream check in the Python worker and ultimately prevents worker reuse from taking effect. See more details in [SPARK-26549](https://issue [...]

    We fix this by forcing consumption of the passed-in iterator.

    ## How was this patch tested?

    New UT in test_worker.py.

    Closes #23470 from xuanyuanking/SPARK-26549.

    Authored-by: Yuanjian Li <xyliyuanj...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/context.py           |  8 ++++++++
 python/pyspark/tests/test_worker.py | 12 +++++++++++-
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 64178eb..316fbc8 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -498,6 +498,14 @@ class SparkContext(object):
             return start0 + int((split * size / numSlices)) * step
 
         def f(split, iterator):
+            # it's an empty iterator here but we need this line for triggering the
+            # logic of signal handling in FramedSerializer.load_stream, for instance,
+            # SpecialLengths.END_OF_DATA_SECTION in _read_with_length. Since
+            # FramedSerializer.load_stream produces a generator, the control should
+            # at least be in that function once. Here we do it by explicitly converting
+            # the empty iterator to a list, thus make sure worker reuse takes effect.
+            # See more details in SPARK-26549.
+            assert len(list(iterator)) == 0
             return xrange(getStart(split), getStart(split + 1), step)
 
         return self.parallelize([], numSlices).mapPartitionsWithIndex(f)
diff --git a/python/pyspark/tests/test_worker.py b/python/pyspark/tests/test_worker.py
index a33b77d..a4f108f 100644
--- a/python/pyspark/tests/test_worker.py
+++ b/python/pyspark/tests/test_worker.py
@@ -22,7 +22,7 @@ import time
 
 from py4j.protocol import Py4JJavaError
 
-from pyspark.testing.utils import ReusedPySparkTestCase, QuietTest
+from pyspark.testing.utils import ReusedPySparkTestCase, PySparkTestCase, QuietTest
 
 if sys.version_info[0] >= 3:
     xrange = range
@@ -145,6 +145,16 @@ class WorkerTests(ReusedPySparkTestCase):
             self.sc.pythonVer = version
 
 
+class WorkerReuseTest(PySparkTestCase):
+
+    def test_reuse_worker_of_parallelize_xrange(self):
+        rdd = self.sc.parallelize(xrange(20), 8)
+        previous_pids = rdd.map(lambda x: os.getpid()).collect()
+        current_pids = rdd.map(lambda x: os.getpid()).collect()
+        for pid in current_pids:
+            self.assertTrue(pid in previous_pids)
+
+
 if __name__ == "__main__":
     import unittest
     from pyspark.tests.test_worker import *
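The fix hinges on Python generator semantics: because FramedSerializer.load_stream is a generator function, none of its body, including the end-of-stream handling that lets a worker be reused, runs unless the task actually consumes the iterator it is given. The sketch below illustrates that behavior; load_stream, buggy_task, and fixed_task here are illustrative stand-ins, not Spark's actual implementations.

# Minimal sketch (illustrative only, not Spark's code) of the generator
# behavior behind SPARK-26549: a generator's body, including any
# end-of-stream handling after its loop, runs only when it is consumed.

def load_stream(frames):
    # Stand-in for FramedSerializer.load_stream: yield every frame, then
    # perform the end-of-stream handling that allows worker reuse.
    for frame in frames:
        yield frame
    print("end-of-stream handling ran; worker can be reused")

def buggy_task(iterator):
    # Mirrors the old xrange path: the passed-in iterator is ignored,
    # so load_stream never reaches its end-of-stream handling.
    return range(10)

def fixed_task(iterator):
    # Mirrors the fix: drain the (empty) iterator before returning.
    assert len(list(iterator)) == 0
    return range(10)

buggy_task(load_stream(iter([])))  # prints nothing: generator never advanced
fixed_task(load_stream(iter([])))  # prints the end-of-stream message

This is why the patch adds `assert len(list(iterator)) == 0` instead of simply ignoring the argument: draining the empty iterator pushes control through load_stream once, so the worker sees END_OF_DATA_SECTION and stays alive for reuse.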