[ https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782789#comment-16782789 ]
Sean Owen commented on SPARK-27025:
-----------------------------------

It's an interesting question; let's break it down. Calling toLocalIterator on an RDD of N partitions actually runs N jobs to compute the partitions individually. That's fine, except that after consuming one partition's iterator you wait for the next partition's job to complete.

You could cache the RDD (essential here anyway) and materialize it all first with count() or similar, then run toLocalIterator. That more or less eliminates the delay and still ensures you only have one partition of data on the driver at a time. Yes, it means you persist the RDD. That's actually vital for an RDD produced by a wide transformation; you absolutely don't want to recompute the whole thing N times. For a narrow transformation, per-partition computation is in theory no more work than computing it all in one go, even without caching.

Of course, this also means you don't start iterating at all until all partitions are done. In some cases you can't do better anyway (e.g. a wide transformation, where all partitions have to be computed at once). But even for narrow transformations, the wall-clock time to compute one partition is about the same as for all of them: you'd wait as long for one to finish as for N, assuming the tasks are fairly equally sized.

toLocalIterator could also compute the partitions in parallel on the driver, but that more or less reduces to collect(), as all the results might arrive on the driver before they're consumed. It could instead compute partitions such that partition N+1 is started as soon as the job for partition N finishes. That's not even too hard, but now we have up to two partitions' worth of data on the driver instead of one. There's a tradeoff there, in complexity and extra driver memory, but it's coherent. This is even implementable in your own code today if you want to try it: just call sc.runJob directly, like toLocalIterator does, and add the fetch-ahead logic.
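To make the fetch-ahead idea concrete, here is a minimal sketch in plain Python. It is not Spark code: `compute_partition(i)` is a hypothetical stand-in for a driver-side call such as `sc.runJob(rdd, lambda it: list(it), [i])`, and the sketch only illustrates the "start partition i+1 while consuming partition i" scheduling, so at most two partitions' worth of data are resident at once.

```python
# Sketch of fetch-ahead iteration: while the consumer iterates over
# partition i, a background worker already computes partition i+1.
# compute_partition(i) is a hypothetical stand-in for running the Spark
# job for partition i and returning its rows to the driver.
from concurrent.futures import ThreadPoolExecutor

def fetch_ahead_iterator(compute_partition, num_partitions):
    """Yield rows partition by partition, prefetching one partition ahead."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(compute_partition, 0)   # kick off partition 0
        for i in range(num_partitions):
            rows = future.result()                   # block until partition i is ready
            if i + 1 < num_partitions:
                # prefetch partition i+1 while the caller consumes partition i
                future = pool.submit(compute_partition, i + 1)
            yield from rows
```

Note the single-worker pool: it caps the prefetch depth at one, which is exactly the "up to two partitions on the driver" tradeoff described above.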
Do you even care about consuming the results in order, or just about iterating over each partition's results as soon as it is available? If order isn't required, this is even better than a parallel toLocalIterator. You run back into the issue that all the data might arrive on the driver at once; if that's a problem here, this probably won't fly. If it's not a problem, this probably doesn't add much over just collect()-ing, but it's possible.

I'm not against trying the two-partition implementation of toLocalIterator, but I think the use case for it is limited, given that many scenarios have better or no-worse solutions already, per above.

> Speed up toLocalIterator
> ------------------------
>
> Key: SPARK-27025
> URL: https://issues.apache.org/jira/browse/SPARK-27025
> Project: Spark
> Issue Type: Wish
> Components: Spark Core
> Affects Versions: 2.3.3
> Reporter: Erik van Oosten
> Priority: Major
>
> Method {{toLocalIterator}} fetches the partitions to the driver one by one.
> However, as far as I can see, any required computation for the
> yet-to-be-fetched partitions is not kicked off until it is fetched.
> Effectively only one partition is being computed at a time.
> Desired behavior: immediately start calculation of all partitions while
> retaining the download-a-partition-at-a-time behavior.
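The unordered-consumption variant mentioned in the comment can be sketched the same way. Again this is plain Python, not Spark: `compute_partition(i)` is hypothetical, and running all partition jobs at once reproduces the stated caveat that, in the worst case, every result is resident on the driver before being consumed, much like collect().

```python
# Sketch of consuming partitions as soon as each one finishes, ignoring
# order. All partition jobs run concurrently, so driver memory can grow
# toward collect()-like usage. compute_partition(i) is a hypothetical
# stand-in for running the Spark job for partition i.
from concurrent.futures import ThreadPoolExecutor, as_completed

def unordered_partition_iterator(compute_partition, num_partitions):
    """Yield rows from whichever partition finishes first."""
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        futures = [pool.submit(compute_partition, i) for i in range(num_partitions)]
        for done in as_completed(futures):   # completion order, not index order
            yield from done.result()
```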