I am pretty surprised that we do not have a @Category(ValidatesRunner.class) test in GroupByKeyTest that iterates multiple times. That is a major oversight. We should have this test, and it can be disabled via the SparkRunner's configuration.
Kenn

On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:

> The Dataflow version does not spill to disk. However, Spark's design might
> require spilling to disk if you want that to be implemented properly.
>
> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:
>
>> Hi,
>>
>> Spark's GBK is currently implemented using `sortBy(key and
>> value).mapPartition(...)` for non-merging windowing, in order to support
>> large keys and large-scale shuffles. Merging windowing is implemented
>> using the standard GBK (the underlying Spark implementation uses
>> ListCombiner + hash grouping), which is by design unable to support
>> large keys.
>>
>> As Jan noted, the problem with mapPartition is that its UDF receives an
>> Iterator. The only option here is to wrap this iterator in one that
>> spills to disk once an internal buffer is exceeded (the approach
>> suggested by Reuven). This unfortunately comes with a cost in some
>> cases. The best approach would be to somehow determine that the user
>> wants multiple iterations, and then wrap it in a "re-iterator" only if
>> necessary. Does anyone have ideas on how to approach this?
>>
>> D.
>>
>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
>>
>>> The Beam API was written to support multiple iterations, and there are
>>> definitely transforms that do so. I believe that CoGroupByKey may do
>>> this as well with the resulting iterator.
>>>
>>> I know that the Dataflow runner is able to handle iterators larger
>>> than available memory by paging them in from shuffle, which still
>>> allows for reiterating. It sounds like Spark is less flexible here?
>>>
>>> Reuven
>>>
>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> +dev <dev@beam.apache.org>
>>>>
>>>> Lukasz, why do you think that users expect to be able to iterate
>>>> grouped elements multiple times? Besides the fact that the 'Iterable'
>>>> signature obviously suggests it?
>>>> The way that Spark behaves is pretty much analogous to how MapReduce
>>>> used to work - in certain cases it calls
>>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>>> accepts an Iterator. That is because internally it merge-sorts
>>>> pre-sorted segments. This approach makes it possible to GroupByKey
>>>> data sets that are too big (per key) to fit into memory.
>>>>
>>>> If multiple iterations should be expected by users, we probably
>>>> should:
>>>>
>>>> a) include that in @ValidatesRunner tests
>>>>
>>>> b) store values in memory on Spark, which will break for certain
>>>> pipelines
>>>>
>>>> Because of (b), I think it would be much better to remove this
>>>> "expectation" and clearly document that the Iterable is not supposed
>>>> to be iterated multiple times.
>>>>
>>>> Jan
>>>>
>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>
>>>> I pretty much think so, because that is how Spark works. The Iterable
>>>> inside is really an Iterator, which cannot be iterated multiple
>>>> times.
>>>>
>>>> Jan
>>>>
>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>
>>>> Jan, in Beam, users expect to be able to iterate the GBK output
>>>> multiple times, even from within the same ParDo.
>>>> Is this something that Beam on the Spark runner has never supported?
>>>>
>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Gershi,
>>>>>
>>>>> could you please outline the pipeline you are trying to execute?
>>>>> Basically, you cannot iterate the Iterable multiple times in a
>>>>> single ParDo. It should be possible, though, to apply multiple
>>>>> ParDos to the output of GroupByKey.
>>>>> Jan
>>>>>
>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I want to iterate multiple times over the Iterable<V> (the output of
>>>>> the GroupByKey transformation).
>>>>>
>>>>> When my runner is SparkRunner, I get an exception:
>>>>>
>>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>>>>> iterated more than once,otherwise there could be data lost
>>>>>     at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>     at java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>
>>>>> I understood I can branch the pipeline after GroupByKey into
>>>>> multiple transformations and iterate once on the Iterable<V> in each
>>>>> of them.
>>>>>
>>>>> Is there a better way to do that?
>>>>>
>>>>> Noam Gershi
>>>>> Software Developer
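[Editor's note] The IllegalStateException quoted in the thread comes from a deliberate guard: the Spark runner's Iterable is backed by a one-shot Iterator, and failing fast beats silently losing data on a second pass. A minimal sketch of that guard, using a hypothetical demo class rather than the actual ValueIterator code:

```java
import java.util.Arrays;
import java.util.Iterator;

/**
 * Sketch: an Iterable backed by a one-shot Iterator that refuses a second
 * iteration instead of silently returning no data. Hypothetical demo class,
 * not the actual Spark runner's ValueIterator.
 */
class OneShotIterable<T> implements Iterable<T> {
  private final Iterator<T> source;
  private boolean consumed = false;

  OneShotIterable(Iterator<T> source) {
    this.source = source;
  }

  @Override
  public Iterator<T> iterator() {
    if (consumed) {
      // Mirrors the SparkRunner behavior: better to fail than to lose data.
      throw new IllegalStateException("can't be iterated more than once");
    }
    consumed = true;
    return source;
  }
}
```

The first for-each loop over such an Iterable works; the second call to iterator() throws.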
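[Editor's note] Jan's description of sort-based grouping explains why the values are single-pass: with input sorted by key, each group's values are streamed directly off the shared underlying iterator, so a group must be drained before moving to the next key and cannot be replayed. A sketch loosely modeled on that idea (names are illustrative, not the runner's actual code):

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Sketch: lazily groups a key-sorted stream of (key, value) pairs into
 * (key, value-iterator) groups, the way a sort-based GBK does. Each group's
 * values advance the shared underlying stream, which is exactly why they
 * cannot be re-iterated without buffering or re-reading the sorted run.
 */
class SortedGroupingIterator<K, V> implements Iterator<Map.Entry<K, Iterator<V>>> {
  private final Iterator<Map.Entry<K, V>> sorted;
  private Map.Entry<K, V> pending; // first element of the next group

  SortedGroupingIterator(Iterator<Map.Entry<K, V>> sorted) {
    this.sorted = sorted;
    this.pending = sorted.hasNext() ? sorted.next() : null;
  }

  @Override
  public boolean hasNext() {
    return pending != null;
  }

  @Override
  public Map.Entry<K, Iterator<V>> next() {
    final K key = pending.getKey();
    Iterator<V> values = new Iterator<V>() {
      @Override
      public boolean hasNext() {
        return pending != null && pending.getKey().equals(key);
      }

      @Override
      public V next() {
        V value = pending.getValue();
        pending = sorted.hasNext() ? sorted.next() : null;
        return value;
      }
    };
    return new AbstractMap.SimpleEntry<>(key, values);
  }
}
```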
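[Editor's note] The "re-iterator" David describes (buffer in memory, spill to disk past a threshold, as Reuven suggested) can be sketched as below. Class name, threshold handling, and the naive Java-serialization spill format are all assumptions for illustration, not the proposed Beam/Spark implementation; a real one would also close and clean up the spill file.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/**
 * Sketch: eagerly drains a one-shot Iterator, keeping the first maxInMemory
 * elements in memory and spilling the rest to a temp file, so the result can
 * be iterated any number of times.
 */
class SpillingReiterable<T extends Serializable> implements Iterable<T> {
  private final List<T> inMemory = new ArrayList<>();
  private File spillFile; // created lazily on first spill
  private long spilledCount = 0;

  SpillingReiterable(Iterator<T> source, int maxInMemory) {
    try {
      ObjectOutputStream out = null;
      while (source.hasNext()) {
        T element = source.next();
        if (inMemory.size() < maxInMemory) {
          inMemory.add(element);
        } else {
          if (out == null) {
            spillFile = File.createTempFile("gbk-spill", ".bin");
            spillFile.deleteOnExit();
            out = new ObjectOutputStream(new FileOutputStream(spillFile));
          }
          out.writeObject(element);
          spilledCount++;
        }
      }
      if (out != null) {
        out.close();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public Iterator<T> iterator() {
    final Iterator<T> memory = inMemory.iterator();
    return new Iterator<T>() {
      private ObjectInputStream spilled; // opened lazily, once per iteration
      private long read = 0;

      @Override
      public boolean hasNext() {
        return memory.hasNext() || read < spilledCount;
      }

      @SuppressWarnings("unchecked")
      @Override
      public T next() {
        if (memory.hasNext()) {
          return memory.next();
        }
        if (read >= spilledCount) {
          throw new NoSuchElementException();
        }
        try {
          if (spilled == null) {
            spilled = new ObjectInputStream(new FileInputStream(spillFile));
          }
          read++;
          return (T) spilled.readObject();
        } catch (IOException | ClassNotFoundException e) {
          throw new IllegalStateException("failed to read spilled element", e);
        }
      }
    };
  }
}
```

As the thread notes, this replay comes at a cost, which is why it would ideally be applied only when multiple iterations are actually wanted.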