Hi, Spark's GBK is currently implemented using `sortBy(key and value).mapPartition(...)` for non-merging windowing in order to support large keys and large scale shuffles. Merging windowing is implemented using standard GBK (underlying spark impl. uses ListCombiner + Hash Grouping), which is by design unable to support large keys.
As Jan noted, problem with mapPartition is, that its UDF receives an Iterator. Only option here is to wrap this iterator to one that spills to disk once an internal buffer is exceeded (the approach suggested by Reuven). This unfortunately comes with a cost in some cases. The best approach would be to somehow determine, that user wants multiple iterations and than wrap it in "re-iterator" if necessary. Does anyone have any ideas how to approach this? D. On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote: > The Beam API was written to support multiple iterations, and there are > definitely transforms that do so. I believe that CoGroupByKey may do this > as well with the resulting iterator. > > I know that the Dataflow runner is able to handles iterators larger than > available memory by paging them in from shuffle, which still allows for > reiterating. It sounds like Spark is less flexible here? > > Reuven > > On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote: > >> +dev <dev@beam.apache.org> <dev@beam.apache.org> >> >> Lukasz, why do you think that users expect to be able to iterate multiple >> times grouped elements? Besides that it obviously suggests the 'Iterable'? >> The way that spark behaves is pretty much analogous to how MapReduce used >> to work - in certain cases it calles repartitionAndSortWithinPartitions and >> then does mapPartition, which accepts Iterator - that is because internally >> it merge sorts pre sorted segments. This approach enables to GroupByKey >> data sets that are too big to fit into memory (per key). >> >> If multiple iterations should be expected by users, we probably should: >> >> a) include that in @ValidatesRunner tests >> >> b) store values in memory on spark, which will break for certain >> pipelines >> >> Because of (b) I think that it would be much better to remove this >> "expectation" and clearly document that the Iterable is not supposed to be >> iterated multiple times. >> >> Jan >> On 9/27/19 9:27 AM, Jan Lukavský wrote: >> >> I pretty much think so, because that is how Spark works. The Iterable >> inside is really an Iterator, which cannot be iterated multiple times. >> >> Jan >> On 9/27/19 2:00 AM, Lukasz Cwik wrote: >> >> Jan, in Beam users expect to be able to iterate the GBK output multiple >> times even from within the same ParDo. >> Is this something that Beam on Spark Runner never supported? >> >> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote: >> >>> Hi Gershi, >>> >>> could you please outline the pipeline you are trying to execute? >>> Basically, you cannot iterate the Iterable multiple times in single ParDo. >>> It should be possible, though, to apply multiple ParDos to output from >>> GroupByKey. >>> >>> Jan >>> On 9/26/19 3:32 PM, Gershi, Noam wrote: >>> >>> Hi, >>> >>> >>> >>> I want to iterate multiple times on the Iterable<V> (the output of >>> GroupByKey transformation) >>> >>> When my Runner is SparkRunner, I get an exception: >>> >>> >>> >>> Caused by: java.lang.IllegalStateException: ValueIterator can't be >>> iterated more than once,otherwise there could be data lost >>> >>> at >>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221) >>> >>> at java.lang.Iterable.spliterator(Iterable.java:101) >>> >>> >>> >>> >>> >>> I understood I can branch the pipeline after GroupByKey into multiple >>> transformation and iterate in each of them once on the Iterable<V>. >>> >>> >>> >>> Is there a better way for that? >>> >>> >>> >>> >>> >>> [image: citi_logo_mail][image: citi_logo_mail]*Noam Gershi* >>> >>> Software Developer >>> >>> *T*: +972 (3) 7405718 <+972%203-740-5718> >>> >>> [image: Mail_signature_blue] >>> >>> >>> >>>