[jira] [Commented] (SPARK-27025) Speed up toLocalIterator

2019-11-27 Thread Sean R. Owen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16983646#comment-16983646
 ] 

Sean R. Owen commented on SPARK-27025:
--

It's a duplicate JIRA, so I closed it. It's always possible to keep discussing 
it. This one was auto-closed. See the discussion here for where it ended up. I 
think the next step is prototyping what cases it can speed up and how much.

> Speed up toLocalIterator
> 
>
> Key: SPARK-27025
> URL: https://issues.apache.org/jira/browse/SPARK-27025
> Project: Spark
>  Issue Type: Wish
>  Components: Spark Core
>Affects Versions: 2.3.3
>Reporter: Erik van Oosten
>Priority: Major
>
> Method {{toLocalIterator}} fetches the partitions to the driver one by one. 
> However, as far as I can see, any required computation for the 
> yet-to-be-fetched-partitions is not kicked off until it is fetched. 
> Effectively only one partition is being computed at the same time. 
> Desired behavior: immediately start calculation of all partitions while 
> retaining the download-a-partition at a time behavior.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27025) Speed up toLocalIterator

2019-11-27 Thread Peng Cheng (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16983635#comment-16983635
 ] 

Peng Cheng commented on SPARK-27025:


Am I too late for this issue?

I submitted SPARK-29852. Do you think it is a viable solution [~srowen] ?

> Speed up toLocalIterator
> 
>
> Key: SPARK-27025
> URL: https://issues.apache.org/jira/browse/SPARK-27025
> Project: Spark
>  Issue Type: Wish
>  Components: Spark Core
>Affects Versions: 2.3.3
>Reporter: Erik van Oosten
>Priority: Major
>
> Method {{toLocalIterator}} fetches the partitions to the driver one by one. 
> However, as far as I can see, any required computation for the 
> yet-to-be-fetched-partitions is not kicked off until it is fetched. 
> Effectively only one partition is being computed at the same time. 
> Desired behavior: immediately start calculation of all partitions while 
> retaining the download-a-partition at a time behavior.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27025) Speed up toLocalIterator

2019-03-05 Thread Sean Owen (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16784892#comment-16784892
 ] 

Sean Owen commented on SPARK-27025:
---

You'll want to cache() the thing you call toLocalIterator() on no matter what 
in this case. If it's not helping, then I think the delay remains the 
transferring of data to the driver, as it will all be computed and cached 
before you start. The 2-at-a-time implementation could help that and I'd be 
curious if it works out.

> Speed up toLocalIterator
> 
>
> Key: SPARK-27025
> URL: https://issues.apache.org/jira/browse/SPARK-27025
> Project: Spark
>  Issue Type: Wish
>  Components: Spark Core
>Affects Versions: 2.3.3
>Reporter: Erik van Oosten
>Priority: Major
>
> Method {{toLocalIterator}} fetches the partitions to the driver one by one. 
> However, as far as I can see, any required computation for the 
> yet-to-be-fetched-partitions is not kicked off until it is fetched. 
> Effectively only one partition is being computed at the same time. 
> Desired behavior: immediately start calculation of all partitions while 
> retaining the download-a-partition at a time behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27025) Speed up toLocalIterator

2019-03-05 Thread Erik van Oosten (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16784854#comment-16784854
 ] 

Erik van Oosten commented on SPARK-27025:
-

If there is no obvious way to improve Spark, then its probably better to close 
this issue until someone finds a better angle.

BTW, the cache/count/iterate/unpersist cycle did not make it faster for my use 
case. I will try the 2-partition implementation of toLocalIterator.

[~srowen], [~hyukjin.kwon], thanks for your input!

> Speed up toLocalIterator
> 
>
> Key: SPARK-27025
> URL: https://issues.apache.org/jira/browse/SPARK-27025
> Project: Spark
>  Issue Type: Wish
>  Components: Spark Core
>Affects Versions: 2.3.3
>Reporter: Erik van Oosten
>Priority: Major
>
> Method {{toLocalIterator}} fetches the partitions to the driver one by one. 
> However, as far as I can see, any required computation for the 
> yet-to-be-fetched-partitions is not kicked off until it is fetched. 
> Effectively only one partition is being computed at the same time. 
> Desired behavior: immediately start calculation of all partitions while 
> retaining the download-a-partition at a time behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27025) Speed up toLocalIterator

2019-03-04 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16784014#comment-16784014
 ] 

Hyukjin Kwon commented on SPARK-27025:
--

Yes but there might be many variants of implementations. It has a tradeoff as 
Sean described above.

> Speed up toLocalIterator
> 
>
> Key: SPARK-27025
> URL: https://issues.apache.org/jira/browse/SPARK-27025
> Project: Spark
>  Issue Type: Wish
>  Components: Spark Core
>Affects Versions: 2.3.3
>Reporter: Erik van Oosten
>Priority: Major
>
> Method {{toLocalIterator}} fetches the partitions to the driver one by one. 
> However, as far as I can see, any required computation for the 
> yet-to-be-fetched-partitions is not kicked off until it is fetched. 
> Effectively only one partition is being computed at the same time. 
> Desired behavior: immediately start calculation of all partitions while 
> retaining the download-a-partition at a time behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27025) Speed up toLocalIterator

2019-03-04 Thread Erik van Oosten (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16783220#comment-16783220
 ] 

Erik van Oosten commented on SPARK-27025:
-

[~hyukjin.kwon] maybe I misunderstood Sean's comment. I understood that every 
invocation of toLocalIterator will either benefit, or not have any negative 
side effect.

Under this assumption, it would be better to put the 
cache/count/iterate/unpersist logic directly in toLocalIterator.

> Speed up toLocalIterator
> 
>
> Key: SPARK-27025
> URL: https://issues.apache.org/jira/browse/SPARK-27025
> Project: Spark
>  Issue Type: Wish
>  Components: Spark Core
>Affects Versions: 2.3.3
>Reporter: Erik van Oosten
>Priority: Major
>
> Method {{toLocalIterator}} fetches the partitions to the driver one by one. 
> However, as far as I can see, any required computation for the 
> yet-to-be-fetched-partitions is not kicked off until it is fetched. 
> Effectively only one partition is being computed at the same time. 
> Desired behavior: immediately start calculation of all partitions while 
> retaining the download-a-partition at a time behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27025) Speed up toLocalIterator

2019-03-04 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16783122#comment-16783122
 ] 

Hyukjin Kwon commented on SPARK-27025:
--

It's one use case. How common is that use case? If not, let's don't add it to 
Spark.

> Speed up toLocalIterator
> 
>
> Key: SPARK-27025
> URL: https://issues.apache.org/jira/browse/SPARK-27025
> Project: Spark
>  Issue Type: Wish
>  Components: Spark Core
>Affects Versions: 2.3.3
>Reporter: Erik van Oosten
>Priority: Major
>
> Method {{toLocalIterator}} fetches the partitions to the driver one by one. 
> However, as far as I can see, any required computation for the 
> yet-to-be-fetched-partitions is not kicked off until it is fetched. 
> Effectively only one partition is being computed at the same time. 
> Desired behavior: immediately start calculation of all partitions while 
> retaining the download-a-partition at a time behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27025) Speed up toLocalIterator

2019-03-04 Thread Erik van Oosten (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16783110#comment-16783110
 ] 

Erik van Oosten commented on SPARK-27025:
-

Thanks Sean, that is very useful.

In my use case the entire data set is too big for the driver, but I can easily 
fit 1/10th of it. So even with as little as 20 partitions, 2 partitions on the 
driver would be fine.
In the use case there are 2 joins, and a groupby/count so this is probably a 
wide transformation. So it seems that the cache/count/toLocalIterator/unpersist 
approach is applicable.

The ergonomics of this approach are way worse, so I don't agree that it is 
'better' to do this in application code.

> Speed up toLocalIterator
> 
>
> Key: SPARK-27025
> URL: https://issues.apache.org/jira/browse/SPARK-27025
> Project: Spark
>  Issue Type: Wish
>  Components: Spark Core
>Affects Versions: 2.3.3
>Reporter: Erik van Oosten
>Priority: Major
>
> Method {{toLocalIterator}} fetches the partitions to the driver one by one. 
> However, as far as I can see, any required computation for the 
> yet-to-be-fetched-partitions is not kicked off until it is fetched. 
> Effectively only one partition is being computed at the same time. 
> Desired behavior: immediately start calculation of all partitions while 
> retaining the download-a-partition at a time behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27025) Speed up toLocalIterator

2019-03-04 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16783088#comment-16783088
 ] 

Hyukjin Kwon commented on SPARK-27025:
--

Yes, I think it should better be implemented in application side codes.

> Speed up toLocalIterator
> 
>
> Key: SPARK-27025
> URL: https://issues.apache.org/jira/browse/SPARK-27025
> Project: Spark
>  Issue Type: Wish
>  Components: Spark Core
>Affects Versions: 2.3.3
>Reporter: Erik van Oosten
>Priority: Major
>
> Method {{toLocalIterator}} fetches the partitions to the driver one by one. 
> However, as far as I can see, any required computation for the 
> yet-to-be-fetched-partitions is not kicked off until it is fetched. 
> Effectively only one partition is being computed at the same time. 
> Desired behavior: immediately start calculation of all partitions while 
> retaining the download-a-partition at a time behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27025) Speed up toLocalIterator

2019-03-03 Thread Sean Owen (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782789#comment-16782789
 ] 

Sean Owen commented on SPARK-27025:
---

It's an interesting question; let's break it down.

Calling toLocalIterator on an RDD of N partitions actually runs N jobs to 
compute the partitions individually. That's fine except you wait for the next 
partition job to complete after consuming the last one's iterator.

You could cache the RDD (essential here anyway) and materialize it all first 
with count() or something, then run toLocalIterator. That more or less 
eliminates this delay and ensures you only have one partition of data on the 
driver at a time. Yes it means you persist the RDD. That's actually vital for 
an RDD from a wide transformation; you absolutely don't want to recompute the 
whole thing N times. For a narrow transform, OK, per-partition computation is 
in theory no more work than computing it once in one go, even without caching.

Of course, this also means you don't start iterating at all until all are 
partitions are done. In some cases you can't do better anyway (e.g. a wide 
transformation where all partitions have to be computed at once anyway). But 
then again, even for narrow transforms, the wall-clock time to compute 1 
partition is about the same for all partitions. You'd wait as long for 1 to 
finish as for N, assuming they're fairly equally sized tasks.

toLocalIterator could also compute the partitions in parallel on the driver. 
But this more or less reduces to collect(), as all the results might arrive on 
the driver before they're consumed.

It could, say, compute partitions in parallel in a way that partition N+1 is 
started as soon as the job for N finishes. That's not too hard even, but now we 
have up to 2 partitions' worth of data on the driver instead of 1. There's a 
tradeoff there, in complexity and extra driver memory, but it's coherent.

This is even implementable now in your code if you want to try it; just call 
sc.runJob directly like toLocalIterator does and add the fetch-ahead logic. Do 
you even care about consuming the results in order, or just iterating over the 
partitions' results as soon as each is available? if doing it in order isn't 
required, this is even better than a parallel toLocalIterator. You run back 
into the issue that all the data might arrive on the driver at one time; if 
that's an issue here this probably won't fly. If it's not an issue, this 
probably doesn't add a lot over just collect()-ing but it's possible.

I'm not against trying the 2-partition implementation of toLocalIterator, but 
think the use case for it is limited, given that many scenarios have better or 
no-worse solutions already, per above.

> Speed up toLocalIterator
> 
>
> Key: SPARK-27025
> URL: https://issues.apache.org/jira/browse/SPARK-27025
> Project: Spark
>  Issue Type: Wish
>  Components: Spark Core
>Affects Versions: 2.3.3
>Reporter: Erik van Oosten
>Priority: Major
>
> Method {{toLocalIterator}} fetches the partitions to the driver one by one. 
> However, as far as I can see, any required computation for the 
> yet-to-be-fetched-partitions is not kicked off until it is fetched. 
> Effectively only one partition is being computed at the same time. 
> Desired behavior: immediately start calculation of all partitions while 
> retaining the download-a-partition at a time behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27025) Speed up toLocalIterator

2019-03-02 Thread Erik van Oosten (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782322#comment-16782322
 ] 

Erik van Oosten commented on SPARK-27025:
-

I have a program in which several steps need to be executed before anything can 
be transferred to the driver. So why can't the executors start executing 
immediately, and only transfer the results to the driver when its ready?

> Speed up toLocalIterator
> 
>
> Key: SPARK-27025
> URL: https://issues.apache.org/jira/browse/SPARK-27025
> Project: Spark
>  Issue Type: Wish
>  Components: Spark Core
>Affects Versions: 2.3.3
>Reporter: Erik van Oosten
>Priority: Major
>
> Method {{toLocalIterator}} fetches the partitions to the driver one by one. 
> However, as far as I can see, any required computation for the 
> yet-to-be-fetched-partitions is not kicked off until it is fetched. 
> Effectively only one partition is being computed at the same time. 
> Desired behavior: immediately start calculation of all partitions while 
> retaining the download-a-partition at a time behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27025) Speed up toLocalIterator

2019-03-01 Thread Sean Owen (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782051#comment-16782051
 ] 

Sean Owen commented on SPARK-27025:
---

If you fetched it all at once proactively, you have another problem: what if it 
doesn't fit on the driver? the use case for toLocalIterator() is probably 
exactly to avoid this.

> Speed up toLocalIterator
> 
>
> Key: SPARK-27025
> URL: https://issues.apache.org/jira/browse/SPARK-27025
> Project: Spark
>  Issue Type: Wish
>  Components: Spark Core
>Affects Versions: 2.3.3
>Reporter: Erik van Oosten
>Priority: Major
>
> Method {{toLocalIterator}} fetches the partitions to the driver one by one. 
> However, as far as I can see, any required computation for the 
> yet-to-be-fetched-partitions is not kicked off until it is fetched. 
> Effectively only one partition is being computed at the same time. 
> Desired behavior: immediately start calculation of all partitions while 
> retaining the download-a-partition at a time behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org