I think the behavior to make explicit is the need to reiterate, not the need to handle large results. How large a result can be handled will always depend on the runner, and each runner will probably have a different definition of a large key. Reiteration, however, is a logical difference in the programming API, so that is the property worth specifying. The need to reiterate is a property of the downstream ParDo, so it should be specified there - not on the GBK.

Reuven
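A minimal sketch of what declaring that on the ParDo might look like, using the hypothetical @RequiresReiteration annotation floated further down in the quoted thread. The annotation does not exist in the Beam SDK; the DoFn below is illustrative only.

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Hypothetical annotation (not in the Beam SDK): tells the runner this DoFn
    // iterates the grouped values more than once, so the GBK feeding it must
    // produce a re-iterable output rather than a one-shot iterator.
    @RequiresReiteration
    static class VarianceFn extends DoFn<KV<String, Iterable<Long>>, KV<String, Double>> {
      @ProcessElement
      public void process(
          @Element KV<String, Iterable<Long>> kv, OutputReceiver<KV<String, Double>> out) {
        long count = 0;
        long sum = 0;
        for (long v : kv.getValue()) {  // first pass: compute the mean
          count++;
          sum += v;
        }
        if (count == 0) {
          return;
        }
        double mean = (double) sum / count;
        double sumSq = 0;
        for (long v : kv.getValue()) {  // second pass: requires re-iteration
          sumSq += (v - mean) * (v - mean);
        }
        out.output(KV.of(kv.getKey(), sumSq / count));
      }
    }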
On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <je...@seznam.cz> wrote:

> Ok, I think I understand there might be some benefits of this. Then I'd propose we make this clear on the GBK. If we were to support something like this:
>
>   PCollection<?> input = ....;
>
>   input.apply(GroupByKey.withLargeKeys());
>
> then SparkRunner could expand this to repartitionAndSortWithinPartitions only on this PTransform, and fall back to the default (in-memory) expansion in other situations. The default expansion of LargeGroupByKey (let's say) would be the classic GBK, so that only runners that need to make sure they don't break reiterations would expand it differently.
>
> WDYT?
>
> Jan
>
> On 9/27/19 8:56 PM, Reuven Lax wrote:
>
>> As I mentioned above, CoGroupByKey already takes advantage of this. Reiterating is not the most common use case, but it's definitely one that comes up. Also keep in mind that this API has supported reiterating for the past five years (since even before the SDK was donated to Apache), so you should assume that users are relying on it, in ways we might not expect.
>>
>> Historically, Google's Flume system had collections that did not support reiterating (they were even called OneShotCollections to make it clear). This was a source of problems and user frustration, which is one reason that in the original Dataflow SDK we made sure these iterables could be reiterated. Another reason why it's advantageous for a runner to support this is that it allows for better fusion. If two ParDos A and B both read from the same GroupByKey, it is nice to be able to fuse them into one logical operator. For this, you'll probably need a shuffle implementation that allows two independent readers from the same shuffle session.
>>
>> How easy it is to implement reiterables that don't have to fit in memory will depend on the runner. For Dataflow it's possible because the shuffle session is logically persistent, so the runner can simply reread the shuffle session. For other runners with different shuffle implementations, it might be harder to support both properties. Maybe we should introduce a new @RequiresReiteration annotation on ParDo? That way the Spark runner can see it and switch to the in-memory version just for groupings consumed by those ParDos. Runners that already support reiteration can ignore the annotation, so it should be backwards compatible.
>>
>> Reuven
>>
>> On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>> I'd like to know the use-case. Why would you *need* to actually iterate the grouped elements twice? By definition the first iteration would have to extract some statistic (or a subset of elements that must fit into memory). This statistic can then be used as another input for the second iteration. Why not calculate the statistic in a separate branch of the pipeline and feed it into the ParDo as a side input? That would definitely be more efficient, because the calculation of the statistic would probably be combinable (not sure if it is absolutely required to be combinable, but it seems probable). Even if the calculation were not combinable, it would be no less efficient than iterating twice. Why then support multiple iterations at all (besides the fact that the output of GBK is an Iterable)? Am I missing something?
>>
>> Jan
>>
>> On 9/27/19 6:36 PM, Reuven Lax wrote:
>>
>> This should be an element in the compatibility matrix as well.
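For illustration, a rough sketch of the side-input alternative Jan describes above, assuming the statistic is a combinable per-key count; the method and variable names are made up.

    import java.util.Map;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    static void expand(PCollection<KV<String, Long>> events) {
      // Branch 1: compute the combinable statistic without reiterating the group.
      PCollectionView<Map<String, Long>> countsView =
          events.apply(Count.perKey()).apply(View.asMap());

      // Branch 2: a single pass over the grouped values, consulting the statistic.
      events
          .apply(GroupByKey.create())
          .apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, String>() {
            @ProcessElement
            public void process(ProcessContext c) {
              long total = c.sideInput(countsView).getOrDefault(c.element().getKey(), 0L);
              for (long v : c.element().getValue()) {  // iterated exactly once
                c.output(c.element().getKey() + ": " + v + " / " + total);
              }
            }
          }).withSideInputs(countsView));
    }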
>> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> I am pretty surprised that we do not have a @Category(ValidatesRunner) test in GroupByKeyTest that iterates multiple times. That is a major oversight. We should have this test, and it can be disabled by the SparkRunner's configuration.
>>>
>>> Kenn
>>>
>>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> The Dataflow version does not spill to disk. However, Spark's design might require spilling to disk if you want that to be implemented properly.
>>>>
>>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Spark's GBK is currently implemented using `sortBy(key and value).mapPartition(...)` for non-merging windowing, in order to support large keys and large-scale shuffles. Merging windowing is implemented using the standard GBK (the underlying Spark implementation uses ListCombiner + hash grouping), which is by design unable to support large keys.
>>>>>
>>>>> As Jan noted, the problem with mapPartition is that its UDF receives an Iterator. The only option here is to wrap this iterator in one that spills to disk once an internal buffer is exceeded (the approach suggested by Reuven). This unfortunately comes with a cost in some cases. The best approach would be to somehow determine that the user wants multiple iterations and then wrap it in a "re-iterator" only if necessary. Does anyone have ideas on how to approach this?
>>>>>
>>>>> D.
>>>>>
>>>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> The Beam API was written to support multiple iterations, and there are definitely transforms that do so. I believe that CoGroupByKey may do this as well with the resulting iterator.
>>>>>>
>>>>>> I know that the Dataflow runner is able to handle iterators larger than available memory by paging them in from shuffle, which still allows for reiterating. It sounds like Spark is less flexible here?
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>
>>>>>>> +dev <dev@beam.apache.org>
>>>>>>>
>>>>>>> Lukasz, why do you think that users expect to be able to iterate the grouped elements multiple times? Besides the fact that the 'Iterable' obviously suggests it? The way Spark behaves is pretty much analogous to how MapReduce used to work - in certain cases it calls repartitionAndSortWithinPartitions and then does mapPartition, which accepts an Iterator - that is because internally it merge-sorts pre-sorted segments. This approach makes it possible to GroupByKey data sets that are too big to fit into memory (per key).
>>>>>>>
>>>>>>> If multiple iterations are to be expected by users, we probably should:
>>>>>>>
>>>>>>> a) include that in @ValidatesRunner tests
>>>>>>>
>>>>>>> b) store values in memory on Spark, which will break for certain pipelines
>>>>>>>
>>>>>>> Because of (b) I think that it would be much better to remove this "expectation" and clearly document that the Iterable is not supposed to be iterated multiple times.
>>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>>>
>>>>>>> I pretty much think so, because that is how Spark works. The Iterable inside is really an Iterator, which cannot be iterated multiple times.
>>>>>>>
>>>>>>> Jan
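As a minimal illustration of the "wrap it in a re-iterator" idea above, here is an in-memory-only sketch; a real implementation along the lines David describes would additionally spill to disk once an internal buffer is exceeded, which is omitted here.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // Wraps a one-shot Iterator so it can be handed out as an Iterable that
    // tolerates re-iteration: values are cached as they are first consumed and
    // replayed on later iterations. Everything still has to fit in memory.
    final class CachingReiterable<T> implements Iterable<T> {
      private final Iterator<T> source;
      private final List<T> cache = new ArrayList<>();

      CachingReiterable(Iterator<T> source) {
        this.source = source;
      }

      @Override
      public Iterator<T> iterator() {
        return new Iterator<T>() {
          private int pos = 0;

          @Override
          public boolean hasNext() {
            return pos < cache.size() || source.hasNext();
          }

          @Override
          public T next() {
            if (pos < cache.size()) {
              return cache.get(pos++);  // replay an already-seen value
            }
            T value = source.next();    // pull a new value from the source...
            cache.add(value);           // ...and remember it for later iterations
            pos++;
            return value;
          }
        };
      }
    }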
>>>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>>>
>>>>>>> Jan, in Beam users expect to be able to iterate the GBK output multiple times, even from within the same ParDo. Is this something that Beam on the Spark runner never supported?
>>>>>>>
>>>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>
>>>>>>>> Hi Gershi,
>>>>>>>>
>>>>>>>> could you please outline the pipeline you are trying to execute? Basically, you cannot iterate the Iterable multiple times in a single ParDo. It should be possible, though, to apply multiple ParDos to the output of GroupByKey.
>>>>>>>>
>>>>>>>> Jan
>>>>>>>>
>>>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I want to iterate multiple times over the Iterable<V> (the output of the GroupByKey transformation). When my runner is SparkRunner, I get an exception:
>>>>>>>>
>>>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
>>>>>>>>     at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>>>     at java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>>>>
>>>>>>>> I understood I can branch the pipeline after GroupByKey into multiple transformations and iterate once over the Iterable<V> in each of them.
>>>>>>>>
>>>>>>>> Is there a better way for that?
>>>>>>>>
>>>>>>>> Noam Gershi
>>>>>>>> Software Developer
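For reference, a sketch of the branching workaround mentioned above: group once, then attach several ParDos to the grouped output, each iterating the values a single time. Transform names and element types are made up.

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    static void expand(PCollection<KV<String, Integer>> input) {
      PCollection<KV<String, Iterable<Integer>>> grouped = input.apply(GroupByKey.create());

      // Branch 1: one pass over the grouped values to count them.
      grouped.apply("CountPerKey",
          ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, Long>>() {
            @ProcessElement
            public void process(ProcessContext c) {
              long n = 0;
              for (int ignored : c.element().getValue()) {
                n++;
              }
              c.output(KV.of(c.element().getKey(), n));
            }
          }));

      // Branch 2: an independent single pass to find the maximum.
      grouped.apply("MaxPerKey",
          ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
            @ProcessElement
            public void process(ProcessContext c) {
              int max = Integer.MIN_VALUE;
              for (int v : c.element().getValue()) {
                max = Math.max(max, v);
              }
              c.output(KV.of(c.element().getKey(), max));
            }
          }));
    }

Where each branch's computation is combinable, built-in combiners such as Count.perKey() or Max.integersPerKey() avoid materializing the grouped values at all.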