I understand the concerns. Still, it looks a little like we want to be able to modify the behavior of an object from inside a submodule - much as if my subprogram accepted a Map interface, but internally I said "hey, this is supposed to be a HashMap, please change it so". Because of how the pipeline is constructed, we can do that; the question is whether there really isn't a better solution.

What I do not like about the proposed solution:

 1) specifying that the grouped elements are supposed to be iterated only once can be done only on a ParDo, although there are other (higher-level) PTransforms that can consume the output of GBK

 2) the annotation on ParDo is by definition generic - i.e. it can be used on input which is not the output of GBK, which makes no sense

 3) we modify the behavior to unlock some optimizations (or to change the behavior of the GBK itself); users will not understand that

 4) the annotation somewhat arbitrarily modifies the data types passed, which is counter-intuitive and will be a source of confusion

I think that a solution that solves the above problems (but brings some new ones, as always :-)) could be to change the output of GBK from PCollection<KV<K, Iterable<V>>> to GroupedPCollection<K, V>. That way we can control which operators consume the grouping (and how), and we can enable these transforms to specify additional parameters (like how they want to consume the grouping). It is obviously a breaking change (although it can probably be made backwards compatible) and it would very likely involve substantial work. But maybe there are some other options that have not been discussed yet.
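
To make the idea a bit more concrete, here is a minimal plain-Java sketch. It is not Beam API: `GroupedValues`, `Consumption` and `consume` are hypothetical names invented for illustration. It only shows how a dedicated grouped type could let the consumer say up front whether it needs one pass or several, so that values are materialized only when repeated passes were requested.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical: how a consumer intends to read the values of one key.
enum Consumption { SINGLE_PASS, REITERABLE }

// Hypothetical stand-in for the values of one key in a "GroupedPCollection<K, V>".
final class GroupedValues<V> {
  private final Iterator<V> source; // one-shot source, e.g. a merge-sorted read
  private boolean consumed = false;

  GroupedValues(Iterator<V> source) {
    this.source = source;
  }

  // The consumer states its needs; the grouping picks a matching representation.
  <R> R consume(Consumption mode, Function<Iterable<V>, R> fn) {
    if (consumed) {
      throw new IllegalStateException("grouping already consumed");
    }
    consumed = true;
    if (mode == Consumption.REITERABLE) {
      // Materialize only when the consumer asked for repeated passes.
      List<V> buffer = new ArrayList<>();
      source.forEachRemaining(buffer::add);
      return fn.apply(buffer);
    }
    // Single pass: hand out the underlying iterator exactly once.
    return fn.apply(oneShot(source));
  }

  private static <T> Iterable<T> oneShot(Iterator<T> it) {
    boolean[] taken = {false};
    return () -> {
      if (taken[0]) {
        throw new IllegalStateException("single-pass grouping iterated twice");
      }
      taken[0] = true;
      return it;
    };
  }
}
```

In this sketch a consumer that asks for SINGLE_PASS and then iterates twice fails fast, while a REITERABLE consumer pays for buffering explicitly.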

Jan

On 9/28/19 6:46 AM, Reuven Lax wrote:
In many cases, the writer of the ParDo has no access to the GBK (e.g. the GBK is hidden inside an upstream PTransform that they cannot modify). This is the same reason why RequiresStableInput was made a property of the ParDo, because the GroupByKey is quite often inaccessible.

The car analogy doesn't quite apply here, because the runner does have a full view of the pipeline so can satisfy all constraints. The car dealership generally cannot read your mind (thankfully!), so you have to specify what you want. Or to put it another way, the various transforms in a Beam pipeline do not live in isolation. The full pipeline graph is what is executed, and the runner already has to analyze the full graph to run the pipeline (as well as to optimize the pipeline).

Reuven

On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

    I'd suggest Stream instead of Iterator, it has the same semantics
    and much better API.
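
For reference, `java.util.stream.Stream` is single-use as well: a second terminal operation on the same stream throws `IllegalStateException`. A small plain-Java illustration of that behavior (the class and method names are made up for this sketch):

```java
import java.util.List;
import java.util.stream.Stream;

final class StreamOneShotDemo {
  // Returns true if a second terminal operation on the same Stream is rejected.
  static boolean secondPassRejected(List<Integer> values) {
    Stream<Integer> stream = values.stream();
    stream.forEach(v -> { }); // first terminal operation consumes the stream
    try {
      stream.forEach(v -> { }); // second terminal operation on the same stream
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }
}
```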

    Still not sure what is wrong with letting the GBK decide this. I
    have an analogy - if I decide to buy a car, I have to decide
    *what* car I'm going to buy (by thinking about how I'm going to
    use it) *before* I buy it. I cannot just buy "a car" and then
    change it from a minivan to a sports car based on my actual need.
    Same with the GBK - if I want to be able to reiterate the result,
    then I should tell it in advance.

    Jan

    On 9/27/19 10:50 PM, Kenneth Knowles wrote:
    Good point about sibling fusion requiring this.

    The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does
    imply that the output iterable can be iterated arbitrarily many
    times.

    I think this should remain the default for all the reasons mentioned.

    We could have opt-in to the weaker KV<K, Iterator<V>> version.
    Agree that this is a property of the ParDo. A particular use of a
    GBK has no idea what is downstream. If you owned the whole
    pipeline, a special ParDo<Iterator<V>, Foo> would work. But to
    make the types line up, this would require changes upstream,
    which is not good.

    Maybe something like this:

    ParDo<Iterable<V>, Foo> {
      @ProcessElement
      void process(@OneShotIterator Iterator<V> iter) {
        ...
      }
    }

    I've described all of this in terms of Java SDK. So we would need
    a portable representation for all this metadata.

    Kenn

    On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <re...@google.com
    <mailto:re...@google.com>> wrote:

        I think the behavior to make explicit is the need to
        reiterate, not the need to handle large results. How large of
        a result can be handled will always be dependent on the
        runner, and each runner will probably have a different
        definition of large keys. Reiteration however is a logical
        difference in the programming API. Therefore I think it makes
        sense to specify the latter. The need to reiterate is a
        property of the downstream ParDo, so it should be specified
        there - not on the GBK.

        Reuven

        On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

            Ok, I think I understand there might be some benefits of
            this. Then I'd propose we make this clear on the GBK. If
            we would support something like this:

             PCollection<?> input = ....;

             input.apply(GroupByKey.withLargeKeys());

            then SparkRunner could expand this to
            repartitionAndSortWithinPartitions only on this
            PTransform, and fall back to the default (in memory) in
            other situations. The default expansion of
            LargeGroupByKey (let's say) would be classic GBK, so that
            only runners that need to make sure that they don't break
            reiterations can expand this.

            WDYT?

            Jan

            On 9/27/19 8:56 PM, Reuven Lax wrote:
            As I mentioned above, CoGroupByKey already takes
            advantage of this. Reiterating is not the most common
            use case, but it's definitely one that comes up. Also
            keep in mind that this API has supported reiterating for
            the past five years (since even before the SDK was
            donated to Apache). Therefore you should assume that
            users are relying on it, in ways we might not expect.

            Historically, Google's Flume system had collections that
            did not support reiterating (they were even called
            OneShotCollections to make it clear). This was the
            source of problems and user frustration, which was one
            reason that in the original Dataflow SDK we made sure
            that these iterables could be reiterated. Another reason
            why it's advantageous for a runner to support this is
            allowing for better fusion. If two ParDos A and B both
            read from the same GroupByKey, it is nice to be able to
            fuse them into one logical operator. For this, you'll
            probably need a shuffle implementation that allows two
            independent readers from the same shuffle session.

            How easy it is to implement reiterables that don't have
            to fit in memory will depend on the runner.  For
            Dataflow it's possible because the shuffle session is
            logically persistent, so the runner can simply reread
            the shuffle session. For other runners with different
            shuffle implementations, it might be harder to support
            both properties. Maybe we should introduce a
            new @RequiresReiteration annotation on ParDo? That way
            the Spark runner can see this and switch to the
            in-memory version just for groupings consumed by those
            ParDos. Runners that already support reiteration can
            ignore this annotation, so it should be backwards
            compatible.
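
A rough plain-Java sketch of how a runner might act on such a marker. Everything here is hypothetical: `@RequiresReiteration` is only proposed in this thread and is not an existing Beam annotation, and `GroupConsumer`, `GroupingStrategy` and `RunnerPlanner` are invented names. The annotation is placed on the class purely for simplicity.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical marker; not an existing Beam annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface RequiresReiteration {}

// Stand-in for a user function that consumes grouped values.
interface GroupConsumer {}

// Reads the values once, e.g. to sum them.
class SinglePassFn implements GroupConsumer {}

// Reads the values twice, e.g. mean first, then deviations from the mean.
@RequiresReiteration
class TwoPassFn implements GroupConsumer {}

enum GroupingStrategy { STREAMING_SORTED, IN_MEMORY }

final class RunnerPlanner {
  // A runner that cannot cheaply reiterate falls back to the in-memory grouping
  // only for consumers that declare they need reiteration.
  static GroupingStrategy choose(GroupConsumer downstream) {
    boolean needsReiteration =
        downstream.getClass().isAnnotationPresent(RequiresReiteration.class);
    return needsReiteration ? GroupingStrategy.IN_MEMORY : GroupingStrategy.STREAMING_SORTED;
  }
}
```

A runner that already supports reiteration would simply never call something like `choose` and could ignore the marker.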

            Reuven

            On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský
            <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                I'd like to know the use-case. Why would you *need*
                to actually iterate the grouped elements twice? By
                definition the first iteration would have to extract
                some statistic (or subset of elements that must fit
                into memory). This statistic can then be used as
                another input for the second iteration. Why not then
                calculate the statistic in a separate branch in the
                pipeline and feed it then into the ParDo as side
                input? That would be definitely more efficient,
                because the calculation of the statistic would be
                probably combinable (not sure if it is absolutely
                required to be combinable, but it seems probable).
                Even if the calculation would not be combinable, it
                is not less efficient than reiterating twice. Why
                then support multiple iterations (besides the fact
                that the output of GBK is Iterable)? Am I missing something?

                Jan

                On 9/27/19 6:36 PM, Reuven Lax wrote:
                This should be an element in the compatibility
                matrix as well.

                On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles
                <k...@apache.org <mailto:k...@apache.org>> wrote:

                    I am pretty surprised that we do not have
                    a @Category(ValidatesRunner) test
                    in GroupByKeyTest that iterates multiple times.
                    That is a major oversight. We should have this
                    test, and it can be disabled by the
                    SparkRunner's configuration.

                    Kenn

                    On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax
                    <re...@google.com <mailto:re...@google.com>> wrote:

                        The Dataflow version does not spill to
                        disk. However Spark's design might require
                        spilling to disk if you want that to be
                        implemented properly.

                        On Fri, Sep 27, 2019 at 9:08 AM David
                        Morávek <d...@apache.org
                        <mailto:d...@apache.org>> wrote:

                            Hi,

                            Spark's GBK is currently implemented
                            using `sortBy(key and
                            value).mapPartition(...)` for
                            non-merging windowing in order to
                            support large keys and large scale
                            shuffles. Merging windowing is
                            implemented using standard GBK
                            (underlying spark impl. uses
                            ListCombiner + Hash Grouping), which is
                            by design unable to support large keys.

                            As Jan noted, the problem with
                            mapPartition is that its UDF receives an
                            Iterator. The only option here is to wrap
                            this iterator in one that spills to disk
                            once an internal buffer is exceeded
                            (the approach suggested by Reuven).
                            This unfortunately comes with a cost in
                            some cases. The best approach would be
                            to somehow determine that the user wants
                            multiple iterations and then wrap it in a
                            "re-iterator" if necessary. Does anyone
                            have any ideas how to approach this?
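
A minimal plain-Java sketch of the wrapping idea described above, under a big simplifying assumption: the buffer is kept in memory, whereas the approach discussed here would spill it to disk once a size limit is exceeded. `BufferingReiterable` is a hypothetical name, not a class from the Spark runner.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Wraps a one-shot Iterator so that it can be iterated repeatedly.
// Simplification: the buffer lives in memory; a real "re-iterator" would
// spill it to disk once it exceeds a size limit.
final class BufferingReiterable<T> implements Iterable<T> {
  private final Iterator<T> source;
  private final List<T> buffer = new ArrayList<>();

  BufferingReiterable(Iterator<T> source) {
    this.source = source;
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      private int position = 0;

      @Override
      public boolean hasNext() {
        // Replay buffered elements first, then pull lazily from the source.
        return position < buffer.size() || source.hasNext();
      }

      @Override
      public T next() {
        if (position == buffer.size()) {
          if (!source.hasNext()) {
            throw new NoSuchElementException();
          }
          buffer.add(source.next());
        }
        return buffer.get(position++);
      }
    };
  }
}
```

The cost mentioned above is visible even in this sketch: every element is retained after the first pass, whether or not a second pass ever happens.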

                            D.

                            On Fri, Sep 27, 2019 at 5:46 PM Reuven
                            Lax <re...@google.com
                            <mailto:re...@google.com>> wrote:

                                The Beam API was written to support
                                multiple iterations, and there are
                                definitely transforms that do so. I
                                believe that CoGroupByKey may do
                                this as well with the resulting
                                iterator.

                                I know that the Dataflow runner is
                                able to handle iterators larger
                                than available memory by paging
                                them in from shuffle, which still
                                allows for reiterating. It sounds
                                like Spark is less flexible here?

                                Reuven

                                On Fri, Sep 27, 2019 at 3:04 AM Jan
                                Lukavský <je...@seznam.cz
                                <mailto:je...@seznam.cz>> wrote:

                                    +dev <dev@beam.apache.org>

                                    Lukasz, why do you think that
                                    users expect to be able to
                                    iterate grouped elements
                                    multiple times? Besides the
                                    fact that the 'Iterable' type
                                    obviously suggests it? The way
                                    that Spark behaves is pretty
                                    much analogous to how MapReduce
                                    used to work - in certain cases
                                    it calls
                                    repartitionAndSortWithinPartitions
                                    and then does mapPartition,
                                    which accepts an Iterator - that
                                    is because internally it
                                    merge-sorts pre-sorted segments.
                                    This approach makes it possible
                                    to GroupByKey data sets that are
                                    too big to fit into memory (per
                                    key).

                                    If multiple iterations should
                                    be expected by users, we
                                    probably should:

                                     a) include that in
                                    @ValidatesRunner tests

                                     b) store values in memory on
                                    Spark, which will break for
                                    certain pipelines

                                    Because of (b) I think that it
                                    would be much better to remove
                                    this "expectation" and clearly
                                    document that the Iterable is
                                    not supposed to be iterated
                                    multiple times.

                                    Jan

                                    On 9/27/19 9:27 AM, Jan
                                    Lukavský wrote:

                                    I pretty much think so,
                                    because that is how Spark
                                    works. The Iterable inside is
                                    really an Iterator, which
                                    cannot be iterated multiple times.

                                    Jan

                                    On 9/27/19 2:00 AM, Lukasz
                                    Cwik wrote:
                                    Jan, in Beam users expect to
                                    be able to iterate the GBK
                                    output multiple times even
                                    from within the same ParDo.
                                    Is this something that Beam
                                    on Spark Runner never supported?

                                    On Thu, Sep 26, 2019 at 6:50
                                    AM Jan Lukavský
                                    <je...@seznam.cz
                                    <mailto:je...@seznam.cz>> wrote:

                                        Hi Gershi,

                                        could you please outline
                                        the pipeline you are
                                        trying to execute?
                                        Basically, you cannot
                                        iterate the Iterable
                                        multiple times in a single
                                        ParDo. It should be
                                        possible, though, to
                                        apply multiple ParDos to
                                        output from GroupByKey.

                                        Jan

                                        On 9/26/19 3:32 PM,
                                        Gershi, Noam wrote:

                                        Hi,

                                        I want to iterate
                                        multiple times on the
                                        Iterable<V> (the output
                                        of GroupByKey
                                        transformation)

                                        When my Runner is
                                        SparkRunner, I get an
                                        exception:

                                        Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
                                            at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
                                            at java.lang.Iterable.spliterator(Iterable.java:101)

                                        I understood I can
                                        branch the pipeline
                                        after GroupByKey into
                                        multiple transformations
                                        and iterate in each of
                                        them once on the
                                        Iterable<V>.

                                        Is there a better way
                                        for that?

                                        *Noam Gershi*

                                        Software Developer

                                        *T*:+972 (3) 7405718
