I think the behavior to make explicit is the need to reiterate, not the
need to handle large results. How large of a result can be handled will
always be dependent on the runner, and each runner will probably have a
different definition of large keys. Reiteration however is a logical
difference in the programming API. Therefore I think it makes sense to
specify the latter. The need to reiterate is a property of the downstream
ParDo, so it should be specified there - not on the GBK.

Reuven

On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <je...@seznam.cz> wrote:

> Ok, I think I understand there might be some benefits of this. Then I'd
> propose we make this clear on the GBK. If we would support somehing like
> this:
>
>  PCollection<?> input = ....;
>
>  input.apply(GroupByKey.withLargeKeys());
>
> then SparkRunner could expand this to repartitionAndSortWithinPartitions
> only on this PTransform, and fallback to the default (in memory) in other
> situations. The default expansion of LargeGroupByKey (let's say) would be
> classic GBK, so that only runners that need to make sure that they don't
> break reiterations can expand this.
>
> WDYT?
>
> Jan
> On 9/27/19 8:56 PM, Reuven Lax wrote:
>
> As I mentioned above, CoGroupByKey already takes advantage of this.
> Reiterating is not the most common use case, but it's definitely one that
> comes up. Also keep in mind that this API has supported reiterating for the
> past five years (since even before the SDK was donated to Apache).
> Therefore you should assume that users are relying on it, in ways we might
> not expect.
>
> Historically, Google's Flume system had collections that did not support
> reiterating (they were even called OneShotCollections to make it clear).
> This was the source of problems and user frustration, which was one reason
> that in the original Dataflow SDK we made sure that these iterables could
> be reiterated. Another reason why it's advantageous for a runner to support
> this is allowing for better fusion. If two ParDos A and B both read from
> the same GroupByKey, it is nice to be able to fuse them into one logical
> operator. For this, you'll probably need a shuffle implementation that
> allows two independent readers from the same shuffle session.
>
> How easy it is to implement reiterables that don't have to fit in memory
> will depend on the runner.  For Dataflow it's possible because the shuffle
> session is logically persistent, so the runner can simply reread the
> shuffle session. For other runners with different shuffle implementations,
> it might be harder to support both properties. Maybe we should introduce a
> new @RequiresReiteration annotation on ParDo? That way the Spark runner can
> see this and switch to the in-memory version just for groupings consumed by
> those ParDos. Runners that already support reiteration can ignore this
> annotation, so it should be backwards compatible.
>
> Reuven
>
> On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> I'd like to know the use-case. Why would you *need* to actually iterate
>> the grouped elements twice? By definition the first iteration would have to
>> extract some statistic (or subset of elements that must fit into memory).
>> This statistic can then be used as another input for the second iteration.
>> Why not then calculate the statistic in a separate branch in the pipeline
>> and feed it then into the ParDo as side input? That would be definitely
>> more efficient, because the calculation of the statistic would be probably
>> combinable (not sure if it is absolutely required to be combinable, but it
>> seems probable). Even if the calculation would not be combinable, it is not
>> less efficient than reiterating twice. Why then support multiple iterations
>> (besides the fact that output of GBK is Iterable). Am I missing something?
>>
>> Jan
>> On 9/27/19 6:36 PM, Reuven Lax wrote:
>>
>> This should be an element in the compatibility matrix as well.
>>
>> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> I am pretty surprised that we do not have a @Category(ValidatesRunner)
>>> test in GroupByKeyTest that iterates multiple times. That is a major
>>> oversight. We should have this test, and it can be disabled by the
>>> SparkRunner's configuration.
>>>
>>> Kenn
>>>
>>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> The Dataflow version does not spill to disk. However Spark's design
>>>> might require spilling to disk if you want that to be implemented properly.
>>>>
>>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Spark's GBK is currently implemented using `sortBy(key and
>>>>> value).mapPartition(...)` for non-merging windowing in order to support
>>>>> large keys and large scale shuffles. Merging windowing is implemented 
>>>>> using
>>>>> standard GBK (underlying spark impl. uses ListCombiner + Hash Grouping),
>>>>> which is by design unable to support large keys.
>>>>>
>>>>> As Jan noted, problem with mapPartition is, that its UDF receives an
>>>>> Iterator. Only option here is to wrap this iterator to one that spills to
>>>>> disk once an internal buffer is exceeded (the approach suggested by
>>>>> Reuven). This unfortunately comes with a cost in some cases. The best
>>>>> approach would be to somehow determine, that user wants multiple 
>>>>> iterations
>>>>> and than wrap it in "re-iterator" if necessary. Does anyone have any ideas
>>>>> how to approach this?
>>>>>
>>>>> D.
>>>>>
>>>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> The Beam API was written to support multiple iterations, and there
>>>>>> are definitely transforms that do so. I believe that CoGroupByKey may do
>>>>>> this as well with the resulting iterator.
>>>>>>
>>>>>> I know that the Dataflow runner is able to handles iterators larger
>>>>>> than available memory by paging them in from shuffle, which still allows
>>>>>> for reiterating. It sounds like Spark is less flexible here?
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>
>>>>>>> +dev <dev@beam.apache.org> <dev@beam.apache.org>
>>>>>>>
>>>>>>> Lukasz, why do you think that users expect to be able to iterate
>>>>>>> multiple times grouped elements? Besides that it obviously suggests the
>>>>>>> 'Iterable'? The way that spark behaves is pretty much analogous to how
>>>>>>> MapReduce used to work - in certain cases it calles
>>>>>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>>>>>> accepts Iterator - that is because internally it merge sorts pre sorted
>>>>>>> segments. This approach enables to GroupByKey data sets that are too 
>>>>>>> big to
>>>>>>> fit into memory (per key).
>>>>>>>
>>>>>>> If multiple iterations should be expected by users, we probably
>>>>>>> should:
>>>>>>>
>>>>>>>  a) include that in @ValidatesRunner tests
>>>>>>>
>>>>>>>  b) store values in memory on spark, which will break for certain
>>>>>>> pipelines
>>>>>>>
>>>>>>> Because of (b) I think that it would be much better to remove this
>>>>>>> "expectation" and clearly document that the Iterable is not supposed to 
>>>>>>> be
>>>>>>> iterated multiple times.
>>>>>>>
>>>>>>> Jan
>>>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>>>
>>>>>>> I pretty much think so, because that is how Spark works. The
>>>>>>> Iterable inside is really an Iterator, which cannot be iterated multiple
>>>>>>> times.
>>>>>>>
>>>>>>> Jan
>>>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>>>
>>>>>>> Jan, in Beam users expect to be able to iterate the GBK output
>>>>>>> multiple times even from within the same ParDo.
>>>>>>> Is this something that Beam on Spark Runner never supported?
>>>>>>>
>>>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Gershi,
>>>>>>>>
>>>>>>>> could you please outline the pipeline you are trying to execute?
>>>>>>>> Basically, you cannot iterate the Iterable multiple times in single 
>>>>>>>> ParDo.
>>>>>>>> It should be possible, though, to apply multiple ParDos to output from
>>>>>>>> GroupByKey.
>>>>>>>>
>>>>>>>> Jan
>>>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I want to iterate multiple times on the Iterable<V> (the output of
>>>>>>>> GroupByKey transformation)
>>>>>>>>
>>>>>>>> When my Runner is SparkRunner, I get an exception:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>>>>>>>> iterated more than once,otherwise there could be data lost
>>>>>>>>
>>>>>>>>                 at
>>>>>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>>>
>>>>>>>>                 at java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I understood I can branch the pipeline after GroupByKey into
>>>>>>>> multiple transformation and iterate in each of them once on the 
>>>>>>>> Iterable<V>.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Is there a better way for that?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> [image: citi_logo_mail][image: citi_logo_mail]*Noam Gershi*
>>>>>>>>
>>>>>>>> Software Developer
>>>>>>>>
>>>>>>>> *T*: +972 (3) 7405718 <+972%203-740-5718>
>>>>>>>>
>>>>>>>> [image: Mail_signature_blue]
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>

Reply via email to