[ https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782789#comment-16782789 ]

Sean Owen commented on SPARK-27025:
-----------------------------------

It's an interesting question; let's break it down.

Calling toLocalIterator on an RDD of N partitions actually runs N jobs to 
compute the partitions individually. That's fine, except that after consuming 
one partition's iterator you sit idle waiting for the next partition's job to 
complete.
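
For context, this is roughly what toLocalIterator does today (a paraphrase of 
the current implementation, with the helper renamed so it doesn't shadow the 
real method):

{code:scala}
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Roughly RDD.toLocalIterator: one Spark job per partition, submitted
// lazily only when the consumer advances into that partition.
def localIterator[T: ClassTag](rdd: RDD[T]): Iterator[T] = {
  def collectPartition(p: Int): Array[T] =
    rdd.sparkContext.runJob(rdd, (it: Iterator[T]) => it.toArray, Seq(p)).head
  rdd.partitions.indices.iterator.flatMap(i => collectPartition(i))
}
{code}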

You could cache the RDD (essential here anyway) and materialize it all first 
with count() or something, then run toLocalIterator. That more or less 
eliminates this delay and ensures you only have one partition of data on the 
driver at a time. Yes, it means you persist the RDD, but that's actually vital 
for an RDD from a wide transformation; you absolutely don't want to recompute 
the whole thing N times. For a narrow transform, OK, per-partition computation 
is in theory no more work than computing it all once in one go, even without 
caching.
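
Concretely, something like this (just a sketch; someWideTransform and process 
stand in for your own pipeline and consumer):

{code:scala}
import org.apache.spark.storage.StorageLevel

val rdd = someWideTransform(input)        // stand-in for your own pipeline
rdd.persist(StorageLevel.MEMORY_AND_DISK) // keep partitions around after the count
rdd.count()                               // one job materializes every partition
rdd.toLocalIterator.foreach(process)      // each per-partition job is now a cache read
rdd.unpersist()
{code}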

Of course, this also means you don't start iterating at all until all 
partitions are done. In some cases you can't do better (e.g. a wide 
transformation, where all partitions have to be computed at once anyway). But 
then again, even for narrow transforms, the partitions compute in parallel 
across the cluster, so the wall-clock time to compute 1 partition is about the 
same as for all N. You'd wait about as long for 1 to finish as for N, assuming 
the tasks are fairly equally sized.

toLocalIterator could also compute the partitions in parallel on the driver. 
But this more or less reduces to collect(), as all the results might arrive on 
the driver before they're consumed.

It could, say, compute partitions in parallel in such a way that partition N+1 
is started as soon as the job for N finishes. That's not even too hard, but now 
we have up to 2 partitions' worth of data on the driver instead of 1. There's a 
tradeoff there, in complexity and extra driver memory, but it's coherent.
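
A rough, untested sketch of that fetch-ahead idea, using sc.runJob per 
partition and a driver-side Future as the one-ahead buffer (error handling 
omitted):

{code:scala}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// While the caller consumes partition p, the job for partition p+1 is
// already running, so at most 2 partitions' worth of data are on the
// driver at once.
def prefetchingLocalIterator[T: ClassTag](rdd: RDD[T]): Iterator[T] = {
  val sc = rdd.sparkContext
  def fetch(p: Int): Future[Array[T]] =
    Future(sc.runJob(rdd, (it: Iterator[T]) => it.toArray, Seq(p)).head)

  val parts = rdd.partitions.indices.iterator
  var inFlight: Option[Future[Array[T]]] =
    if (parts.hasNext) Some(fetch(parts.next())) else None

  new Iterator[Array[T]] {
    def hasNext: Boolean = inFlight.isDefined
    def next(): Array[T] = {
      // Block for the partition already in flight, then kick off the
      // next job before handing the data to the caller.
      val current = Await.result(inFlight.get, Duration.Inf)
      inFlight = if (parts.hasNext) Some(fetch(parts.next())) else None
      current
    }
  }.flatMap(_.iterator)
}
{code}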

This is even implementable now in your own code if you want to try it: just 
call sc.runJob directly, as toLocalIterator does, and add the fetch-ahead 
logic along the lines of the sketch above. Do you even care about consuming 
the results in order, or just about iterating over each partition's results as 
soon as they're available? If order isn't required, this is even better than a 
parallel toLocalIterator. But you run back into the issue that all the data 
might arrive on the driver at once; if that's a problem here, this won't fly. 
If it's not, this probably doesn't add a lot over just collect()-ing, but it's 
possible.
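
For the unordered case, the DIY version collapses to submitting every 
partition job up front and draining results in completion order, roughly like 
this (again untested; a failed job would stall the queue, so real code needs 
error handling):

{code:scala}
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// All partition jobs run at once; results are handed out in completion
// order. As noted above, everything may arrive on the driver before the
// consumer catches up, so this only helps if consumption keeps pace.
def unorderedPartitions[T: ClassTag](rdd: RDD[T]): Iterator[Array[T]] = {
  val sc = rdd.sparkContext
  val queue = new LinkedBlockingQueue[Array[T]]()
  rdd.partitions.indices.foreach { p =>
    Future(sc.runJob(rdd, (it: Iterator[T]) => it.toArray, Seq(p)).head)
      .foreach(queue.put) // success only; failures never enqueue
  }
  Iterator.fill(rdd.partitions.length)(queue.take())
}
{code}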

I'm not against trying the 2-partition implementation of toLocalIterator, but 
I think the use case for it is limited, given that many scenarios already have 
better or no-worse solutions, per the above.

> Speed up toLocalIterator
> ------------------------
>
>                 Key: SPARK-27025
>                 URL: https://issues.apache.org/jira/browse/SPARK-27025
>             Project: Spark
>          Issue Type: Wish
>          Components: Spark Core
>    Affects Versions: 2.3.3
>            Reporter: Erik van Oosten
>            Priority: Major
>
> Method {{toLocalIterator}} fetches the partitions to the driver one by one. 
> However, as far as I can see, any required computation for the 
> yet-to-be-fetched partitions is not kicked off until each one is fetched. 
> Effectively, only one partition is being computed at a time. 
> Desired behavior: immediately start computing all partitions while 
> retaining the download-one-partition-at-a-time behavior.


