CoGroupByKey is one example. Performing a CoGroupByKey-based join requires multiple iterations (caching is key to getting performance). You could make up other calculations that require it, most of which would look like a self-join, such as "output the largest difference between any two elements for each key". In both of these examples, multiple iterations avoid materializing a cartesian product and instead allow something more like a nested-loop join.
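[A minimal sketch of the "largest difference per key" example above. The names (`grouped`, "LargestDiffPerKey") and the Long value type are illustrative assumptions, not from the thread. The first pass over the grouped values extracts a statistic (the minimum); the second pass uses it. The second pass only works on runners whose GBK output Iterable is re-iterable.]

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    // Assume grouped is the output of a GroupByKey:
    // PCollection<KV<String, Iterable<Long>>> grouped = ...;
    PCollection<KV<String, Long>> largestDiffs =
        grouped.apply("LargestDiffPerKey",
            ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
              @ProcessElement
              public void process(ProcessContext c) {
                Iterable<Long> values = c.element().getValue();
                // First iteration: extract a statistic (here, the minimum).
                long min = Long.MAX_VALUE;
                for (long v : values) {
                  min = Math.min(min, v);
                }
                // Second iteration: use the statistic. This second pass is
                // exactly what a one-shot Iterator-backed Iterable cannot support.
                long largestDiff = 0;
                for (long v : values) {
                  largestDiff = Math.max(largestDiff, v - min);
                }
                c.output(KV.of(c.element().getKey(), largestDiff));
              }
            }));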
On the philosophical side, an iterable behaves like a "value" (it has well-defined contents, a size, etc.), whereas an iterator is more of a mutating process than a value. So you cannot easily reason about a transform "for input X has output Y" when the output is a construct that cannot be interpreted as a value at all.

Kenn

On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <[email protected]> wrote:

> I'd like to know the use case. Why would you *need* to actually iterate
> the grouped elements twice? By definition, the first iteration would have
> to extract some statistic (or a subset of elements that must fit into
> memory). This statistic can then be used as another input for the second
> iteration. Why not then calculate the statistic in a separate branch of
> the pipeline and feed it into the ParDo as a side input? That would
> definitely be more efficient, because the calculation of the statistic
> would probably be combinable (not sure if it is absolutely required to be
> combinable, but it seems probable). Even if the calculation were not
> combinable, it would be no less efficient than iterating twice. Why then
> support multiple iterations (besides the fact that the output of GBK is
> an Iterable)? Am I missing something?
>
> Jan
>
> On 9/27/19 6:36 PM, Reuven Lax wrote:
>
> This should be an element in the compatibility matrix as well.
>
> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <[email protected]> wrote:
>
>> I am pretty surprised that we do not have a @Category(ValidatesRunner)
>> test in GroupByKeyTest that iterates multiple times. That is a major
>> oversight. We should have this test, and it can be disabled by the
>> SparkRunner's configuration.
>>
>> Kenn
>>
>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <[email protected]> wrote:
>>
>>> The Dataflow version does not spill to disk. However, Spark's design
>>> might require spilling to disk if you want this to be implemented
>>> properly.
>>>
>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> Spark's GBK is currently implemented using `sortBy(key and
>>>> value).mapPartition(...)` for non-merging windowing, in order to
>>>> support large keys and large-scale shuffles. Merging windowing is
>>>> implemented using the standard GBK (the underlying Spark impl. uses
>>>> ListCombiner + hash grouping), which is by design unable to support
>>>> large keys.
>>>>
>>>> As Jan noted, the problem with mapPartition is that its UDF receives
>>>> an Iterator. The only option here is to wrap this iterator in one that
>>>> spills to disk once an internal buffer is exceeded (the approach
>>>> suggested by Reuven). This unfortunately comes with a cost in some
>>>> cases. The best approach would be to somehow determine that the user
>>>> wants multiple iterations and then wrap it in a "re-iterator" if
>>>> necessary. Does anyone have ideas on how to approach this?
>>>>
>>>> D.
>>>>
>>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> The Beam API was written to support multiple iterations, and there
>>>>> are definitely transforms that do so. I believe that CoGroupByKey may
>>>>> do this as well with the resulting iterator.
>>>>>
>>>>> I know that the Dataflow runner is able to handle iterables larger
>>>>> than available memory by paging them in from shuffle, which still
>>>>> allows for reiterating. It sounds like Spark is less flexible here?
>>>>>
>>>>> Reuven
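[For reference, Jan's side-input alternative might look like the following sketch. The choice of Min as the combinable statistic, and the names `input` and `minPerKey`, are assumptions for illustration; Jan does not specify a particular statistic. The statistic is computed in a separate, combinable branch and fed into a single-pass ParDo as a side input.]

    import java.util.Map;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.Min;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    // Assume input is the keyed collection being grouped:
    // PCollection<KV<String, Long>> input = ...;

    // Branch 1: a combinable per-key statistic, turned into a side input.
    final PCollectionView<Map<String, Long>> minPerKey =
        input.apply(Min.longsPerKey()).apply(View.asMap());

    // Branch 2: a single pass over the grouped values, consuming the statistic.
    PCollection<KV<String, Long>> largestDiffs =
        input
            .apply(GroupByKey.<String, Long>create())
            .apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
              @ProcessElement
              public void process(ProcessContext c) {
                long min = c.sideInput(minPerKey).get(c.element().getKey());
                long largestDiff = 0;
                for (long v : c.element().getValue()) {  // one iteration only
                  largestDiff = Math.max(largestDiff, v - min);
                }
                c.output(KV.of(c.element().getKey(), largestDiff));
              }
            }).withSideInputs(minPerKey));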
>>>>>
>>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> +dev <[email protected]>
>>>>>>
>>>>>> Lukasz, why do you think that users expect to be able to iterate
>>>>>> grouped elements multiple times? Besides the fact that the 'Iterable'
>>>>>> obviously suggests it? The way that Spark behaves is pretty much
>>>>>> analogous to how MapReduce used to work - in certain cases it calls
>>>>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>>>>> accepts an Iterator - that is because internally it merge-sorts
>>>>>> pre-sorted segments. This approach makes it possible to GroupByKey
>>>>>> data sets that are too big to fit into memory (per key).
>>>>>>
>>>>>> If multiple iterations should be expected by users, we probably
>>>>>> should:
>>>>>>
>>>>>> a) include that in @ValidatesRunner tests
>>>>>>
>>>>>> b) store values in memory on Spark, which will break certain
>>>>>> pipelines
>>>>>>
>>>>>> Because of (b) I think it would be much better to remove this
>>>>>> "expectation" and clearly document that the Iterable is not supposed
>>>>>> to be iterated multiple times.
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>>
>>>>>> I pretty much think so, because that is how Spark works. The Iterable
>>>>>> inside is really an Iterator, which cannot be iterated multiple times.
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>>
>>>>>> Jan, in Beam users expect to be able to iterate the GBK output
>>>>>> multiple times, even from within the same ParDo.
>>>>>> Is this something that Beam on the Spark Runner never supported?
>>>>>>
>>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Gershi,
>>>>>>>
>>>>>>> Could you please outline the pipeline you are trying to execute?
>>>>>>> Basically, you cannot iterate the Iterable multiple times in a
>>>>>>> single ParDo. It should be possible, though, to apply multiple
>>>>>>> ParDos to the output of GroupByKey.
>>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I want to iterate multiple times over the Iterable<V> (the output
>>>>>>> of the GroupByKey transformation).
>>>>>>>
>>>>>>> When my runner is SparkRunner, I get an exception:
>>>>>>>
>>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>>>>>>> iterated more than once,otherwise there could be data lost
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>>
>>>>>>> at java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>>>
>>>>>>> I understood I can branch the pipeline after GroupByKey into
>>>>>>> multiple transformations and iterate once on the Iterable<V> in
>>>>>>> each of them.
>>>>>>>
>>>>>>> Is there a better way to do that?
>>>>>>>
>>>>>>> *Noam Gershi*
>>>>>>>
>>>>>>> Software Developer
>>>>>>>
>>>>>>> *T*: +972 (3) 7405718
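[The branching workaround Noam describes might look like the following sketch, assuming the same illustrative `grouped` collection as above; the transform names "MinPass" and "CountPass" are hypothetical. Each ParDo consumes the GroupByKey output independently, so each performs its own single iteration.]

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    // Assume grouped is the GroupByKey output:
    // PCollection<KV<String, Iterable<Long>>> grouped = ...;

    // Branch 1: one single-pass ParDo over the grouped values.
    PCollection<KV<String, Long>> mins =
        grouped.apply("MinPass",
            ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
              @ProcessElement
              public void process(ProcessContext c) {
                long min = Long.MAX_VALUE;
                for (long v : c.element().getValue()) {  // single iteration
                  min = Math.min(min, v);
                }
                c.output(KV.of(c.element().getKey(), min));
              }
            }));

    // Branch 2: another single-pass ParDo over the same grouped output.
    PCollection<KV<String, Long>> counts =
        grouped.apply("CountPass",
            ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
              @ProcessElement
              public void process(ProcessContext c) {
                long count = 0;
                for (Long ignored : c.element().getValue()) {  // single iteration
                  count++;
                }
                c.output(KV.of(c.element().getKey(), count));
              }
            }));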
