[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233283645
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
 @classmethod
 def _getOrCreate(cls):
 """Internal function to get or create global BarrierTaskContext."""
-if cls._taskContext is None:
-cls._taskContext = BarrierTaskContext()
+if not isinstance(cls._taskContext, BarrierTaskContext):
+cls._taskContext = object.__new__(cls)
--- End diff --

```
could you add some comments to explain it?
```
@cloud-fan Sorry for the thin explanation; more comments should have been added 
from the start. Will do in a follow-up PR.
```
Can we get rid of the rewrite all?
we should remove __init__ too
next time please fully describe what's going on in PR description
```
@HyukjinKwon Sorry for the thin explanation; all of these will be addressed in 
the next follow-up PR, along with the new UT.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233275410
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
 @classmethod
 def _getOrCreate(cls):
 """Internal function to get or create global BarrierTaskContext."""
-if cls._taskContext is None:
-cls._taskContext = BarrierTaskContext()
+if not isinstance(cls._taskContext, BarrierTaskContext):
+cls._taskContext = object.__new__(cls)
--- End diff --

```
could you add some comments to explain it?
```
@cloud-fan Sorry for the thin explanation; more comments should have been added 
from the start. Will do in a follow-up PR.
```
Can we get rid of the rewrite all?
we should remove __init__ too
next time please fully describe what's going on in PR description
```
@HyukjinKwon Sorry for the thin explanation; all of these will be addressed in 
the next follow-up PR, along with the new UT.



---




[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233131375
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
 @classmethod
 def _getOrCreate(cls):
 """Internal function to get or create global BarrierTaskContext."""
-if cls._taskContext is None:
-cls._taskContext = BarrierTaskContext()
+if not isinstance(cls._taskContext, BarrierTaskContext):
+cls._taskContext = object.__new__(cls)
--- End diff --

Also, next time please fully describe what's going on in the PR description. 
Even I was confused about it and misread it, thinking `__new__` was actually 
being inherited.


---




[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233130494
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
 @classmethod
 def _getOrCreate(cls):
 """Internal function to get or create global BarrierTaskContext."""
-if cls._taskContext is None:
-cls._taskContext = BarrierTaskContext()
+if not isinstance(cls._taskContext, BarrierTaskContext):
+cls._taskContext = object.__new__(cls)
--- End diff --

Also, we should remove `__init__` too. The Python interpreter will implicitly 
resolve both `__init__` and `__new__` from the parent class.
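
The inheritance behavior referred to here can be seen in a small standalone 
example (class names are illustrative, not the real pyspark ones). Note that 
an empty `__init__` override is not merely redundant; it also shadows whatever 
the parent's `__init__` would have done:

```python
class Base:
    def __init__(self):
        self.ready = True

class Inherits(Base):
    pass  # defines neither __init__ nor __new__

class Shadows(Base):
    def __init__(self):
        pass  # an empty override silently blocks Base.__init__

# With no override, the interpreter resolves __init__ from the parent
# class via the MRO; it is the very same function object:
assert Inherits.__init__ is Base.__init__
assert Inherits().ready is True

# The empty override suppresses the parent's initialization logic:
assert not hasattr(Shadows(), "ready")
```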


---




[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233130221
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
 @classmethod
 def _getOrCreate(cls):
 """Internal function to get or create global BarrierTaskContext."""
-if cls._taskContext is None:
-cls._taskContext = BarrierTaskContext()
+if not isinstance(cls._taskContext, BarrierTaskContext):
+cls._taskContext = object.__new__(cls)
--- End diff --

Can we get rid of the overrides at all? It's never a good idea to override 
these methods unless it's absolutely required.


---




[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233117222
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
 @classmethod
 def _getOrCreate(cls):
 """Internal function to get or create global BarrierTaskContext."""
-if cls._taskContext is None:
-cls._taskContext = BarrierTaskContext()
+if not isinstance(cls._taskContext, BarrierTaskContext):
+cls._taskContext = object.__new__(cls)
--- End diff --

Could you add some comments to explain it, so that people won't get 
confused again?


---




[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233055939
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
 @classmethod
 def _getOrCreate(cls):
 """Internal function to get or create global BarrierTaskContext."""
-if cls._taskContext is None:
-cls._taskContext = BarrierTaskContext()
+if not isinstance(cls._taskContext, BarrierTaskContext):
+cls._taskContext = object.__new__(cls)
--- End diff --

I think the answer is no. Maybe I was not clear enough in my previous 
explanation (https://github.com/apache/spark/pull/22962#discussion_r232528333): 
using `BarrierTaskContext()` here was my first commit 
(https://github.com/apache/spark/pull/22962/commits/0cb2cf6e9ece66861073c31b579b595a9de5ce81), 
and it would also require overriding `__new__` for `BarrierTaskContext`; 
otherwise the bug still exists, because the parent class `TaskContext` 
overrides `__new__()`. When we call `BarrierTaskContext()` here in a reused 
worker, a `TaskContext` instance will be returned by 
`TaskContext.__new__()`: https://github.com/apache/spark/blob/c00e72f3d7530eb2ae43d4d45e8efde783daf6ff/python/pyspark/taskcontext.py#L47
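
The mechanism can be reproduced outside of Spark. The sketch below is a 
standalone toy, not the real pyspark classes; it only mirrors the cached 
`_taskContext` class attribute and the `__new__` override that cause the bug:

```python
class TaskContext:
    _taskContext = None

    def __new__(cls):
        # Mirrors pyspark's TaskContext.__new__: return the cached
        # instance if one exists, whatever its concrete type.
        if cls._taskContext is not None:
            return cls._taskContext
        cls._taskContext = super().__new__(cls)
        return cls._taskContext

class BarrierTaskContext(TaskContext):
    @classmethod
    def _getOrCreate(cls):
        # The fix under review: an `is None` check is not enough, because
        # a reused worker may still hold a plain TaskContext cached by a
        # previous, non-barrier task.
        if not isinstance(cls._taskContext, BarrierTaskContext):
            cls._taskContext = object.__new__(cls)
        return cls._taskContext

# Task 1 on the worker: a normal task caches a TaskContext.
TaskContext()

# Task 2 on the *same* reused worker: plain BarrierTaskContext() hands
# back the stale parent instance via the inherited TaskContext.__new__ ...
assert type(BarrierTaskContext()) is TaskContext

# ... while the isinstance-based _getOrCreate replaces it correctly.
assert type(BarrierTaskContext._getOrCreate()) is BarrierTaskContext
```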



---




[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233033846
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
 @classmethod
 def _getOrCreate(cls):
 """Internal function to get or create global BarrierTaskContext."""
-if cls._taskContext is None:
-cls._taskContext = BarrierTaskContext()
+if not isinstance(cls._taskContext, BarrierTaskContext):
+cls._taskContext = object.__new__(cls)
--- End diff --

ah good point! @xuanyuanking can you send a small followup?


---




[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232991503
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
 @classmethod
 def _getOrCreate(cls):
 """Internal function to get or create global BarrierTaskContext."""
-if cls._taskContext is None:
-cls._taskContext = BarrierTaskContext()
+if not isinstance(cls._taskContext, BarrierTaskContext):
+cls._taskContext = object.__new__(cls)
--- End diff --

BTW, I think we should just use `BarrierTaskContext()`. Let's not make it 
complicated next time.


---




[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232990319
  
--- Diff: python/pyspark/tests.py ---
@@ -618,10 +618,13 @@ def test_barrier_with_python_worker_reuse(self):
 """
 Verify that BarrierTaskContext.barrier() with reused python worker.
 """
+self.sc._conf.set("spark.python.work.reuse", "true")
--- End diff --

Yup. Sorry for the late response.


---




[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232986340
  
--- Diff: python/pyspark/tests.py ---
@@ -618,10 +618,13 @@ def test_barrier_with_python_worker_reuse(self):
 """
 Verify that BarrierTaskContext.barrier() with reused python worker.
 """
+self.sc._conf.set("spark.python.work.reuse", "true")
--- End diff --

@HyukjinKwon Hi Hyukjin, if you still think this needs a separate class, I'll 
think about how to check worker reuse and open a follow-up PR.


---




[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22962


---




[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-12 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232634698
  
--- Diff: python/pyspark/tests.py ---
@@ -618,10 +618,13 @@ def test_barrier_with_python_worker_reuse(self):
 """
 Verify that BarrierTaskContext.barrier() with reused python worker.
 """
+self.sc._conf.set("spark.python.work.reuse", "true")
--- End diff --

I did these 2 checks as below:
1. Run this test case without the fix in `BarrierTaskContext._getOrCreate`; 
the bug can be reproduced.
2. Run the same code in the pyspark shell with `spark.python.work.reuse=false` 
set; it returns successfully.
Maybe this proves the UT covers the issue and also reuses the original 
barrier test code. WDYT?


---




[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232528655
  
--- Diff: python/pyspark/tests.py ---
@@ -618,10 +618,13 @@ def test_barrier_with_python_worker_reuse(self):
 """
 Verify that BarrierTaskContext.barrier() with reused python worker.
 """
+self.sc._conf.set("spark.python.work.reuse", "true")
--- End diff --

@xuanyuanking, this will probably need a separate test suite, since it's also 
related to how we start the worker. You can make a new class, run a simple job 
to make sure workers are created and being reused, test it, and stop.
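
A dedicated suite along these lines might look like the sketch below. The 
class name, structure, and assertion are assumptions for illustration only; 
the Spark calls are left as comments because they need a live Spark 
installation (note that the actual conf key in Spark is 
`spark.python.worker.reuse`):

```python
import unittest

class WorkerReuseTest(unittest.TestCase):
    """Hypothetical dedicated suite for barrier tasks with worker reuse."""

    @classmethod
    def setUpClass(cls):
        # In the real suite, build a context with reuse enabled before any
        # worker starts, e.g.:
        #   conf = SparkConf().set("spark.python.worker.reuse", "true")
        #   cls.sc = SparkContext(conf=conf)
        # Then run a throwaway job so workers exist and can be reused:
        #   cls.sc.parallelize(range(4), 4).count()
        cls.workers_warmed_up = True

    @classmethod
    def tearDownClass(cls):
        # Stop the context so other suites are unaffected:
        #   cls.sc.stop()
        pass

    def test_barrier_with_python_worker_reuse(self):
        # Real assertion: a barrier job on the warmed-up (reused) workers
        # completes, e.g. rdd.barrier().mapPartitions(f).collect().
        self.assertTrue(self.workers_warmed_up)
```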


---




[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232528333
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -144,10 +144,19 @@ def __init__(self):
 """Construct a BarrierTaskContext, use get instead"""
 pass
 
+def __new__(cls):
--- End diff --

Yep, doing this in `_getOrCreate` has the same effect; this was 
over-engineering around 
https://github.com/apache/spark/blob/aec0af4a952df2957e21d39d1e0546a36ab7ab86/python/pyspark/taskcontext.py#L44-L45
Deleted in 02555b8.


---




[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232527808
  
--- Diff: python/pyspark/tests.py ---
@@ -614,6 +614,18 @@ def context_barrier(x):
 times = 
rdd.barrier().mapPartitions(f).map(context_barrier).collect()
 self.assertTrue(max(times) - min(times) < 1)
 
+def test_barrier_with_python_worker_reuse(self):
+"""
+Verify that BarrierTaskContext.barrier() with reused python worker.
+"""
+rdd = self.sc.parallelize(range(4), 4)
--- End diff --

Thanks, done in 02555b8.


---
