I'd suggest Stream instead of Iterator; it has the same semantics and a much better API.
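
To illustrate the "same semantics" point, here is a minimal sketch using only the JDK (the class and method names are made up for this example): like an Iterator, a java.util.stream.Stream can be traversed only once, and a second terminal operation is rejected with an IllegalStateException.

```java
import java.util.stream.Stream;

// Hypothetical demo class; not part of Beam or Spark.
class OneShotStreamDemo {
    /** Runs one terminal operation, then reports whether a second one is rejected. */
    static boolean secondPassFails(Stream<Integer> values) {
        values.reduce(0, Integer::sum);        // first traversal consumes the stream
        try {
            values.reduce(0, Integer::sum);    // a second traversal is not allowed
            return false;
        } catch (IllegalStateException e) {
            return true;                       // the stream was already consumed
        }
    }

    public static void main(String[] args) {
        System.out.println(secondPassFails(Stream.of(1, 2, 3)));
    }
}
```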

I'm still not sure what is wrong with letting the GBK decide this. I have an analogy: if I decide to buy a car, I have to decide *what* car I'm going to buy (by thinking about how I'm going to use it) *before* I buy it. I cannot just buy "a car" and then change it from a minivan to a sports car based on my actual need. The same goes for the GBK: if I want to be able to reiterate the result, then I should tell it in advance.

Jan

On 9/27/19 10:50 PM, Kenneth Knowles wrote:
Good point about sibling fusion requiring this.

The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does imply that the output iterable can be iterated arbitrarily many times.

I think this should remain the default for all the reasons mentioned.

We could have opt-in to the weaker KV<K, Iterator<V>> version. Agree that this is a property of the ParDo. A particular use of a GBK has no idea what is downstream. If you owned the whole pipeline, a special ParDo<Iterator<V>, Foo> would work. But to make the types line up, this would require changes upstream, which is not good.

Maybe something like this:

ParDo<Iterable<V>, Foo> {
  @ProcessElement
  void process(@OneShotIterator Iterator<V> iter) {
    ...
  }
}

I've described all of this in terms of Java SDK. So we would need a portable representation for all this metadata.

Kenn

On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <re...@google.com <mailto:re...@google.com>> wrote:

    I think the behavior to make explicit is the need to reiterate,
    not the need to handle large results. How large of a result can be
    handled will always be dependent on the runner, and each runner
    will probably have a different definition of large keys.
    Reiteration, however, is a logical difference in the programming
    API. Therefore I think it makes sense to specify reiteration. The
    need to reiterate is a property of the downstream ParDo, so it
    should be specified there - not on the GBK.

    Reuven

    On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:

        Ok, I think I understand there might be some benefits of this.
        Then I'd propose we make this clear on the GBK. If we
        supported something like this:

         PCollection<?> input = ....;

         input.apply(GroupByKey.withLargeKeys());

        then SparkRunner could expand this to
        repartitionAndSortWithinPartitions only on this PTransform,
        and fall back to the default (in memory) in other situations.
        The default expansion of LargeGroupByKey (let's say) would be
        classic GBK, so that only runners that need to make sure that
        they don't break reiterations can expand this.

        WDYT?

        Jan

        On 9/27/19 8:56 PM, Reuven Lax wrote:
        As I mentioned above, CoGroupByKey already takes advantage of
        this. Reiterating is not the most common use case, but it's
        definitely one that comes up. Also keep in mind that this API
        has supported reiterating for the past five years (since even
        before the SDK was donated to Apache). Therefore you should
        assume that users are relying on it, in ways we might not
        expect.

        Historically, Google's Flume system had collections that did
        not support reiterating (they were even called
        OneShotCollections to make it clear). This was the source of
        problems and user frustration, which was one reason that in
        the original Dataflow SDK we made sure that these iterables
        could be reiterated. Another reason why it's advantageous for
        a runner to support this is allowing for better fusion. If
        two ParDos A and B both read from the same GroupByKey, it is
        nice to be able to fuse them into one logical operator. For
        this, you'll probably need a shuffle implementation that
        allows two independent readers from the same shuffle session.

        How easy it is to implement reiterables that don't have to
        fit in memory will depend on the runner.  For Dataflow it's
        possible because the shuffle session is logically persistent,
        so the runner can simply reread the shuffle session. For
        other runners with different shuffle implementations, it
        might be harder to support both properties. Maybe we should
        introduce a new @RequiresReiteration annotation on ParDo?
        That way the Spark runner can see this and switch to the
        in-memory version just for groupings consumed by those
        ParDos. Runners that already support reiteration can ignore
        this annotation, so it should be backwards compatible.

        Reuven
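
A rough sketch of how a runner might act on such an annotation, in plain Java. Everything here is hypothetical: @RequiresReiteration does not exist in Beam as written, the annotation is placed on a class rather than on a real DoFn, and GroupingPlanner is an invented stand-in for a runner's translation step.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for the runner's decision when translating a grouping.
class GroupingPlanner {
    enum Strategy { ONE_SHOT_STREAMING, IN_MEMORY_REITERABLE }

    /** Picks the re-iterable grouping only when the consumer asks for it. */
    static Strategy choose(Class<?> consumerFn) {
        return consumerFn.isAnnotationPresent(RequiresReiteration.class)
            ? Strategy.IN_MEMORY_REITERABLE
            : Strategy.ONE_SHOT_STREAMING;
    }

    public static void main(String[] args) {
        System.out.println(choose(SinglePassFn.class));
        System.out.println(choose(TwoPassFn.class));
    }
}

// Hypothetical marker annotation; must be retained at runtime to be visible via reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface RequiresReiteration {}

// A consumer that reads the grouped values once.
class SinglePassFn {}

// A consumer that declares it will iterate the grouped values more than once.
@RequiresReiteration
class TwoPassFn {}
```

A runner that already supports reiteration would simply never call anything like choose(), which is what keeps the idea backwards compatible.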

        On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

            I'd like to know the use-case. Why would you *need* to
            actually iterate the grouped elements twice? By
            definition the first iteration would have to extract some
            statistic (or subset of elements that must fit into
            memory). This statistic can then be used as another input
            for the second iteration. Why not then calculate the
            statistic in a separate branch in the pipeline and feed
            it then into the ParDo as side input? That would be
            definitely more efficient, because the calculation of the
            statistic would be probably combinable (not sure if it is
            absolutely required to be combinable, but it seems
            probable). Even if the calculation would not be
            combinable, it is not less efficient than reiterating
            twice. Why then support multiple iterations (besides the
            fact that the output of GBK is Iterable)? Am I missing something?

            Jan
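
The argument above can be made concrete with a small plain-Java sketch (no Beam types; all names are invented for illustration). The first method needs two passes over the grouped values, so it requires a re-iterable Iterable. The second gets the statistic from outside, standing in for a side input, and needs only a one-shot Iterator.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical demo class; not Beam code.
class MeanFilterDemo {
    /** Two passes: compute the mean, then count values above it. */
    static long countAboveMeanTwoPass(Iterable<Integer> values) {
        long sum = 0;
        long n = 0;
        for (int v : values) {          // first iteration: gather the statistic
            sum += v;
            n++;
        }
        if (n == 0) {
            return 0;
        }
        double mean = (double) sum / n;
        long above = 0;
        for (int v : values) {          // second iteration: use the statistic
            if (v > mean) {
                above++;
            }
        }
        return above;
    }

    /** One pass: the mean is supplied by the caller (standing in for a side input). */
    static long countAboveGivenMean(Iterator<Integer> values, double mean) {
        long above = 0;
        while (values.hasNext()) {
            if (values.next() > mean) {
                above++;
            }
        }
        return above;
    }

    public static void main(String[] args) {
        List<Integer> grouped = List.of(1, 2, 3, 10);   // mean is 4.0
        System.out.println(countAboveMeanTwoPass(grouped));
        System.out.println(countAboveGivenMean(grouped.iterator(), 4.0));
    }
}
```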

            On 9/27/19 6:36 PM, Reuven Lax wrote:
            This should be an element in the compatibility matrix as
            well.

            On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles
            <k...@apache.org <mailto:k...@apache.org>> wrote:

                I am pretty surprised that we do not have
                a @Category(ValidatesRunner) test in GroupByKeyTest
                that iterates multiple times. That is a major
                oversight. We should have this test, and it can be
                disabled by the SparkRunner's configuration.

                Kenn

                On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax
                <re...@google.com <mailto:re...@google.com>> wrote:

                    The Dataflow version does not spill to disk.
                    However Spark's design might require spilling to
                    disk if you want that to be implemented properly.

                    On Fri, Sep 27, 2019 at 9:08 AM David Morávek
                    <d...@apache.org <mailto:d...@apache.org>> wrote:

                        Hi,

                        Spark's GBK is currently implemented using
                        `sortBy(key and value).mapPartition(...)`
                        for non-merging windowing in order to
                        support large keys and large scale shuffles.
                        Merging windowing is implemented using
                        standard GBK (underlying spark impl. uses
                        ListCombiner + Hash Grouping), which is by
                        design unable to support large keys.

                        As Jan noted, the problem with mapPartition
                        is that its UDF receives an Iterator. The
                        only option here is to wrap this iterator in
                        one that spills to disk once an internal
                        buffer is exceeded (the approach suggested
                        by Reuven). This unfortunately comes with a
                        cost in some cases. The best approach would
                        be to somehow determine that the user wants
                        multiple iterations and then wrap it in a
                        "re-iterator" if necessary. Does anyone have
                        any ideas how to approach this?

                        D.
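
A minimal sketch of the "re-iterator" idea in plain Java, assuming an in-memory buffer only: the spill-to-disk step described above is deliberately left out, and BufferingReiterable is an invented name, not a class from the Spark runner. Elements are pulled from the one-shot source lazily on the first pass and replayed from the buffer on later passes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical demo class showing two passes over a wrapped one-shot Iterator.
class ReiterableDemo {
    public static void main(String[] args) {
        Iterable<Integer> values = new BufferingReiterable<>(List.of(1, 2, 3).iterator());
        for (int pass = 0; pass < 2; pass++) {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            System.out.println(sum);
        }
    }
}

// Hypothetical wrapper: buffers in memory only; a real one would spill past a threshold.
class BufferingReiterable<T> implements Iterable<T> {
    private final Iterator<T> source;                   // one-shot input
    private final List<T> buffer = new ArrayList<>();   // elements read so far

    BufferingReiterable(Iterator<T> source) {
        this.source = source;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int position = 0;   // index of the next element to return

            @Override
            public boolean hasNext() {
                return position < buffer.size() || source.hasNext();
            }

            @Override
            public T next() {
                if (position == buffer.size()) {
                    // Past the buffered prefix: pull one more element from the source.
                    if (!source.hasNext()) {
                        throw new NoSuchElementException();
                    }
                    buffer.add(source.next());
                }
                return buffer.get(position++);
            }
        };
    }
}
```

The cost mentioned above shows up here as the buffer: every element is retained even when the consumer iterates only once, which is why knowing in advance whether reiteration is needed matters.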

                        On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax
                        <re...@google.com <mailto:re...@google.com>>
                        wrote:

                            The Beam API was written to support
                            multiple iterations, and there are
                            definitely transforms that do so. I
                            believe that CoGroupByKey may do this as
                            well with the resulting iterator.

                            I know that the Dataflow runner is able
                            to handle iterators larger than
                            available memory by paging them in from
                            shuffle, which still allows for
                            reiterating. It sounds like Spark is
                            less flexible here?

                            Reuven

                            On Fri, Sep 27, 2019 at 3:04 AM Jan
                            Lukavský <je...@seznam.cz
                            <mailto:je...@seznam.cz>> wrote:

                                +dev <dev@beam.apache.org>
                                <mailto:dev@beam.apache.org>

                                Lukasz, why do you think that users
                                expect to be able to iterate the
                                grouped elements multiple times,
                                besides the fact that the 'Iterable'
                                type obviously suggests it? The way
                                that Spark behaves is pretty much
                                analogous to how MapReduce used to
                                work: in certain cases it calls
                                repartitionAndSortWithinPartitions
                                and then does mapPartition, which
                                accepts an Iterator, because
                                internally it merge-sorts pre-sorted
                                segments. This approach makes it
                                possible to GroupByKey data sets
                                that are too big to fit into memory
                                (per key).

                                If multiple iterations should be
                                expected by users, we probably should:

                                 a) include that in @ValidatesRunner
                                tests

                                 b) store values in memory on spark,
                                which will break for certain pipelines

                                Because of (b) I think that it would
                                be much better to remove this
                                "expectation" and clearly document
                                that the Iterable is not supposed to
                                be iterated multiple times.

                                Jan

                                On 9/27/19 9:27 AM, Jan Lukavský wrote:

                                I pretty much think so, because
                                that is how Spark works. The
                                Iterable inside is really an
                                Iterator, which cannot be iterated
                                multiple times.

                                Jan

                                On 9/27/19 2:00 AM, Lukasz Cwik wrote:
                                Jan, in Beam users expect to be
                                able to iterate the GBK output
                                multiple times even from within
                                the same ParDo.
                                Is this something that Beam on
                                Spark Runner never supported?

                                On Thu, Sep 26, 2019 at 6:50 AM
                                Jan Lukavský <je...@seznam.cz
                                <mailto:je...@seznam.cz>> wrote:

                                    Hi Gershi,

                                    could you please outline the
                                    pipeline you are trying to
                                    execute? Basically, you cannot
                                    iterate the Iterable multiple
                                    times in a single ParDo. It
                                    should be possible, though, to
                                    apply multiple ParDos to
                                    output from GroupByKey.

                                    Jan

                                    On 9/26/19 3:32 PM, Gershi,
                                    Noam wrote:

                                    Hi,

                                    I want to iterate multiple
                                    times on the Iterable<V> (the
                                    output of GroupByKey
                                    transformation)

                                    When my Runner is
                                    SparkRunner, I get an exception:

                                    Caused by:
                                    java.lang.IllegalStateException:
                                    ValueIterator can't be
                                    iterated more than
                                    once,otherwise there could be
                                    data lost

                                    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
                                    at java.lang.Iterable.spliterator(Iterable.java:101)

                                    I understood I can branch the
                                    pipeline after GroupByKey
                                    into multiple transformation
                                    and iterate in each of them
                                    once on the Iterable<V>.

                                    Is there a better way for that?

                                    *Noam Gershi*

                                    Software Developer

                                    *T*:+972 (3) 7405718
                                    <tel:+972%203-740-5718>
