Ok, I think I understand there might be some benefits to this. Then I'd propose we make this clear on the GBK. If we supported something like this:

 PCollection<KV<K, V>> input = ...;

 input.apply(GroupByKey.withLargeKeys());

then SparkRunner could expand this to repartitionAndSortWithinPartitions only for this PTransform, and fall back to the default (in-memory) expansion in other situations. The default expansion of LargeGroupByKey (let's say) would be the classic GBK, so that only runners that need to make sure they don't break reiterations would expand it differently.
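A minimal sketch of what I mean (LargeGroupByKey is just a placeholder name, nothing like it exists yet):

  // Hypothetical composite transform - a sketch of the proposal only.
  public class LargeGroupByKey<K, V>
      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {

    @Override
    public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
      // Default expansion is the classic GBK; a runner that cannot
      // support reiteration on large keys (e.g. SparkRunner) would
      // override this with a repartitionAndSortWithinPartitions-based
      // translation.
      return input.apply(GroupByKey.create());
    }
  }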

WDYT?

Jan

On 9/27/19 8:56 PM, Reuven Lax wrote:
As I mentioned above, CoGroupByKey already takes advantage of this. Reiterating is not the most common use case, but it's definitely one that comes up. Also keep in mind that this API has supported reiterating for the past five years (since even before the SDK was donated to Apache). Therefore you should assume that users are relying on it, in ways we might not expect.

Historically, Google's Flume system had collections that did not support reiterating (they were even called OneShotCollections to make it clear). This was the source of problems and user frustration, which was one reason that in the original Dataflow SDK we made sure that these iterables could be reiterated. Another reason why it's advantageous for a runner to support this is allowing for better fusion. If two ParDos A and B both read from the same GroupByKey, it is nice to be able to fuse them into one logical operator. For this, you'll probably need a shuffle implementation that allows two independent readers from the same shuffle session.

How easy it is to implement reiterables that don't have to fit in memory will depend on the runner.  For Dataflow it's possible because the shuffle session is logically persistent, so the runner can simply reread the shuffle session. For other runners with different shuffle implementations, it might be harder to support both properties. Maybe we should introduce a new @RequiresReiteration annotation on ParDo? That way the Spark runner can see this and switch to the in-memory version just for groupings consumed by those ParDos. Runners that already support reiteration can ignore this annotation, so it should be backwards compatible.
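Usage could look something like this (the annotation is purely hypothetical at this point):

  // @RequiresReiteration does not exist yet - sketch only.
  @RequiresReiteration
  class MeanAndOutliersFn extends DoFn<KV<String, Iterable<Double>>, String> {
    @ProcessElement
    public void process(@Element KV<String, Iterable<Double>> e, OutputReceiver<String> out) {
      // First pass: compute the mean.
      long n = 0;
      double sum = 0;
      for (double v : e.getValue()) { n++; sum += v; }
      double mean = sum / n;
      // Second pass: reiterate to find values far from the mean.
      for (double v : e.getValue()) {
        if (Math.abs(v - mean) > 2 * Math.abs(mean)) {
          out.output(e.getKey() + ": outlier " + v);
        }
      }
    }
  }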

Reuven

On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz> wrote:

    I'd like to know the use-case. Why would you *need* to actually
    iterate the grouped elements twice? By definition, the first
    iteration would have to extract some statistic (or a subset of
    elements that must fit into memory). This statistic can then be
    used as another input for the second iteration. Why not then
    calculate the statistic in a separate branch of the pipeline and
    feed it into the ParDo as a side input? That would definitely be
    more efficient, because the calculation of the statistic would
    probably be combinable (not sure if it is absolutely required to
    be combinable, but it seems probable). Even if the calculation
    were not combinable, it would be no less efficient than iterating
    twice. Why then support multiple iterations (besides the fact
    that the output of GBK is an Iterable)? Am I missing something?
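    To illustrate what I mean, a sketch (assuming input is a
    PCollection<KV<String, Integer>> and the statistic is a per-key
    count, which is combinable):

      PCollectionView<Map<String, Long>> countsView =
          input.apply(Count.perKey()).apply(View.asMap());

      input
          .apply(GroupByKey.create())
          .apply(ParDo.of(new DoFn<KV<String, Iterable<Integer>>, String>() {
                @ProcessElement
                public void process(ProcessContext c) {
                  // The statistic arrives pre-computed from the other branch.
                  long count = c.sideInput(countsView).get(c.element().getKey());
                  // Single pass over the grouped values.
                  for (int v : c.element().getValue()) {
                    if (v > count) {
                      c.output(c.element().getKey() + ": " + v);
                    }
                  }
                }
              }).withSideInputs(countsView));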

    Jan

    On 9/27/19 6:36 PM, Reuven Lax wrote:
    This should be an element in the compatibility matrix as well.

    On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org> wrote:

        I am pretty surprised that we do not have
        a @Category(ValidatesRunner) test in GroupByKeyTest that
        iterates multiple times. That is a major oversight. We should
        have this test, and it can be disabled by the SparkRunner's
        configuration.
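        Something along these lines (a sketch; assumes the usual
        TestPipeline rule p in GroupByKeyTest):

          @Test
          @Category(ValidatesRunner.class)
          public void testGroupByKeyMultipleIterations() {
            PCollection<KV<String, Integer>> input =
                p.apply(Create.of(KV.of("k", 1), KV.of("k", 2), KV.of("k", 3)));

            PCollection<Long> sums = input
                .apply(GroupByKey.create())
                .apply(ParDo.of(new DoFn<KV<String, Iterable<Integer>>, Long>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        long sum = 0;
                        // Iterate twice on purpose - this is what we validate.
                        for (int v : c.element().getValue()) { sum += v; }
                        for (int v : c.element().getValue()) { sum += v; }
                        c.output(sum);
                      }
                    }));

            PAssert.that(sums).containsInAnyOrder(12L);
            p.run();
          }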

        Kenn

        On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:

            The Dataflow version does not spill to disk. However,
            Spark's design might require spilling to disk if you want
            this to be implemented properly.

            On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:

                Hi,

                Spark's GBK is currently implemented using
                `sortBy(key and value).mapPartition(...)` for
                non-merging windowing, in order to support large keys
                and large-scale shuffles. Merging windowing is
                implemented using the standard GBK (the underlying
                Spark impl. uses ListCombiner + hash grouping), which
                is by design unable to support large keys.

                As Jan noted, the problem with mapPartition is that
                its UDF receives an Iterator. The only option here is
                to wrap this iterator in one that spills to disk once
                an internal buffer is exceeded (the approach
                suggested by Reuven). This unfortunately comes with a
                cost in some cases. The best approach would be to
                somehow determine that the user wants multiple
                iterations and only then wrap it in a "re-iterator"
                if necessary. Does anyone have any ideas how to
                approach this?
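                To make the trade-off concrete, a minimal in-memory
                variant of such a wrapper (a real implementation
                would spill the cache to disk via a coder once a size
                threshold is exceeded):

                  import java.util.ArrayList;
                  import java.util.Iterator;
                  import java.util.List;

                  // Wraps a one-shot Iterator into an Iterable that can be
                  // iterated repeatedly by caching consumed elements. Sketch
                  // only: the cache lives in memory, so by itself this does
                  // not solve the large-key problem.
                  class CachingReiterable<T> implements Iterable<T> {
                    private final Iterator<T> source;
                    private final List<T> cache = new ArrayList<>();

                    CachingReiterable(Iterator<T> source) {
                      this.source = source;
                    }

                    @Override
                    public Iterator<T> iterator() {
                      return new Iterator<T>() {
                        private int pos = 0;

                        @Override
                        public boolean hasNext() {
                          return pos < cache.size() || source.hasNext();
                        }

                        @Override
                        public T next() {
                          if (pos == cache.size()) {
                            cache.add(source.next()); // first pass pulls from source
                          }
                          return cache.get(pos++);    // later passes replay the cache
                        }
                      };
                    }
                  }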

                D.

                On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:

                    The Beam API was written to support multiple
                    iterations, and there are definitely transforms
                    that do so. I believe that CoGroupByKey may do
                    this as well with the resulting iterator.

                    I know that the Dataflow runner is able to handle
                    iterators larger than available memory by paging
                    them in from shuffle, which still allows for
                    reiterating. It sounds like Spark is less
                    flexible here?

                    Reuven

                    On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:

                        +dev <dev@beam.apache.org>

                        Lukasz, why do you think that users expect to
                        be able to iterate the grouped elements
                        multiple times, besides the fact that the
                        'Iterable' obviously suggests it? The way
                        Spark behaves is pretty much analogous to how
                        MapReduce used to work - in certain cases it
                        calls repartitionAndSortWithinPartitions and
                        then does mapPartition, which accepts an
                        Iterator - that is because internally it
                        merge-sorts pre-sorted segments. This
                        approach makes it possible to GroupByKey data
                        sets that are too big to fit into memory (per
                        key).
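                        In Spark's Java API the pattern looks
                        roughly like this (a sketch; 'pairs' is an
                        assumed JavaPairRDD<String, Integer>):

                          JavaRDD<String> sums = pairs
                              .repartitionAndSortWithinPartitions(new HashPartitioner(4))
                              .mapPartitions(iter -> {
                                // Records arrive sorted by key, so a group is a run
                                // of consecutive records with the same key - consumed
                                // exactly once, never materialized in memory.
                                List<String> out = new ArrayList<>();
                                String key = null;
                                long sum = 0;
                                while (iter.hasNext()) {
                                  Tuple2<String, Integer> t = iter.next();
                                  if (!t._1().equals(key)) {
                                    if (key != null) {
                                      out.add(key + "=" + sum);
                                    }
                                    key = t._1();
                                    sum = 0;
                                  }
                                  sum += t._2();
                                }
                                if (key != null) {
                                  out.add(key + "=" + sum);
                                }
                                return out.iterator();
                              });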

                        If multiple iterations should be expected by
                        users, we probably should:

                         a) include that in @ValidatesRunner tests

                         b) store values in memory on Spark, which
                        will break certain pipelines

                        Because of (b), I think it would be much
                        better to remove this "expectation" and
                        clearly document that the Iterable is not
                        supposed to be iterated multiple times.

                        Jan

                        On 9/27/19 9:27 AM, Jan Lukavský wrote:

                        I pretty much think so, because that is how
                        Spark works. The Iterable inside is really
                        an Iterator, which cannot be iterated
                        multiple times.

                        Jan

                        On 9/27/19 2:00 AM, Lukasz Cwik wrote:
                        Jan, in Beam users expect to be able to
                        iterate the GBK output multiple times, even
                        from within the same ParDo. Is this something
                        that Beam on the Spark runner never supported?

                        On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:

                            Hi Gershi,

                            could you please outline the pipeline
                            you are trying to execute? Basically, you
                            cannot iterate the Iterable multiple
                            times in a single ParDo. It should be
                            possible, though, to apply multiple
                            ParDos to the output of the GroupByKey.
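                            For example (a sketch, assuming the
                            grouped values are Integers):

                              PCollection<KV<String, Iterable<Integer>>> grouped =
                                  input.apply(GroupByKey.create());

                              // Each branch iterates the grouped values once.
                              PCollection<Long> counts = grouped.apply("Count",
                                  ParDo.of(new DoFn<KV<String, Iterable<Integer>>, Long>() {
                                    @ProcessElement
                                    public void process(ProcessContext c) {
                                      long n = 0;
                                      for (int v : c.element().getValue()) { n++; }
                                      c.output(n);
                                    }
                                  }));

                              PCollection<Integer> maxes = grouped.apply("Max",
                                  ParDo.of(new DoFn<KV<String, Iterable<Integer>>, Integer>() {
                                    @ProcessElement
                                    public void process(ProcessContext c) {
                                      int max = Integer.MIN_VALUE;
                                      for (int v : c.element().getValue()) {
                                        max = Math.max(max, v);
                                      }
                                      c.output(max);
                                    }
                                  }));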

                            Jan

                            On 9/26/19 3:32 PM, Gershi, Noam wrote:

                            Hi,

                            I want to iterate multiple times over
                            the Iterable<V> (the output of the
                            GroupByKey transformation).

                            When my runner is SparkRunner, I get an
                            exception:

                            Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
                                at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
                                at java.lang.Iterable.spliterator(Iterable.java:101)

                            I understood that I can branch the
                            pipeline after the GroupByKey into
                            multiple transformations and iterate once
                            over the Iterable<V> in each of them.

                            Is there a better way to do that?

                            *Noam Gershi*

                            Software Developer

                            *T*: +972 (3) 7405718
