This should be an element in the compatibility matrix as well.

On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org> wrote:
> I am pretty surprised that we do not have a @Category(ValidatesRunner)
> test in GroupByKeyTest that iterates multiple times. That is a major
> oversight. We should have this test, and it can be disabled by the
> SparkRunner's configuration.
>
> Kenn
>
> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:
>
>> The Dataflow version does not spill to disk. However, Spark's design might
>> require spilling to disk if you want that to be implemented properly.
>>
>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> Spark's GBK is currently implemented using `sortBy(key and
>>> value).mapPartition(...)` for non-merging windowing, in order to support
>>> large keys and large-scale shuffles. Merging windowing is implemented
>>> using the standard GBK (the underlying Spark implementation uses
>>> ListCombiner + hash grouping), which is by design unable to support large
>>> keys.
>>>
>>> As Jan noted, the problem with mapPartition is that its UDF receives an
>>> Iterator. The only option here is to wrap this iterator in one that spills
>>> to disk once an internal buffer is exceeded (the approach suggested by
>>> Reuven). This unfortunately comes with a cost in some cases. The best
>>> approach would be to somehow determine that the user wants multiple
>>> iterations and then wrap it in a "re-iterator" only if necessary. Does
>>> anyone have any ideas how to approach this?
>>>
>>> D.
>>>
>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> The Beam API was written to support multiple iterations, and there are
>>>> definitely transforms that do so. I believe that CoGroupByKey may do this
>>>> as well with the resulting iterator.
>>>>
>>>> I know that the Dataflow runner is able to handle iterators larger
>>>> than available memory by paging them in from shuffle, which still allows
>>>> for reiterating. It sounds like Spark is less flexible here?
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> +dev <dev@beam.apache.org>
>>>>>
>>>>> Lukasz, why do you think that users expect to be able to iterate
>>>>> grouped elements multiple times - besides the fact that the 'Iterable'
>>>>> type obviously suggests it? The way Spark behaves is pretty much
>>>>> analogous to how MapReduce used to work - in certain cases it calls
>>>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>>>> accepts an Iterator - that is because internally it merge-sorts
>>>>> pre-sorted segments. This approach makes it possible to GroupByKey data
>>>>> sets that are too big to fit into memory (per key).
>>>>>
>>>>> If multiple iterations should be expected by users, we probably should:
>>>>>
>>>>> a) include that in @ValidatesRunner tests
>>>>>
>>>>> b) store values in memory on Spark, which will break for certain
>>>>> pipelines
>>>>>
>>>>> Because of (b) I think that it would be much better to remove this
>>>>> "expectation" and clearly document that the Iterable is not supposed to
>>>>> be iterated multiple times.
>>>>>
>>>>> Jan
>>>>>
>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>
>>>>> I pretty much think so, because that is how Spark works. The Iterable
>>>>> inside is really an Iterator, which cannot be iterated multiple times.
>>>>>
>>>>> Jan
>>>>>
>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>
>>>>> Jan, in Beam users expect to be able to iterate the GBK output
>>>>> multiple times, even from within the same ParDo.
>>>>> Is this something that Beam on the Spark runner never supported?
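Regarding Kenn's point at the top of the thread: a minimal sketch of such a
test could look like the following. The test name, data and assertions are
illustrative only (this is not existing GroupByKeyTest code), and it assumes
the usual TestPipeline rule `p` used by the other tests in that class:

// Sketch: a ValidatesRunner test that iterates the grouped values twice
// within the same ParDo. A runner that backs the Iterable with a one-shot
// Iterator (as the Spark runner does today) would fail it.
@Test
@Category(ValidatesRunner.class)
public void testGroupByKeyMultipleIterations() {
  PCollection<KV<String, Iterable<Integer>>> grouped =
      p.apply(Create.of(KV.of("k", 1), KV.of("k", 2), KV.of("k", 3)))
          .apply(GroupByKey.<String, Integer>create());

  PCollection<KV<String, Integer>> sums =
      grouped.apply(
          ParDo.of(
              new DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  Iterable<Integer> values = c.element().getValue();
                  int firstPass = 0;
                  for (int v : values) {
                    firstPass += v;
                  }
                  // Second pass over the same Iterable.
                  int secondPass = 0;
                  for (int v : values) {
                    secondPass += v;
                  }
                  c.output(KV.of(c.element().getKey(), firstPass + secondPass));
                }
              }));

  PAssert.that(sums).containsInAnyOrder(KV.of("k", 12));
  p.run();
}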
>>>>>
>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> Hi Gershi,
>>>>>>
>>>>>> could you please outline the pipeline you are trying to execute?
>>>>>> Basically, you cannot iterate the Iterable multiple times in a single
>>>>>> ParDo. It should be possible, though, to apply multiple ParDos to the
>>>>>> output of GroupByKey.
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I want to iterate multiple times on the Iterable<V> (the output of
>>>>>> the GroupByKey transformation).
>>>>>>
>>>>>> When my runner is SparkRunner, I get an exception:
>>>>>>
>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>>>>>> iterated more than once,otherwise there could be data lost
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>
>>>>>> at java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>>
>>>>>> I understood I can branch the pipeline after GroupByKey into multiple
>>>>>> transformations and iterate in each of them once on the Iterable<V>.
>>>>>>
>>>>>> Is there a better way for that?
>>>>>>
>>>>>> Noam Gershi
>>>>>> Software Developer
>>>>>> T: +972 (3) 7405718
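For reference, the branching workaround Jan describes would look roughly like
this - each ParDo makes only a single pass over the grouped values, so it also
works on the Spark runner today. The input PCollection, element types and
transform names below are made up for illustration:

// Sketch: branch after GroupByKey so that no DoFn iterates the values twice.
PCollection<KV<String, Iterable<Integer>>> grouped =
    input.apply(GroupByKey.<String, Integer>create());

// First consumer: one pass over the values per key.
PCollection<KV<String, Long>> counts =
    grouped.apply("CountPerKey",
        ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, Long>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            long count = 0;
            for (Integer v : c.element().getValue()) {
              count++;
            }
            c.output(KV.of(c.element().getKey(), count));
          }
        }));

// Second consumer: a separate single pass over the same grouped PCollection.
PCollection<KV<String, Integer>> sums =
    grouped.apply("SumPerKey",
        ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            int sum = 0;
            for (Integer v : c.element().getValue()) {
              sum += v;
            }
            c.output(KV.of(c.element().getKey(), sum));
          }
        }));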