[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233283645

--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
     @classmethod
     def _getOrCreate(cls):
         """Internal function to get or create global BarrierTaskContext."""
-        if cls._taskContext is None:
-            cls._taskContext = BarrierTaskContext()
+        if not isinstance(cls._taskContext, BarrierTaskContext):
+            cls._taskContext = object.__new__(cls)
--- End diff --

```
could you add some comments to explain it?
```
@cloud-fan Sorry for the insufficient explanation; the extra comments should have been there from the start. I will add them in a follow-up PR.

```
Can we get rid of the rewrite all?
we should remove __init__ too
next time please fully describe what's going on in PR description
```
@HyukjinKwon Sorry for the insufficient explanation; all of this, together with the new unit test, will be done in the next follow-up PR.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233131375

--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
     @classmethod
     def _getOrCreate(cls):
         """Internal function to get or create global BarrierTaskContext."""
-        if cls._taskContext is None:
-            cls._taskContext = BarrierTaskContext()
+        if not isinstance(cls._taskContext, BarrierTaskContext):
+            cls._taskContext = object.__new__(cls)
--- End diff --

Also, next time please fully describe what's going on in the PR description. Even I was confused about it and misread that `__new__` was actually being inherited.
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233130494

--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
     @classmethod
     def _getOrCreate(cls):
         """Internal function to get or create global BarrierTaskContext."""
-        if cls._taskContext is None:
-            cls._taskContext = BarrierTaskContext()
+        if not isinstance(cls._taskContext, BarrierTaskContext):
+            cls._taskContext = object.__new__(cls)
--- End diff --

Also, we should remove `__init__` too. That's what the Python interpreter will implicitly provide for both.
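The interpreter point is easy to check in isolation: a class that defines neither method still resolves both from `object`, so a stub override containing only `pass` adds nothing. A minimal standalone check (unrelated to the PySpark classes):

```python
# A class defining neither __new__ nor __init__ inherits working
# versions of both from `object`; a `pass`-only override is redundant.

class NoBoilerplate:
    pass

obj = NoBoilerplate()  # construction works with no explicit methods

# Both special methods resolve to object's implementations.
print(NoBoilerplate.__init__ is object.__init__)  # True
print(NoBoilerplate.__new__ is object.__new__)    # True
```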
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233130221

--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
     @classmethod
     def _getOrCreate(cls):
         """Internal function to get or create global BarrierTaskContext."""
-        if cls._taskContext is None:
-            cls._taskContext = BarrierTaskContext()
+        if not isinstance(cls._taskContext, BarrierTaskContext):
+            cls._taskContext = object.__new__(cls)
--- End diff --

Can we get rid of the override entirely? It's never a good idea to override these unless it's absolutely required.
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233117222

--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
     @classmethod
     def _getOrCreate(cls):
         """Internal function to get or create global BarrierTaskContext."""
-        if cls._taskContext is None:
-            cls._taskContext = BarrierTaskContext()
+        if not isinstance(cls._taskContext, BarrierTaskContext):
+            cls._taskContext = object.__new__(cls)
--- End diff --

Could you add some comments to explain it, so that people won't get confused again?
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233055939

--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
     @classmethod
     def _getOrCreate(cls):
         """Internal function to get or create global BarrierTaskContext."""
-        if cls._taskContext is None:
-            cls._taskContext = BarrierTaskContext()
+        if not isinstance(cls._taskContext, BarrierTaskContext):
+            cls._taskContext = object.__new__(cls)
--- End diff --

I think the answer is no; maybe I was not clear enough in my previous explanation (https://github.com/apache/spark/pull/22962#discussion_r232528333). Using `BarrierTaskContext()` here was my first commit (https://github.com/apache/spark/pull/22962/commits/0cb2cf6e9ece66861073c31b579b595a9de5ce81), but it would also require overriding `__new__` for `BarrierTaskContext`; otherwise the bug still exists, because the parent class `TaskContext` overrides `__new__()`. When we call `BarrierTaskContext()` in a reused worker, a `TaskContext` instance is returned by `TaskContext.__new__()`: https://github.com/apache/spark/blob/c00e72f3d7530eb2ae43d4d45e8efde783daf6ff/python/pyspark/taskcontext.py#L47
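The inheritance trap described above can be reproduced outside PySpark. The sketch below is a simplified mimic (not the actual `pyspark/taskcontext.py` source) of a parent class that caches its singleton in `__new__`, showing why a plain `BarrierTaskContext()` call in a reused worker hands back the stale `TaskContext`, while the `isinstance`-guarded `object.__new__(cls)` path does not:

```python
# Simplified mimic of the pattern in pyspark/taskcontext.py; the real
# classes carry much more state. Names mirror the PR for readability.

class TaskContext:
    _taskContext = None

    def __new__(cls):
        # The parent caches a singleton and returns it on every call,
        # regardless of which subclass the caller actually asked for.
        if cls._taskContext is None:
            cls._taskContext = object.__new__(cls)
        return cls._taskContext


class BarrierTaskContext(TaskContext):
    @classmethod
    def _getOrCreate(cls):
        # The isinstance check (rather than `is None`) is the fix: in a
        # reused worker the cached object may be a plain TaskContext
        # left over from a previous non-barrier task.
        if not isinstance(cls._taskContext, BarrierTaskContext):
            cls._taskContext = object.__new__(cls)
        return cls._taskContext


# Simulate a reused worker: a plain TaskContext was cached first.
TaskContext()

# The buggy path: the inherited TaskContext.__new__ sees a cached
# instance and returns it, so the caller gets the wrong type.
bad = BarrierTaskContext()
print(type(bad).__name__)   # TaskContext

# The fixed path bypasses the inherited __new__ entirely.
good = BarrierTaskContext._getOrCreate()
print(type(good).__name__)  # BarrierTaskContext
```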
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233033846

--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
     @classmethod
     def _getOrCreate(cls):
         """Internal function to get or create global BarrierTaskContext."""
-        if cls._taskContext is None:
-            cls._taskContext = BarrierTaskContext()
+        if not isinstance(cls._taskContext, BarrierTaskContext):
+            cls._taskContext = object.__new__(cls)
--- End diff --

Ah, good point! @xuanyuanking can you send a small follow-up?
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232991503

--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
     @classmethod
     def _getOrCreate(cls):
         """Internal function to get or create global BarrierTaskContext."""
-        if cls._taskContext is None:
-            cls._taskContext = BarrierTaskContext()
+        if not isinstance(cls._taskContext, BarrierTaskContext):
+            cls._taskContext = object.__new__(cls)
--- End diff --

BTW, I think we should just use `BarrierTaskContext()`. Let's not make it complicated next time.
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232990319

--- Diff: python/pyspark/tests.py ---
@@ -618,10 +618,13 @@ def test_barrier_with_python_worker_reuse(self):
         """
         Verify that BarrierTaskContext.barrier() with reused python worker.
         """
+        self.sc._conf.set("spark.python.work.reuse", "true")
--- End diff --

Yup, sorry for the late response.
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232986340

--- Diff: python/pyspark/tests.py ---
@@ -618,10 +618,13 @@ def test_barrier_with_python_worker_reuse(self):
         """
         Verify that BarrierTaskContext.barrier() with reused python worker.
         """
+        self.sc._conf.set("spark.python.work.reuse", "true")
--- End diff --

@HyukjinKwon Hi Hyukjin, if you still think this needs a separate class, I'll think about how to check worker reuse and open a follow-up PR.
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22962
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232634698

--- Diff: python/pyspark/tests.py ---
@@ -618,10 +618,13 @@ def test_barrier_with_python_worker_reuse(self):
         """
         Verify that BarrierTaskContext.barrier() with reused python worker.
         """
+        self.sc._conf.set("spark.python.work.reuse", "true")
--- End diff --

I did these two checks, as below:

1. Run this test case without the fix in `BarrierTaskContext._getOrCreate`: the bug can be reproduced.
2. Run the same code in the pyspark shell with `spark.python.work.reuse=false`: it returns successfully.

Maybe this proves the UT covers the issue while also reusing the original barrier test code. WDYT?
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232528655

--- Diff: python/pyspark/tests.py ---
@@ -618,10 +618,13 @@ def test_barrier_with_python_worker_reuse(self):
         """
         Verify that BarrierTaskContext.barrier() with reused python worker.
         """
+        self.sc._conf.set("spark.python.work.reuse", "true")
--- End diff --

@xuanyuanking, this will probably need a separate test suite, since it's also related to how we start the worker. You can make a new class, run a simple job to make sure workers are created and being reused, test it, and stop.
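The suggested structure could be sketched in plain `unittest`, with a hypothetical `FakeSparkContext` standing in for the real `SparkContext` (a real suite would build a `SparkConf` before starting the context; Spark's actual key for this feature is `spark.python.worker.reuse`, and it must be set before startup rather than mutated on `self.sc._conf` afterwards):

```python
import unittest

# Sketch of a dedicated suite that owns its own context, so the
# worker-reuse conf is fixed before the context starts. FakeSparkContext
# is a hypothetical stand-in for pyspark's SparkContext.

class FakeSparkContext:
    def __init__(self, conf):
        self.conf = dict(conf)  # snapshot at startup, like the real thing
        self.stopped = False

    def stop(self):
        self.stopped = True


class WorkerReuseSuite(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # The conf is applied before the context exists, so workers
        # launched later actually honor it.
        cls.sc = FakeSparkContext({"spark.python.worker.reuse": "true"})

    @classmethod
    def tearDownClass(cls):
        cls.sc.stop()

    def test_conf_set_before_startup(self):
        self.assertEqual(self.sc.conf["spark.python.worker.reuse"], "true")


if __name__ == "__main__":
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(WorkerReuseSuite)
    unittest.TextTestRunner(verbosity=2).run(suite)
```

A real version would additionally run a small barrier job inside the test so the workers are demonstrably created and reused before the assertion, as suggested above.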
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232528333

--- Diff: python/pyspark/taskcontext.py ---
@@ -144,10 +144,19 @@ def __init__(self):
         """Construct a BarrierTaskContext, use get instead"""
         pass
 
+    def __new__(cls):
--- End diff --

Yep, doing this in `_getOrCreate` has the same effect; this was over-engineering based on https://github.com/apache/spark/blob/aec0af4a952df2957e21d39d1e0546a36ab7ab86/python/pyspark/taskcontext.py#L44-L45. Deleted in 02555b8.
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232527808

--- Diff: python/pyspark/tests.py ---
@@ -614,6 +614,18 @@ def context_barrier(x):
         times = rdd.barrier().mapPartitions(f).map(context_barrier).collect()
         self.assertTrue(max(times) - min(times) < 1)
 
+    def test_barrier_with_python_worker_reuse(self):
+        """
+        Verify that BarrierTaskContext.barrier() with reused python worker.
+        """
+        rdd = self.sc.parallelize(range(4), 4)
--- End diff --

Thanks, done in 02555b8.