CoGroupByKey is one example. Performing a CoGroupByKey-based join requires
multiple iterations (caching is key to getting good performance). You could
make up other calculations that require it, most of which would look like a
self-join, like "output the largest difference between any two elements for
each key". In both of these examples, iterating multiple times avoids
materializing a cartesian product and instead allows something more like a
nested loop join.
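
To make the nested loop shape concrete, here is a minimal sketch in plain Java
with no Beam dependency. The class name, the `join` helper and the `emit`
callback are all made up for illustration; `emit` only stands in for whatever
output receiver a real DoFn would use. The point is just that the inner loop
asks `right` for a fresh iterator once per left element, which is exactly what
a single-use iterator cannot do.

```java
import java.util.Arrays;
import java.util.function.BiConsumer;

// Hypothetical helper, not part of Beam: joins the two value groups that a
// CoGroupByKey-style grouping would hand over for a single key.
class NestedLoopJoinSketch {

  // Emits every (left, right) pair one at a time instead of building the
  // whole product in memory. The inner for-each calls right.iterator() once
  // per left element, so `right` has to support repeated iteration.
  static <L, R> void join(Iterable<L> left, Iterable<R> right, BiConsumer<L, R> emit) {
    for (L l : left) {
      for (R r : right) {
        emit.accept(l, r);
      }
    }
  }

  public static void main(String[] args) {
    // Two small in-memory groups standing in for the grouped values of one key.
    join(Arrays.asList("a", "b"), Arrays.asList(1, 2),
        (l, r) -> System.out.println(l + ":" + r));
  }
}
```

With a `right` that can only be iterated once, the second pass of the outer
loop would either see nothing or fail, depending on the implementation.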

On the philosophical side, an iterable behaves like a "value" (it has
well-defined contents, a size, etc) whereas an iterator is more of a
mutating process than a value. So you cannot easily reason about a
transform "for input X has output Y" when the output is a construct that
cannot be interpreted as a value at all.
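
The difference shows up even without Beam. The sketch below is plain Java; the
class and the `drain` helper are hypothetical names used only for
illustration. Asking the same Iterable for its contents twice gives the same
answer, while asking the same Iterator twice does not.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration, not Beam code.
class IterableVsIteratorSketch {

  // Sums whatever the iterator still has left; doing so consumes it.
  static int drain(Iterator<Integer> it) {
    int sum = 0;
    while (it.hasNext()) {
      sum += it.next();
    }
    return sum;
  }

  public static void main(String[] args) {
    List<Integer> values = Arrays.asList(1, 2, 3);

    // Iterable: every call to iterator() starts over, so the answer is stable.
    System.out.println(drain(values.iterator())); // 6
    System.out.println(drain(values.iterator())); // 6

    // Iterator: the first pass uses it up, so the second pass sees nothing.
    Iterator<Integer> once = values.iterator();
    System.out.println(drain(once)); // 6
    System.out.println(drain(once)); // 0
  }
}
```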

Kenn

On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <[email protected]> wrote:

> I'd like to know the use-case. Why would you *need* to actually iterate
> the grouped elements twice? By definition the first iteration would have to
> extract some statistic (or subset of elements that must fit into memory).
> This statistic can then be used as another input for the second iteration.
> Why not then calculate the statistic in a separate branch in the pipeline
> and feed it then into the ParDo as side input? That would be definitely
> more efficient, because the calculation of the statistic would be probably
> combinable (not sure if it is absolutely required to be combinable, but it
> seems probable). Even if the calculation were not combinable, it would be
> no less efficient than iterating twice. Why then support multiple iterations
> (besides the fact that the output of GBK is Iterable)? Am I missing something?
>
> Jan
> On 9/27/19 6:36 PM, Reuven Lax wrote:
>
> This should be an element in the compatibility matrix as well.
>
> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <[email protected]> wrote:
>
>> I am pretty surprised that we do not have a @Category(ValidatesRunner)
>> test in GroupByKeyTest that iterates multiple times. That is a major
>> oversight. We should have this test, and it can be disabled by the
>> SparkRunner's configuration.
>>
>> Kenn
>>
>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <[email protected]> wrote:
>>
>>> The Dataflow version does not spill to disk. However Spark's design
>>> might require spilling to disk if you want that to be implemented properly.
>>>
>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> Spark's GBK is currently implemented using `sortBy(key and
>>>> value).mapPartition(...)` for non-merging windowing in order to support
>>>> large keys and large scale shuffles. Merging windowing is implemented using
>>>> standard GBK (underlying spark impl. uses ListCombiner + Hash Grouping),
>>>> which is by design unable to support large keys.
>>>>
>>>> As Jan noted, the problem with mapPartition is that its UDF receives an
>>>> Iterator. The only option here is to wrap this iterator in one that spills
>>>> to disk once an internal buffer is exceeded (the approach suggested by
>>>> Reuven). This unfortunately comes with a cost in some cases. The best
>>>> approach would be to somehow determine that the user wants multiple
>>>> iterations and then wrap it in a "re-iterator" if necessary. Does anyone
>>>> have any ideas how to approach this?
>>>>
>>>> D.
>>>>
>>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> The Beam API was written to support multiple iterations, and there are
>>>>> definitely transforms that do so. I believe that CoGroupByKey may do this
>>>>> as well with the resulting iterator.
>>>>>
>>>>> I know that the Dataflow runner is able to handle iterators larger
>>>>> than available memory by paging them in from shuffle, which still allows
>>>>> for reiterating. It sounds like Spark is less flexible here?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> +dev <[email protected]> <[email protected]>
>>>>>>
>>>>>> Lukasz, why do you think that users expect to be able to iterate
>>>>>> grouped elements multiple times? Besides the fact that 'Iterable'
>>>>>> obviously suggests it? The way that Spark behaves is pretty much
>>>>>> analogous to how MapReduce used to work - in certain cases it calls
>>>>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>>>>> accepts Iterator - that is because internally it merge sorts pre-sorted
>>>>>> segments. This approach makes it possible to GroupByKey data sets that
>>>>>> are too big to fit into memory (per key).
>>>>>>
>>>>>> If multiple iterations should be expected by users, we probably
>>>>>> should:
>>>>>>
>>>>>>  a) include that in @ValidatesRunner tests
>>>>>>
>>>>>>  b) store values in memory on spark, which will break for certain
>>>>>> pipelines
>>>>>>
>>>>>> Because of (b) I think that it would be much better to remove this
>>>>>> "expectation" and clearly document that the Iterable is not supposed to
>>>>>> be iterated multiple times.
>>>>>>
>>>>>> Jan
>>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>>
>>>>>> I pretty much think so, because that is how Spark works. The Iterable
>>>>>> inside is really an Iterator, which cannot be iterated multiple times.
>>>>>>
>>>>>> Jan
>>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>>
>>>>>> Jan, in Beam users expect to be able to iterate the GBK output
>>>>>> multiple times even from within the same ParDo.
>>>>>> Is this something that Beam on Spark Runner never supported?
>>>>>>
>>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Gershi,
>>>>>>>
>>>>>>> could you please outline the pipeline you are trying to execute?
>>>>>>> Basically, you cannot iterate the Iterable multiple times in a single
>>>>>>> ParDo. It should be possible, though, to apply multiple ParDos to the
>>>>>>> output of GroupByKey.
>>>>>>>
>>>>>>> Jan
>>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I want to iterate multiple times on the Iterable<V> (the output of
>>>>>>> GroupByKey transformation)
>>>>>>>
>>>>>>> When my Runner is SparkRunner, I get an exception:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>>>>>>> iterated more than once,otherwise there could be data lost
>>>>>>>
>>>>>>>                 at
>>>>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>>
>>>>>>>                 at java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I understood I can branch the pipeline after GroupByKey into
>>>>>>> multiple transformations and iterate once over the Iterable<V> in each
>>>>>>> of them.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Is there a better way for that?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Noam Gershi*
>>>>>>>
>>>>>>> Software Developer
>>>>>>>
>>>>>>> *T*: +972 (3) 7405718 <+972%203-740-5718>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
