In many cases, the writer of the ParDo has no access to the GBK (e.g. the
GBK is hidden inside an upstream PTransform that they cannot modify). This
is the same reason RequiresStableInput was made a property of the ParDo:
the GroupByKey is quite often inaccessible.

The car analogy doesn't quite apply here, because the runner does have a
full view of the pipeline so can satisfy all constraints. The car
dealership generally cannot read your mind (thankfully!), so you have to
specify what you want. Or to put it another way, the various transforms in
a Beam pipeline do not live in isolation. The full pipeline graph is what
is executed, and the runner already has to analyze the full graph to run
the pipeline (as well as to optimize the pipeline).
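
To make that concrete, here is a rough sketch in plain Java (no Beam
dependency) of a runner looking at the consumers of a grouping and picking
the iterable implementation accordingly. Everything in it is hypothetical:
@RequiresReiteration is only the annotation proposed earlier in this thread
and not an existing Beam API, and GroupedFn, oneShot, buffered and run are
names made up for illustration. A real runner would make this decision on
the pipeline graph, and would not necessarily buffer in memory.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ReiterationPlanner {

  /** Hypothetical marker (assumption): not part of the Beam SDK. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface RequiresReiteration {}

  /** Stand-in for a DoFn that consumes the grouped values of one key. */
  interface GroupedFn {
    long process(Iterable<Integer> values);
  }

  /** Iterates once, so a one-shot iterable is sufficient. */
  static class SumFn implements GroupedFn {
    public long process(Iterable<Integer> values) {
      long sum = 0;
      for (int v : values) sum += v;
      return sum;
    }
  }

  /** Iterates twice: finds the max, then counts how often it occurs. */
  @RequiresReiteration
  static class CountMaxFn implements GroupedFn {
    public long process(Iterable<Integer> values) {
      int max = Integer.MIN_VALUE;
      for (int v : values) max = Math.max(max, v);
      long count = 0;
      for (int v : values) if (v == max) count++;
      return count;
    }
  }

  /** One-shot view over an iterator; a second iterator() call fails. */
  static <T> Iterable<T> oneShot(Iterator<T> source) {
    boolean[] consumed = {false};
    return () -> {
      if (consumed[0]) {
        throw new IllegalStateException("can't be iterated more than once");
      }
      consumed[0] = true;
      return source;
    };
  }

  /** Copies the iterator into memory so the result can be re-iterated. */
  static <T> Iterable<T> buffered(Iterator<T> source) {
    List<T> copy = new ArrayList<>();
    source.forEachRemaining(copy::add);
    return copy;
  }

  /** The "runner" inspects every consumer of the grouping. */
  static boolean needsReiteration(List<? extends GroupedFn> consumers) {
    for (GroupedFn fn : consumers) {
      if (fn.getClass().isAnnotationPresent(RequiresReiteration.class)) {
        return true;
      }
    }
    return false;
  }

  /** Picks the cheap one-shot path unless the consumer asks for more. */
  static long run(GroupedFn fn, Iterator<Integer> shuffled) {
    Iterable<Integer> values =
        needsReiteration(Arrays.asList(fn)) ? buffered(shuffled) : oneShot(shuffled);
    return fn.process(values);
  }
}
```

With this sketch, run(new CountMaxFn(), Arrays.asList(3, 1, 3).iterator())
takes the buffered path and returns 2, while handing CountMaxFn a oneShot
view directly fails on its second loop with an IllegalStateException, which
is essentially the error from the original report. The GBK itself never
has to know which case it is in.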

Reuven

On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <je...@seznam.cz> wrote:

> I'd suggest Stream instead of Iterator, it has the same semantics and much
> better API.
>
> Still not sure what is wrong with letting the GBK decide this. I have an
> analogy - if I decide to buy a car, I have to decide *what* car I'm going
> to buy (by thinking about how I'm going to use it) *before* I buy it. I
> cannot just buy "a car" and then change it from a minivan to a sports car
> based on my actual need. Same with the GBK - if I want to be able to
> reiterate the result, then I should tell it in advance.
>
> Jan
> On 9/27/19 10:50 PM, Kenneth Knowles wrote:
>
> Good point about sibling fusion requiring this.
>
> The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does imply that
> the output iterable can be iterated arbitrarily many times.
>
> I think this should remain the default for all the reasons mentioned.
>
> We could have an opt-in to the weaker KV<K, Iterator<V>> version. Agree that
> this is a property of the ParDo. A particular use of a GBK has no idea what
> is downstream. If you owned the whole pipeline, a special
> ParDo<Iterator<V>, Foo> would work. But to make the types line up, this
> would require changes upstream, which is not good.
>
> Maybe something like this:
>
> ParDo<Iterable<V>, Foo> {
>   @ProcessElement
>   void process(@OneShotIterator Iterator<V> iter) {
>     ...
>   }
> }
>
> I've described all of this in terms of Java SDK. So we would need a
> portable representation for all this metadata.
>
> Kenn
>
> On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <re...@google.com> wrote:
>
>> I think the behavior to make explicit is the need to reiterate, not the
>> need to handle large results. How large of a result can be handled will
>> always be dependent on the runner, and each runner will probably have a
>> different definition of large keys. Reiteration, however, is a logical
>> difference in the programming API, so I think it makes sense to specify
>> reiteration. The need to reiterate is a property of the downstream
>> ParDo, so it should be specified there - not on the GBK.
>>
>> Reuven
>>
>> On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Ok, I think I understand there might be some benefits of this. Then I'd
>>> propose we make this clear on the GBK. If we supported something like
>>> this:
>>>
>>>  PCollection<?> input = ....;
>>>
>>>  input.apply(GroupByKey.withLargeKeys());
>>>
>>> then SparkRunner could expand this to repartitionAndSortWithinPartitions
>>> only on this PTransform, and fall back to the default (in memory) in other
>>> situations. The default expansion of LargeGroupByKey (let's say) would be
>>> classic GBK, so that only runners that need to make sure that they don't
>>> break reiterations can expand this.
>>>
>>> WDYT?
>>>
>>> Jan
>>> On 9/27/19 8:56 PM, Reuven Lax wrote:
>>>
>>> As I mentioned above, CoGroupByKey already takes advantage of this.
>>> Reiterating is not the most common use case, but it's definitely one that
>>> comes up. Also keep in mind that this API has supported reiterating for the
>>> past five years (since even before the SDK was donated to Apache).
>>> Therefore you should assume that users are relying on it, in ways we might
>>> not expect.
>>>
>>> Historically, Google's Flume system had collections that did not support
>>> reiterating (they were even called OneShotCollections to make it clear).
>>> This was the source of problems and user frustration, which was one reason
>>> that in the original Dataflow SDK we made sure that these iterables could
>>> be reiterated. Another reason why it's advantageous for a runner to support
>>> this is allowing for better fusion. If two ParDos A and B both read from
>>> the same GroupByKey, it is nice to be able to fuse them into one logical
>>> operator. For this, you'll probably need a shuffle implementation that
>>> allows two independent readers from the same shuffle session.
>>>
>>> How easy it is to implement reiterables that don't have to fit in memory
>>> will depend on the runner.  For Dataflow it's possible because the shuffle
>>> session is logically persistent, so the runner can simply reread the
>>> shuffle session. For other runners with different shuffle implementations,
>>> it might be harder to support both properties. Maybe we should introduce a
>>> new @RequiresReiteration annotation on ParDo? That way the Spark runner can
>>> see this and switch to the in-memory version just for groupings consumed by
>>> those ParDos. Runners that already support reiteration can ignore this
>>> annotation, so it should be backwards compatible.
>>>
>>> Reuven
>>>
>>> On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> I'd like to know the use-case. Why would you *need* to actually iterate
>>>> the grouped elements twice? By definition the first iteration would have to
>>>> extract some statistic (or subset of elements that must fit into memory).
>>>> This statistic can then be used as another input for the second iteration.
>>>> Why not then calculate the statistic in a separate branch in the pipeline
>>>> and feed it then into the ParDo as side input? That would be definitely
>>>> more efficient, because the calculation of the statistic would be probably
>>>> combinable (not sure if it is absolutely required to be combinable, but it
>>>> seems probable). Even if the calculation would not be combinable, it is not
>>>> less efficient than reiterating twice. Why then support multiple iterations
>>>> (besides the fact that the output of GBK is Iterable)? Am I missing something?
>>>>
>>>> Jan
>>>> On 9/27/19 6:36 PM, Reuven Lax wrote:
>>>>
>>>> This should be an element in the compatibility matrix as well.
>>>>
>>>> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> I am pretty surprised that we do not have a @Category(ValidatesRunner)
>>>>> test in GroupByKeyTest that iterates multiple times. That is a major
>>>>> oversight. We should have this test, and it can be disabled by the
>>>>> SparkRunner's configuration.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> The Dataflow version does not spill to disk. However Spark's design
>>>>>> might require spilling to disk if you want that to be implemented
>>>>>> properly.
>>>>>>
>>>>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Spark's GBK is currently implemented using `sortBy(key and
>>>>>>> value).mapPartition(...)` for non-merging windowing in order to support
>>>>>>> large keys and large scale shuffles. Merging windowing is implemented
>>>>>>> using standard GBK (underlying spark impl. uses ListCombiner + Hash
>>>>>>> Grouping), which is by design unable to support large keys.
>>>>>>>
>>>>>>> As Jan noted, the problem with mapPartition is that its UDF receives an
>>>>>>> Iterator. The only option here is to wrap this iterator in one that
>>>>>>> spills to disk once an internal buffer is exceeded (the approach
>>>>>>> suggested by Reuven). This unfortunately comes with a cost in some
>>>>>>> cases. The best approach would be to somehow determine that the user
>>>>>>> wants multiple iterations and then wrap it in a "re-iterator" if
>>>>>>> necessary. Does anyone have any ideas how to approach this?
>>>>>>>
>>>>>>> D.
>>>>>>>
>>>>>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> The Beam API was written to support multiple iterations, and there
>>>>>>>> are definitely transforms that do so. I believe that CoGroupByKey may
>>>>>>>> do this as well with the resulting iterator.
>>>>>>>>
>>>>>>>> I know that the Dataflow runner is able to handle iterators larger
>>>>>>>> than available memory by paging them in from shuffle, which still
>>>>>>>> allows for reiterating. It sounds like Spark is less flexible here?
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> +dev <dev@beam.apache.org>
>>>>>>>>>
>>>>>>>>> Lukasz, why do you think that users expect to be able to iterate
>>>>>>>>> grouped elements multiple times? Besides the fact that 'Iterable'
>>>>>>>>> obviously suggests it? The way that Spark behaves is pretty much
>>>>>>>>> analogous to how MapReduce used to work - in certain cases it calls
>>>>>>>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>>>>>>>> accepts an Iterator - that is because internally it merge-sorts
>>>>>>>>> pre-sorted segments. This approach makes it possible to GroupByKey
>>>>>>>>> data sets that are too big to fit into memory (per key).
>>>>>>>>>
>>>>>>>>> If multiple iterations should be expected by users, we probably
>>>>>>>>> should:
>>>>>>>>>
>>>>>>>>>  a) include that in @ValidatesRunner tests
>>>>>>>>>
>>>>>>>>>  b) store values in memory on spark, which will break for certain
>>>>>>>>> pipelines
>>>>>>>>>
>>>>>>>>> Because of (b) I think that it would be much better to remove this
>>>>>>>>> "expectation" and clearly document that the Iterable is not supposed
>>>>>>>>> to be iterated multiple times.
>>>>>>>>>
>>>>>>>>> Jan
>>>>>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>>>>>
>>>>>>>>> I pretty much think so, because that is how Spark works. The
>>>>>>>>> Iterable inside is really an Iterator, which cannot be iterated
>>>>>>>>> multiple times.
>>>>>>>>>
>>>>>>>>> Jan
>>>>>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>>>>>
>>>>>>>>> Jan, in Beam users expect to be able to iterate the GBK output
>>>>>>>>> multiple times even from within the same ParDo.
>>>>>>>>> Is this something that Beam on Spark Runner never supported?
>>>>>>>>>
>>>>>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Gershi,
>>>>>>>>>>
>>>>>>>>>> could you please outline the pipeline you are trying to execute?
>>>>>>>>>> Basically, you cannot iterate the Iterable multiple times in a single
>>>>>>>>>> ParDo. It should be possible, though, to apply multiple ParDos to the
>>>>>>>>>> output of GroupByKey.
>>>>>>>>>>
>>>>>>>>>> Jan
>>>>>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I want to iterate multiple times on the Iterable<V> (the output
>>>>>>>>>> of GroupByKey transformation)
>>>>>>>>>>
>>>>>>>>>> When my Runner is SparkRunner, I get an exception:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't
>>>>>>>>>> be iterated more than once,otherwise there could be data lost
>>>>>>>>>>
>>>>>>>>>>                 at
>>>>>>>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>>>>>
>>>>>>>>>>                 at
>>>>>>>>>> java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I understood I can branch the pipeline after GroupByKey into
>>>>>>>>>> multiple transformations and iterate in each of them once on the
>>>>>>>>>> Iterable<V>.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Is there a better way for that?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *Noam Gershi*
>>>>>>>>>>
>>>>>>>>>> Software Developer
>>>>>>>>>>
>>>>>>>>>> *T*: +972 (3) 7405718
>>>>>>>>>>