The Dataflow version does not spill to disk. However, Spark's design might require spilling to disk if this is to be implemented properly.
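For context, a minimal sketch of the spill-to-disk wrapper discussed below could look like the following. This is only an illustration, not an existing Beam or Spark API; the class name, the in-memory buffer limit, and the use of Java serialization for the spilled tail are all assumptions.

import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/**
 * Re-iterable view over a one-shot Iterator: keeps the first maxInMemory
 * elements on the heap and spills the rest to a temp file via Java
 * serialization. Assumes non-null elements; cleanup is deliberately minimal.
 */
public class SpillingIterable<T extends Serializable> implements Iterable<T> {

  private final List<T> inMemory = new ArrayList<>();
  private File spillFile; // stays null if everything fits in memory

  public SpillingIterable(Iterator<T> source, int maxInMemory) throws IOException {
    while (source.hasNext() && inMemory.size() < maxInMemory) {
      inMemory.add(source.next());
    }
    if (source.hasNext()) {
      spillFile = File.createTempFile("gbk-spill", ".bin");
      spillFile.deleteOnExit();
      try (ObjectOutputStream out =
          new ObjectOutputStream(new FileOutputStream(spillFile))) {
        while (source.hasNext()) {
          out.writeObject(source.next());
        }
      }
    }
  }

  @Override
  public Iterator<T> iterator() {
    Iterator<T> memoryIt = inMemory.iterator();
    if (spillFile == null) {
      return memoryIt;
    }
    return new Iterator<T>() {
      private ObjectInputStream in; // opened once the in-memory part is exhausted
      private T nextElement = advance();

      @SuppressWarnings("unchecked")
      private T advance() {
        if (memoryIt.hasNext()) {
          return memoryIt.next();
        }
        try {
          if (in == null) {
            in = new ObjectInputStream(new FileInputStream(spillFile));
          }
          return (T) in.readObject();
        } catch (EOFException eof) {
          return null; // end of spilled data (null is the end sentinel)
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
          throw new IllegalStateException(e);
        }
      }

      @Override
      public boolean hasNext() {
        return nextElement != null;
      }

      @Override
      public T next() {
        if (nextElement == null) {
          throw new NoSuchElementException();
        }
        T current = nextElement;
        nextElement = advance();
        return current;
      }
    };
  }
}

The hard part, as discussed below, is not the wrapper itself but deciding when to pay the materialization cost at all, i.e. detecting whether the user actually needs to re-iterate.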
On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:

> Hi,
>
> Spark's GBK is currently implemented using `sortBy(key and
> value).mapPartition(...)` for non-merging windowing in order to support
> large keys and large-scale shuffles. Merging windowing is implemented
> using a standard GBK (the underlying Spark impl. uses ListCombiner + Hash
> Grouping), which is by design unable to support large keys.
>
> As Jan noted, the problem with mapPartition is that its UDF receives an
> Iterator. The only option here is to wrap this iterator in one that spills
> to disk once an internal buffer is exceeded (the approach suggested by
> Reuven). This unfortunately comes with a cost in some cases. The best
> approach would be to somehow determine that the user wants multiple
> iterations and then wrap it in a "re-iterator" if necessary. Does anyone
> have any ideas how to approach this?
>
> D.
>
> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
>
>> The Beam API was written to support multiple iterations, and there are
>> definitely transforms that do so. I believe that CoGroupByKey may do this
>> as well with the resulting iterator.
>>
>> I know that the Dataflow runner is able to handle iterators larger than
>> available memory by paging them in from shuffle, which still allows for
>> reiterating. It sounds like Spark is less flexible here?
>>
>> Reuven
>>
>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> +dev <dev@beam.apache.org>
>>>
>>> Lukasz, why do you think that users expect to be able to iterate
>>> grouped elements multiple times, apart from the fact that the 'Iterable'
>>> obviously suggests it? The way Spark behaves is pretty much analogous to
>>> how MapReduce used to work - in certain cases it calls
>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>> accepts an Iterator - that is because internally it merge-sorts
>>> pre-sorted segments. This approach makes it possible to GroupByKey data
>>> sets that are too big to fit into memory (per key).
>>>
>>> If multiple iterations should be expected by users, we probably should:
>>>
>>> a) include that in @ValidatesRunner tests
>>>
>>> b) store values in memory on Spark, which will break for certain
>>> pipelines
>>>
>>> Because of (b) I think that it would be much better to remove this
>>> "expectation" and clearly document that the Iterable is not supposed to
>>> be iterated multiple times.
>>>
>>> Jan
>>>
>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>
>>> I pretty much think so, because that is how Spark works. The Iterable
>>> inside is really an Iterator, which cannot be iterated multiple times.
>>>
>>> Jan
>>>
>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>
>>> Jan, in Beam users expect to be able to iterate the GBK output multiple
>>> times even from within the same ParDo.
>>> Is this something that Beam on Spark Runner never supported?
>>>
>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Gershi,
>>>>
>>>> could you please outline the pipeline you are trying to execute?
>>>> Basically, you cannot iterate the Iterable multiple times in a single
>>>> ParDo. It should be possible, though, to apply multiple ParDos to the
>>>> output from GroupByKey.
>>>>
>>>> Jan
>>>>
>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>
>>>> Hi,
>>>>
>>>> I want to iterate multiple times on the Iterable<V> (the output of the
>>>> GroupByKey transformation).
>>>>
>>>> When my Runner is SparkRunner, I get an exception:
>>>>
>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>>>> iterated more than once,otherwise there could be data lost
>>>> at
>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>> at java.lang.Iterable.spliterator(Iterable.java:101)
>>>>
>>>> I understood I can branch the pipeline after GroupByKey into multiple
>>>> transformations and iterate once on the Iterable<V> in each of them.
>>>>
>>>> Is there a better way to do that?
>>>>
>>>> Noam Gershi
>>>> Software Developer
>>>> T: +972 (3) 7405718
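For completeness, the branching workaround described in the thread could look roughly like this against the Beam Java SDK. It is a minimal sketch; the class name, transform names, and the sum/max logic are only illustrative.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class BranchAfterGroupByKey {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<KV<String, Iterable<Integer>>> grouped =
        p.apply(Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)))
            .apply(GroupByKey.<String, Integer>create());

    // Branch 1: one pass over the grouped values to compute a sum per key.
    grouped.apply("SumPerKey",
        ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            int sum = 0;
            for (int v : c.element().getValue()) {
              sum += v;
            }
            c.output(KV.of(c.element().getKey(), sum));
          }
        }));

    // Branch 2: a separate ParDo gets its own pass over the same grouped values.
    grouped.apply("MaxPerKey",
        ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            int max = Integer.MIN_VALUE;
            for (int v : c.element().getValue()) {
              max = Math.max(max, v);
            }
            c.output(KV.of(c.element().getKey(), max));
          }
        }));

    p.run().waitUntilFinish();
  }
}

Each ParDo makes a single pass over the grouped values, so no ParDo ever needs to re-iterate the Iterable, which is why this pattern works on the Spark runner.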