As I mentioned above, CoGroupByKey already takes advantage of this.
Reiterating is not the most common use case, but it's definitely one that
comes up. Also keep in mind that this API has supported reiterating for the
past five years (since even before the SDK was donated to Apache).
Therefore you should assume that users are relying on it, in ways we might
not expect.

Historically, Google's Flume system had collections that did not support
reiterating (they were even called OneShotCollections to make it clear).
This was the source of problems and user frustration, which was one reason
that in the original Dataflow SDK we made sure that these iterables could
be reiterated. Another reason it's advantageous for a runner to support
this is that it allows for better fusion. If two ParDos A and B both read from
the same GroupByKey, it is nice to be able to fuse them into one logical
operator. For this, you'll probably need a shuffle implementation that
allows two independent readers from the same shuffle session.
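
To make the one-shot problem concrete, here is a rough sketch in plain Java (not Beam or Flume code; ReiterationSketch, OneShotIterable and countAboveMean are names I made up for illustration). It shows ordinary two-pass user code that works on a reiterable collection and fails on a one-shot one:

```java
import java.util.Iterator;

/** Sketch only: none of these classes exist in Beam or Flume. */
class ReiterationSketch {

  /** Wraps a single Iterator; a second call to iterator() fails, as a one-shot collection would. */
  static final class OneShotIterable<T> implements Iterable<T> {
    private final Iterator<T> source;
    private boolean consumed = false;

    OneShotIterable(Iterator<T> source) {
      this.source = source;
    }

    @Override
    public Iterator<T> iterator() {
      if (consumed) {
        throw new IllegalStateException("values can only be iterated once");
      }
      consumed = true;
      return source;
    }
  }

  /** Two-pass user code: pass 1 computes the mean, pass 2 counts the values above it. */
  static int countAboveMean(Iterable<Integer> values) {
    long sum = 0;
    long n = 0;
    for (int v : values) { // first iteration
      sum += v;
      n++;
    }
    if (n == 0) {
      return 0;
    }
    double mean = (double) sum / n;
    int above = 0;
    for (int v : values) { // second iteration: requires a reiterable input
      if (v > mean) {
        above++;
      }
    }
    return above;
  }
}
```

Passing a List to countAboveMean works; passing a OneShotIterable throws IllegalStateException when the second loop starts, which is the kind of failure users hit with one-shot collections.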

How easy it is to implement reiterables that don't have to fit in memory
will depend on the runner. For Dataflow it's possible because the shuffle
session is logically persistent, so the runner can simply reread the
shuffle session. For other runners with different shuffle implementations,
it might be harder to support both properties. Maybe we should introduce a
new @RequiresReiteration annotation on ParDo? That way the Spark runner can
see this and switch to the in-memory version just for groupings consumed by
those ParDos. Runners that already support reiteration can ignore this
annotation, so it should be backwards compatible.
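
A very rough sketch of how a runner could act on such an annotation, in plain Java rather than real Beam APIs. Everything here is hypothetical: @RequiresReiteration does not exist today, GroupedFn is a stand-in for a DoFn, and placing the annotation on the class (rather than on a method) is just an assumption to keep the example short:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Sketch only: @RequiresReiteration is a proposal, not an existing Beam annotation. */
class RequiresReiterationSketch {

  /** Hypothetical marker: the function iterates its grouped values more than once. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface RequiresReiteration {}

  /** Hypothetical stand-in for a DoFn that consumes the values of one key. */
  interface GroupedFn {
    int process(Iterable<Integer> values);
  }

  /** Single pass: sums the values. */
  static final class SinglePassFn implements GroupedFn {
    @Override
    public int process(Iterable<Integer> values) {
      int sum = 0;
      for (int v : values) {
        sum += v;
      }
      return sum;
    }
  }

  /** Two passes: finds the maximum, then counts how often it occurs. */
  @RequiresReiteration
  static final class TwoPassFn implements GroupedFn {
    @Override
    public int process(Iterable<Integer> values) {
      int max = Integer.MIN_VALUE;
      for (int v : values) {
        max = Math.max(max, v);
      }
      int count = 0;
      for (int v : values) {
        if (v == max) {
          count++;
        }
      }
      return count;
    }
  }

  static boolean needsReiteration(GroupedFn fn) {
    return fn.getClass().isAnnotationPresent(RequiresReiteration.class);
  }

  /** Runner-side choice: buffer in memory only when the function asks for reiteration. */
  static int run(GroupedFn fn, Iterator<Integer> shuffled) {
    if (needsReiteration(fn)) {
      List<Integer> buffered = new ArrayList<>();
      shuffled.forEachRemaining(buffered::add);
      return fn.process(buffered); // a List can be iterated any number of times
    }
    // Streaming path: the Iterable hands out the same underlying Iterator,
    // so it is only safe for a single pass.
    return fn.process(() -> shuffled);
  }
}
```

The point is only that the decision can be made per ParDo: functions without the annotation keep the streaming path that supports large keys, and only the annotated ones pay for buffering.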

Reuven

On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz> wrote:

> I'd like to know the use-case. Why would you *need* to actually iterate
> the grouped elements twice? By definition the first iteration would have to
> extract some statistic (or subset of elements that must fit into memory).
> This statistic can then be used as another input for the second iteration.
> Why not then calculate the statistic in a separate branch in the pipeline
> and then feed it into the ParDo as a side input? That would definitely be
> more efficient, because the calculation of the statistic would probably be
> combinable (not sure if it is absolutely required to be combinable, but it
> seems probable). Even if the calculation were not combinable, it would be no
> less efficient than iterating twice. Why then support multiple iterations
> (besides the fact that the output of GBK is an Iterable)? Am I missing something?
>
> Jan
> On 9/27/19 6:36 PM, Reuven Lax wrote:
>
> This should be an element in the compatibility matrix as well.
>
> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> I am pretty surprised that we do not have a @Category(ValidatesRunner)
>> test in GroupByKeyTest that iterates multiple times. That is a major
>> oversight. We should have this test, and it can be disabled by the
>> SparkRunner's configuration.
>>
>> Kenn
>>
>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:
>>
>>> The Dataflow version does not spill to disk. However Spark's design
>>> might require spilling to disk if you want that to be implemented properly.
>>>
>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> Spark's GBK is currently implemented using `sortBy(key and
>>>> value).mapPartition(...)` for non-merging windowing in order to support
>>>> large keys and large scale shuffles. Merging windowing is implemented using
>>>> standard GBK (underlying spark impl. uses ListCombiner + Hash Grouping),
>>>> which is by design unable to support large keys.
>>>>
>>>> As Jan noted, the problem with mapPartition is that its UDF receives an
>>>> Iterator. The only option here is to wrap this iterator in one that spills to
>>>> disk once an internal buffer is exceeded (the approach suggested by
>>>> Reuven). This unfortunately comes with a cost in some cases. The best
>>>> approach would be to somehow determine that the user wants multiple iterations
>>>> and then wrap it in a "re-iterator" only if necessary. Does anyone have any ideas
>>>> how to approach this?
>>>>
>>>> D.
>>>>
>>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> The Beam API was written to support multiple iterations, and there are
>>>>> definitely transforms that do so. I believe that CoGroupByKey may do this
>>>>> as well with the resulting iterator.
>>>>>
>>>>> I know that the Dataflow runner is able to handle iterators larger
>>>>> than available memory by paging them in from shuffle, which still allows
>>>>> for reiterating. It sounds like Spark is less flexible here?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> +dev <dev@beam.apache.org>
>>>>>>
>>>>>> Lukasz, why do you think that users expect to be able to iterate
>>>>>> grouped elements multiple times? Besides the fact that 'Iterable' obviously
>>>>>> suggests it? The way that Spark behaves is pretty much analogous to how
>>>>>> MapReduce used to work: in certain cases it calls
>>>>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>>>>> accepts an Iterator, because internally it merge-sorts pre-sorted
>>>>>> segments. This approach makes it possible to GroupByKey data sets that are
>>>>>> too big to fit into memory (per key).
>>>>>>
>>>>>> If multiple iterations should be expected by users, we probably
>>>>>> should:
>>>>>>
>>>>>>  a) include that in @ValidatesRunner tests
>>>>>>
>>>>>>  b) store values in memory on spark, which will break for certain
>>>>>> pipelines
>>>>>>
>>>>>> Because of (b) I think that it would be much better to remove this
>>>>>> "expectation" and clearly document that the Iterable is not supposed to
>>>>>> be iterated multiple times.
>>>>>>
>>>>>> Jan
>>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>>
>>>>>> I pretty much think so, because that is how Spark works. The Iterable
>>>>>> inside is really an Iterator, which cannot be iterated multiple times.
>>>>>>
>>>>>> Jan
>>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>>
>>>>>> Jan, in Beam users expect to be able to iterate the GBK output
>>>>>> multiple times even from within the same ParDo.
>>>>>> Is this something that Beam on Spark Runner never supported?
>>>>>>
>>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>
>>>>>>> Hi Gershi,
>>>>>>>
>>>>>>> could you please outline the pipeline you are trying to execute?
>>>>>>> Basically, you cannot iterate the Iterable multiple times in a single
>>>>>>> ParDo. It should be possible, though, to apply multiple ParDos to the
>>>>>>> output of GroupByKey.
>>>>>>>
>>>>>>> Jan
>>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I want to iterate multiple times on the Iterable<V> (the output of
>>>>>>> GroupByKey transformation)
>>>>>>>
>>>>>>> When my Runner is SparkRunner, I get an exception:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>>>>>>> iterated more than once,otherwise there could be data lost
>>>>>>>
>>>>>>>                 at
>>>>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>>
>>>>>>>                 at java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I understand that I can branch the pipeline after GroupByKey into
>>>>>>> multiple transformations and iterate over the Iterable<V> once in each
>>>>>>> of them.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Is there a better way for that?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Noam Gershi*
>>>>>>>
>>>>>>> Software Developer
>>>>>>>
>>>>>>> *T*: +972 (3) 7405718
