[GitHub] spark pull request #19498: [SPARK-17756][PYTHON][STREAMING] Workaround to av...

2018-06-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19498


---




[GitHub] spark pull request #19498: [SPARK-17756][PYTHON][STREAMING] Workaround to av...

2017-11-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19498#discussion_r151855253
  
--- Diff: python/pyspark/streaming/util.py ---
@@ -64,7 +64,11 @@ def call(self, milliseconds, jrdds):
 t = datetime.fromtimestamp(milliseconds / 1000.0)
 r = self.func(t, *rdds)
 if r:
-return r._jrdd
+# Here, we work around to ensure `_jrdd` is `JavaRDD` by wrapping it by `map`.
+# org.apache.spark.streaming.api.python.PythonTransformFunction requires to return
+# `JavaRDD`; however, this could be `JavaPairRDD` by some APIs, for example, `zip`.
+# See SPARK-17756.
+return r.map(lambda x: x)._jrdd
--- End diff --

Thanks for the review, @holdenk. Let me push the change.


---




[GitHub] spark pull request #19498: [SPARK-17756][PYTHON][STREAMING] Workaround to av...

2017-11-18 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19498#discussion_r151839513
  
--- Diff: python/pyspark/streaming/util.py ---
@@ -64,7 +64,11 @@ def call(self, milliseconds, jrdds):
 t = datetime.fromtimestamp(milliseconds / 1000.0)
 r = self.func(t, *rdds)
 if r:
-return r._jrdd
+# Here, we work around to ensure `_jrdd` is `JavaRDD` by wrapping it by `map`.
+# org.apache.spark.streaming.api.python.PythonTransformFunction requires to return
+# `JavaRDD`; however, this could be `JavaPairRDD` by some APIs, for example, `zip`.
+# See SPARK-17756.
+return r.map(lambda x: x)._jrdd
--- End diff --

Personally, I think only applying the `map` when the result is not a JavaRDD is a good incremental improvement (since otherwise the code path fails, right?).
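
A minimal sketch of that conditional approach (the helper name and the reflection-based class check are illustrative assumptions, not code from this PR):

```python
def _to_java_rdd(r):
    # `getClass().getName()` is plain Java reflection, reachable from Python
    # through the Py4J-wrapped `_jrdd` object.
    if r._jrdd.getClass().getName() == "org.apache.spark.api.java.JavaRDD":
        return r._jrdd  # already a JavaRDD; skip the extra stage
    # e.g. `zip` yields a JavaPairRDD, so route through an identity `map`
    return r.map(lambda x: x)._jrdd
```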


---




[GitHub] spark pull request #19498: [SPARK-17756][PYTHON][STREAMING] Workaround to av...

2017-11-08 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19498#discussion_r149649246
  
--- Diff: python/pyspark/streaming/util.py ---
@@ -64,7 +64,11 @@ def call(self, milliseconds, jrdds):
 t = datetime.fromtimestamp(milliseconds / 1000.0)
 r = self.func(t, *rdds)
 if r:
-return r._jrdd
+# Here, we work around to ensure `_jrdd` is `JavaRDD` by wrapping it by `map`.
+# org.apache.spark.streaming.api.python.PythonTransformFunction requires to return
+# `JavaRDD`; however, this could be `JavaPairRDD` by some APIs, for example, `zip`.
+# See SPARK-17756.
+return r.map(lambda x: x)._jrdd
--- End diff --

@tdas, it's fine if you feel we should find another way or make a small benchmark; I can try. I just wonder which one you'd prefer before I go too far in a direction you wouldn't prefer.


---




[GitHub] spark pull request #19498: [SPARK-17756][PYTHON][STREAMING] Workaround to av...

2017-10-28 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19498#discussion_r147551966
  
--- Diff: python/pyspark/streaming/util.py ---
@@ -64,7 +64,11 @@ def call(self, milliseconds, jrdds):
 t = datetime.fromtimestamp(milliseconds / 1000.0)
 r = self.func(t, *rdds)
 if r:
-return r._jrdd
+# Here, we work around to ensure `_jrdd` is `JavaRDD` by wrapping it by `map`.
+# org.apache.spark.streaming.api.python.PythonTransformFunction requires to return
+# `JavaRDD`; however, this could be `JavaPairRDD` by some APIs, for example, `zip`.
+# See SPARK-17756.
+return r.map(lambda x: x)._jrdd
--- End diff --

@tdas, I believe this one is something we should fix ... would you like me to keep it this way, try the way above, or find another way to avoid the extra map even if it is complex?

Otherwise, I can run a benchmark on an RDD with the extra map to see the slight perf diff (although I am not sure yet whether I can show a meaningful diff).
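
A rough micro-benchmark along those lines could look like the sketch below (the local master, dataset size, and wall-clock timing are illustrative assumptions, not from this PR):

```python
import time

from pyspark import SparkContext

sc = SparkContext("local[2]", "identity-map-bench")
rdd = sc.range(10000000)

def timed_count(r):
    # count() forces a full pass over the data
    start = time.time()
    r.count()
    return time.time() - start

baseline = timed_count(rdd)
with_map = timed_count(rdd.map(lambda x: x))  # the extra identity map
print("baseline: %.3fs, with identity map: %.3fs" % (baseline, with_map))
```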


---




[GitHub] spark pull request #19498: [SPARK-17756][PYTHON][STREAMING] Workaround to av...

2017-10-24 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19498#discussion_r146531880
  
--- Diff: python/pyspark/streaming/util.py ---
@@ -64,7 +64,11 @@ def call(self, milliseconds, jrdds):
 t = datetime.fromtimestamp(milliseconds / 1000.0)
 r = self.func(t, *rdds)
 if r:
-return r._jrdd
+# Here, we work around to ensure `_jrdd` is `JavaRDD` by wrapping it by `map`.
+# org.apache.spark.streaming.api.python.PythonTransformFunction requires to return
+# `JavaRDD`; however, this could be `JavaPairRDD` by some APIs, for example, `zip`.
+# See SPARK-17756.
+return r.map(lambda x: x)._jrdd
--- End diff --

Yup, I guess it would, but I could not think of a better and simpler way (and I saw this workaround is already used in `python/pyspark/streaming/context.py`). If a slightly uglier (possibly flaky, IMHO) way is okay for this concern, I could do this, for example, as below:

```python
# `getClass().toString()` returns e.g. u'class org.apache.spark.api.java.JavaRDD'
if r._jrdd.getClass().toString() == "class org.apache.spark.api.java.JavaRDD":
    return r._jrdd
else:
    return r.map(lambda x: x)._jrdd
```
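
(Matching on the full `toString()` output is also what makes this brittle: the comparison silently stops short-circuiting if the class is ever moved or renamed, and it costs a Py4J round trip per batch just to read the class name.)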


---




[GitHub] spark pull request #19498: [SPARK-17756][PYTHON][STREAMING] Workaround to av...

2017-10-24 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19498#discussion_r146514175
  
--- Diff: python/pyspark/streaming/util.py ---
@@ -64,7 +64,11 @@ def call(self, milliseconds, jrdds):
 t = datetime.fromtimestamp(milliseconds / 1000.0)
 r = self.func(t, *rdds)
 if r:
-return r._jrdd
+# Here, we work around to ensure `_jrdd` is `JavaRDD` by wrapping it by `map`.
+# org.apache.spark.streaming.api.python.PythonTransformFunction requires to return
+# `JavaRDD`; however, this could be `JavaPairRDD` by some APIs, for example, `zip`.
+# See SPARK-17756.
+return r.map(lambda x: x)._jrdd
--- End diff --

Wouldn't introducing this additional map cause a slight perf regression?


---




[GitHub] spark pull request #19498: [SPARK-17756][PYTHON][STREAMING] Workaround to av...

2017-10-14 Thread HyukjinKwon
GitHub user HyukjinKwon opened a pull request:

https://github.com/apache/spark/pull/19498

[SPARK-17756][PYTHON][STREAMING] Workaround to avoid return type mismatch in PythonTransformFunction

## What changes were proposed in this pull request?

This PR proposes to wrap the transformed RDD within `TransformFunction`. `PythonTransformFunction` appears to require returning a `JavaRDD` via `_jrdd`.


https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/python/pyspark/streaming/util.py#L67


https://github.com/apache/spark/blob/6ee28423ad1b2e6089b82af64a31d77d3552bb38/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala#L43

However, this could be a `JavaPairRDD` with some APIs, for example, `zip` in PySpark's RDD API. `_jrdd` can be checked as below:

```python
>>> rdd.zip(rdd)._jrdd.getClass().toString()
u'class org.apache.spark.api.java.JavaPairRDD'
```

So, here, I wrapped it with `map` to ensure a `JavaRDD` is returned.

```python
>>> rdd.zip(rdd).map(lambda x: x)._jrdd.getClass().toString()
u'class org.apache.spark.api.java.JavaRDD'
```

I tried to write out some failure cases, as below:

```python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]) \
.transform(lambda rdd: rdd.cartesian(rdd)) \
.pprint()
ssc.start()
```

```python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.cartesian(rdd))
ssc.start()
```

```python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.zip(rdd))
ssc.start()
```

```python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.zip(rdd).union(rdd.zip(rdd)))
ssc.start()
```

```python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.zip(rdd).coalesce(1))
ssc.start()
```

## How was this patch tested?

Unit tests were added in `python/pyspark/streaming/tests.py`, and the change was also manually tested.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HyukjinKwon/spark SPARK-17756

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19498.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19498


commit f5a2a884d860e9c8b3f98fc4ae5f10eaf3c1a0a4
Author: hyukjinkwon 
Date:   2017-10-14T13:50:49Z

Workaround to avoid return type mismatch in PythonTransformFunction




---
