On 10/2/19 4:30 AM, Reuven Lax wrote:


On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

    > The fact that the annotation on the ParDo "changes" the
    GroupByKey implementation is very specific to the Spark runner
    implementation.

    I don't quite agree. It is not specific to Spark; it applies to
    all runners that produce grouped elements in a way that is not
    reiterable. That is the key property. The example you gave with
    HDFS does not satisfy this condition (files on HDFS are certainly
    reiterable), and that's why no change to the GBK is needed (it
    already has the required property). A quick look at FlinkRunner
    (at least the non-portable one) shows that it implements GBK by
    reducing elements into a List. That is going to crash on a big
    PCollection, which is even nicely documented:

        * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this is
        * expected to crash!

    If this is fixed, then it is likely to start behaving the same as
    Spark. So actually I think the opposite is true - Dataflow is a
    special case, because of how its internal shuffle service works.


I think you misunderstood - I was not trying to dish on the Spark runner. Rather my point is that whether the GroupByKey implementation is affected or not is runner dependent. In some runners it is and in others it isn't. However in all cases the /semantics/ of the ParDo is affected. Since Beam tries as much as possible to be runner agnostic, we should default to making the change where there is an obvious semantic difference.


I understand that, but I just don't think that the semantics should be affected by this. If the outcome of GBK is an Iterable, then it should be reiterable; that is how Iterable works. So I now lean more towards the conclusion that the current behavior of the Spark runner simply breaks this contract. The solution would be to introduce the proposed GroupByKeyOneShot or Reduce(By|Per)Key.
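
To make the contract under discussion concrete, here is a minimal sketch in plain Java with no Beam dependency. `OneShotIterable` is a hypothetical name that only mimics the behavior reported for the Spark runner's ValueIterator; it is not the runner's actual class.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class OneShotIterableDemo {

    /** Hypothetical wrapper: an Iterable that hands out its Iterator only once. */
    static final class OneShotIterable<T> implements Iterable<T> {
        private final Iterator<T> source;
        private boolean consumed = false;

        OneShotIterable(Iterator<T> source) {
            this.source = source;
        }

        @Override
        public Iterator<T> iterator() {
            if (consumed) {
                throw new IllegalStateException("can't be iterated more than once");
            }
            consumed = true;
            return source;
        }
    }

    /** Sums the values in a single pass. */
    static int sum(Iterable<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    /** Returns true if a second pass over the same Iterable is rejected. */
    static boolean secondPassFails(Iterable<Integer> values) {
        sum(values);
        try {
            sum(values);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3);
        // A List is reiterable, so the second pass succeeds.
        System.out.println(secondPassFails(list));
        // The one-shot wrapper rejects the second pass.
        System.out.println(secondPassFails(new OneShotIterable<>(list.iterator())));
    }
}
```

Code written against `Iterable<Integer>` cannot tell the two cases apart at compile time, which is the crux of the disagreement above.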



    > In general I sympathize with the worry about non-local effects.
    Beam is already full of them (e.g. a Window.into statement affects
    downstream GroupByKeys). In each case where they were added there
    was extensive debate and discussion (Windowing semantics were
    debated for many months), exactly because there was concern over
    adding these non-local effects. In every case, no other good
    solution could be found. For the case of windowing for example, it
    was often easy to propose simple local APIs (e.g. just pass the
    window fn as a parameter to GroupByKey), however all of these
    local solutions ended up not working for important use cases when
    we analyzed them more deeply.

    That is very interesting. Could you elaborate on some examples of
    the use cases that didn't work? I'd like to match them against how
    Euphoria is structured. It should be more resistant to these
    non-local effects, because it very often bundles multiple Beam
    primitives into a single transform. ReduceByKey is one example: it
    is actually a mix of Window.into() + GBK + ParDo. It might look as
    if a transform that can be broken down into something else is not
    primitive (Euphoria has no native equivalent of GBK itself), but
    this has several other nice implications, namely that Combine now
    becomes a special case of RBK. It becomes only a question of where
    and how you can "run" the reduce function; the logic is exactly the
    same. This can be worked out in more detail to show that even
    Combine and RBK can be described by a more general stateful
    operation (ReduceStateByKey), so Euphoria ends up with only two
    really "primitive" operations: FlatMap (basically a stateless
    ParDo) and RSBK. As I already mentioned on another thread, once
    stateful ParDo supports merging windows, it can be shown that both
    Combine and GBK become special cases of it.

    > As you mentioned below, I do think it's perfectly reasonable for a
    DSL to impose its own semantics. Scio already does this - the raw
    Beam API is used by a DSL as a substrate, but the DSL does not
    need to blindly mirror the semantics of the raw Beam API - at
    least in my opinion!

    Sure, but currently there is no way for a DSL to "hook" into the
    runner, so it has to use the raw Beam SDK, and so this will fail in
    cases like this one, where Beam actually has stronger guarantees
    than the DSL requires. It would be cool if we could find a way to
    do that. This pretty much aligns with another question raised on
    the ML, about the possibility of overriding the default
    implementation of a PTransform for a specific pipeline.

    Jan


    On 9/29/19 7:46 PM, Reuven Lax wrote:
    Jan,

    The fact that the annotation on the ParDo "changes" the
    GroupByKey implementation is very specific to the Spark runner
    implementation. You can imagine another runner that simply writes
    out files in HDFS to implement a GroupByKey - this GroupByKey
    implementation is agnostic whether the result will be reiterated
    or not; in this case it is very much the ParDo implementation
    that changes to implement a reiterable. I think you don't like
    the fact that an annotation on the ParDo will have a non-local
    effect on the implementation of the GroupByKey upstream. However
    arguably the non-local effect is just a quirk of how the Spark
    runner is implemented - other runners might have a local effect.

    In general I sympathize with the worry about non-local effects.
    Beam is already full of them (e.g. a Window.into statement
    affects downstream GroupByKeys). In each case where they were
    added there was extensive debate and discussion (Windowing
    semantics were debated for many months), exactly because there
    was concern over adding these non-local effects. In every case,
    no other good solution could be found. For the case of windowing
    for example, it was often easy to propose simple local APIs (e.g.
    just pass the window fn as a parameter to GroupByKey), however
    all of these local solutions ended up not working for important
    use cases when we analyzed them more deeply.

    As you mentioned below, I do think it's perfectly reasonable for
    a DSL to impose its own semantics. Scio already does this - the
    raw Beam API is used by a DSL as a substrate, but the DSL does
    not need to blindly mirror the semantics of the raw Beam API - at
    least in my opinion!

    Reuven

    On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:

        I understand the concerns. Still, it looks a little like we
        want to be able to modify behavior of an object from inside a
        submodule - quite like if my subprogram would accept a Map
        interface, but internally I would say "hey, this is supposed
        to be a HashMap, please change it so". Because of how
        pipeline is constructed, we can do that, the question is if
        there really isn't a better solution.

        What I do not like about the proposed solution:

         1) to specify that the grouped elements are supposed to be
        iterated only once can be done only on ParDo, although there
        are other (higher level) PTransforms, that can consume output
        of GBK

         2) the annotation on ParDo is by definition generic - i.e.
        can be used on input which is not output of GBK, which makes
        no sense

         3) we modify the behavior to unlock some optimizations (or
        change of behavior of the GBK itself), users will not
        understand that

         4) the annotation somewhat arbitrarily modifies data types
        passed, that is counter-intuitive and will be source of confusion

        I think that a solution that solves the above problems (but
        brings some new ones, as always :-)) could be to change the
        output of GBK from PCollection<K, Iterable<V>> to
        GroupedPCollection<K, V>. That way we can control which
        operators consume the grouping (and how), and we can enable
        these transforms to specify additional parameters (like how
        they want to consume the grouping). It is obviously a
        breaking change (although it can probably be made backwards
        compatible) and it would very likely involve substantial
        work. But maybe there are some other options not yet
        discussed.

        Jan

        On 9/28/19 6:46 AM, Reuven Lax wrote:
        In many cases, the writer of the ParDo has no access to the
        GBK (e.g. the GBK is hidden inside an upstream PTransform
        that they cannot modify). This is the same reason why
        RequiresStableInput was made a property of the ParDo,
        because the GroupByKey is quite often inaccessible.

        The car analogy doesn't quite apply here, because the runner
        does have a full view of the pipeline so can satisfy all
        constraints. The car dealership generally cannot read your
        mind (thankfully!), so you have to specify what you want. Or
        to put it another way, the various transforms in a Beam
        pipeline do not live in isolation. The full pipeline graph
        is what is executed, and the runner already has to analyze
        the full graph to run the pipeline (as well as to optimize
        the pipeline).

        Reuven

        On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

            I'd suggest Stream instead of Iterator, it has the same
            semantics and much better API.

            Still not sure what is wrong with letting the GBK
            decide this. I have an analogy - if I decide to buy a
            car, I have to decide *what* car I'm going to buy (by
            thinking about how I'm going to use it) *before* I buy
            it. I cannot just buy "a car" and then change it from a
            minivan to a sports car based on my actual need. Same
            with the GBK - if I want to be able to reiterate the
            result, then I should tell it in advance.

            Jan

            On 9/27/19 10:50 PM, Kenneth Knowles wrote:
            Good point about sibling fusion requiring this.

            The type PTransform<KV<K, V>, KV<K, Iterable<V>>>
            already does imply that the output iterable can be
            iterated arbitrarily many times.

            I think this should remain the default for all the
            reasons mentioned.

            We could have opt-in to the weaker KV<K, Iterator<V>>
            version. Agree that this is a property of the ParDo. A
            particular use of a GBK has no idea what is downstream.
            If you owned the whole pipeline, a special
            ParDo<Iterator<V>, Foo> would work. But to make the
            types line up, this would require changes upstream,
            which is not good.

            Maybe something like this:

            ParDo<Iterable<V>, Foo> {
              @ProcessElement
              void process(@OneShotIterator Iterator<V> iter) {
                ...
              }
            }

            I've described all of this in terms of Java SDK. So we
            would need a portable representation for all this metadata.

            Kenn

            On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax
            <re...@google.com <mailto:re...@google.com>> wrote:

                I think the behavior to make explicit is the need
                to reiterate, not the need to handle large results.
                How large of a result can be handled will always be
                dependent on the runner, and each runner will
                probably have a different definition of large keys.
                Reiteration however is a logical difference in the
                programming API. Therefore I think it makes sense
                to specify reiteration. The need to reiterate is a
                property of the downstream ParDo, so it should be
                specified there - not on the GBK.

                Reuven

                On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský
                <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                    Ok, I think I understand there might be some
                    benefits of this. Then I'd propose we make this
                    clear on the GBK. If we would support something
                    like this:

                     PCollection<?> input = ....;

                     input.apply(GroupByKey.withLargeKeys());

                    then SparkRunner could expand this to
                    repartitionAndSortWithinPartitions only on this
                    PTransform, and fall back to the default (in
                    memory) in other situations. The default
                    expansion of LargeGroupByKey (let's say) would
                    be classic GBK, so that only runners that need
                    to make sure that they don't break reiterations
                    can expand this.

                    WDYT?

                    Jan

                    On 9/27/19 8:56 PM, Reuven Lax wrote:
                    As I mentioned above, CoGroupByKey already
                    takes advantage of this. Reiterating is not
                    the most common use case, but it's definitely
                    one that comes up. Also keep in mind that this
                    API has supported reiterating for the past
                    five years (since even before the SDK was
                    donated to Apache). Therefore you should
                    assume that users are relying on it, in ways
                    we might not expect.

                    Historically, Google's Flume system had
                    collections that did not support reiterating
                    (they were even called OneShotCollections to
                    make it clear). This was the source of
                    problems and user frustration, which was one
                    reason that in the original Dataflow SDK we
                    made sure that these iterables could be
                    reiterated. Another reason why it's
                    advantageous for a runner to support this is
                    allowing for better fusion. If two ParDos A
                    and B both read from the same GroupByKey, it
                    is nice to be able to fuse them into one
                    logical operator. For this, you'll probably
                    need a shuffle implementation that allows two
                    independent readers from the same shuffle session.

                    How easy it is to implement reiterables that
                    don't have to fit in memory will depend on the
                    runner.  For Dataflow it's possible because
                    the shuffle session is logically persistent,
                    so the runner can simply reread the shuffle
                    session. For other runners with different
                    shuffle implementations, it might be harder to
                    support both properties. Maybe we should
                    introduce a new @RequiresReiteration
                    annotation on ParDo? That way the Spark runner
                    can see this and switch to the in-memory
                    version just for groupings consumed by those
                    ParDos. Runners that already support
                    reiteration can ignore this annotation, so it
                    should be backwards compatible.

                    Reuven

                    On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský
                    <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                        I'd like to know the use-case. Why would
                        you *need* to actually iterate the grouped
                        elements twice? By definition the first
                        iteration would have to extract some
                        statistic (or subset of elements that must
                        fit into memory). This statistic can then
                        be used as another input for the second
                        iteration. Why not then calculate the
                        statistic in a separate branch in the
                        pipeline and feed it then into the ParDo
                        as side input? That would be definitely
                        more efficient, because the calculation of
                        the statistic would be probably combinable
                        (not sure if it is absolutely required to
                        be combinable, but it seems probable).
                        Even if the calculation would not be
                        combinable, it is not less efficient than
                        reiterating twice. Why then support
                        multiple iterations (besides the fact that
                        the output of GBK is Iterable)? Am I missing
                        something?
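
For reference, the two-pass pattern described above and the single-pass alternative with a precomputed statistic can be sketched in plain Java as follows. This is only an illustration, not Beam code: `countAboveMean` and `countAbove` are hypothetical names, and the `mean` argument stands in for a value that a pipeline would deliver as a side input.

```java
import java.util.Arrays;
import java.util.List;

final class TwoPassDemo {

    /** Two passes: the first computes the mean, the second counts values above it. */
    static long countAboveMean(Iterable<Double> values) {
        double sum = 0;
        long n = 0;
        for (double v : values) {
            sum += v;
            n++;
        }
        if (n == 0) {
            return 0;
        }
        double mean = sum / n;
        long above = 0;
        // This second loop is what fails on a one-shot Iterable.
        for (double v : values) {
            if (v > mean) {
                above++;
            }
        }
        return above;
    }

    /** One pass: the statistic is computed elsewhere and passed in. */
    static long countAbove(Iterable<Double> values, double mean) {
        long above = 0;
        for (double v : values) {
            if (v > mean) {
                above++;
            }
        }
        return above;
    }

    public static void main(String[] args) {
        List<Double> values = Arrays.asList(1.0, 2.0, 3.0, 10.0);
        System.out.println(countAboveMean(values));
        System.out.println(countAbove(values, 4.0));
    }
}
```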

                        Jan

                        On 9/27/19 6:36 PM, Reuven Lax wrote:
                        This should be an element in the
                        compatibility matrix as well.

                        On Fri, Sep 27, 2019 at 9:26 AM Kenneth
                        Knowles <k...@apache.org
                        <mailto:k...@apache.org>> wrote:

                            I am pretty surprised that we do not
                            have a @Category(ValidatesRunner)
                            test in GroupByKeyTest that iterates
                            multiple times. That is a major
                            oversight. We should have this test,
                            and it can be disabled by the
                            SparkRunner's configuration.

                            Kenn

                            On Fri, Sep 27, 2019 at 9:24 AM
                            Reuven Lax <re...@google.com
                            <mailto:re...@google.com>> wrote:

                                The Dataflow version does not
                                spill to disk. However Spark's
                                design might require spilling to
                                disk if you want that to be
                                implemented properly.

                                On Fri, Sep 27, 2019 at 9:08 AM
                                David Morávek <d...@apache.org
                                <mailto:d...@apache.org>> wrote:

                                    Hi,

                                    Spark's GBK is currently
                                    implemented using `sortBy(key
                                    and value).mapPartition(...)`
                                    for non-merging windowing in
                                    order to support large keys
                                    and large scale shuffles.
                                    Merging windowing is
                                    implemented using standard
                                    GBK (underlying spark impl.
                                    uses ListCombiner + Hash
                                    Grouping), which is by design
                                    unable to support large keys.

                                    As Jan noted, the problem
                                    with mapPartition is that its
                                    UDF receives an Iterator. The
                                    only option here is to wrap
                                    this iterator in one that
                                    spills to disk once an
                                    internal buffer is exceeded
                                    (the approach suggested by
                                    Reuven). This unfortunately
                                    comes with a cost in some
                                    cases. The best approach
                                    would be to somehow determine
                                    that the user wants multiple
                                    iterations and then wrap it
                                    in a "re-iterator" if
                                    necessary. Does anyone have
                                    any ideas how to approach this?
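
A rough sketch of such a "re-iterator", in plain Java with no Spark or Beam dependency. `BufferingReiterable` is a hypothetical name, and the sketch buffers in memory only; the spill-to-disk step mentioned above is deliberately left out, so it illustrates the wrapping idea rather than a solution for large keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class ReiterableDemo {

    /** Hypothetical wrapper that buffers a one-shot Iterator so it can be replayed. */
    static final class BufferingReiterable<T> implements Iterable<T> {
        private final Iterator<T> source;
        // In-memory buffer only; a real implementation would spill past a threshold.
        private final List<T> buffer = new ArrayList<>();

        BufferingReiterable(Iterator<T> source) {
            this.source = source;
        }

        @Override
        public Iterator<T> iterator() {
            return new Iterator<T>() {
                private int position = 0;

                @Override
                public boolean hasNext() {
                    return position < buffer.size() || source.hasNext();
                }

                @Override
                public T next() {
                    if (position == buffer.size()) {
                        if (!source.hasNext()) {
                            throw new NoSuchElementException();
                        }
                        // Pull from the source lazily and remember the element.
                        buffer.add(source.next());
                    }
                    return buffer.get(position++);
                }
            };
        }
    }

    /** Copies one full pass over the Iterable into a List. */
    static <T> List<T> toList(Iterable<T> values) {
        List<T> out = new ArrayList<>();
        for (T v : values) {
            out.add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        Iterable<Integer> values =
            new BufferingReiterable<>(Arrays.asList(1, 2, 3).iterator());
        System.out.println(toList(values));
        System.out.println(toList(values));
    }
}
```

The cost mentioned above shows up here as the buffer: every element is retained even when the consumer iterates only once, which is why knowing the consumer's intent in advance matters.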

                                    D.

                                    On Fri, Sep 27, 2019 at 5:46
                                    PM Reuven Lax
                                    <re...@google.com
                                    <mailto:re...@google.com>> wrote:

                                        The Beam API was written
                                        to support multiple
                                        iterations, and there are
                                        definitely transforms
                                        that do so. I believe
                                        that CoGroupByKey may do
                                        this as well with the
                                        resulting iterator.

                                        I know that the Dataflow
                                        runner is able to handle
                                        iterators larger than
                                        available memory by
                                        paging them in from
                                        shuffle, which still
                                        allows for reiterating.
                                        It sounds like Spark is
                                        less flexible here?

                                        Reuven

                                        On Fri, Sep 27, 2019 at
                                        3:04 AM Jan Lukavský
                                        <je...@seznam.cz
                                        <mailto:je...@seznam.cz>>
                                        wrote:

                                            +dev
                                            <dev@beam.apache.org>
                                            <mailto:dev@beam.apache.org>


                                            Lukasz, why do you
                                            think that users
                                            expect to be able to
                                            iterate grouped
                                            elements multiple
                                            times, besides the
                                            fact that 'Iterable'
                                            obviously suggests
                                            it? The way that
                                            Spark behaves is
                                            pretty much analogous
                                            to how MapReduce used
                                            to work - in certain
                                            cases it calls
                                            repartitionAndSortWithinPartitions
                                            and then does
                                            mapPartition, which
                                            accepts an Iterator -
                                            that is because
                                            internally it
                                            merge-sorts
                                            pre-sorted segments.
                                            This approach makes
                                            it possible to
                                            GroupByKey data sets
                                            that are too big to
                                            fit into memory (per
                                            key).
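
To illustrate why grouping over a sorted stream naturally yields one-shot value iterators, here is a sketch in plain Java. It does not use Spark or Beam; `Grouper`, `forEachGroup`, `entry` and `sumPerKey` are hypothetical names, and the input is assumed to be already sorted by key.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.BiConsumer;

final class SortedGroupingDemo {

    /** Hypothetical helper: groups a key-sorted stream in a single forward pass. */
    static final class Grouper<K, V> {
        private final Iterator<Map.Entry<K, V>> source;
        private Map.Entry<K, V> head; // next unread entry, or null at end of input

        Grouper(Iterator<Map.Entry<K, V>> source) {
            this.source = source;
            advance();
        }

        private void advance() {
            head = source.hasNext() ? source.next() : null;
        }

        /** Calls fn once per key with an Iterator that can only move forward. */
        void forEachGroup(BiConsumer<K, Iterator<V>> fn) {
            while (head != null) {
                final K key = head.getKey();
                Iterator<V> values = new Iterator<V>() {
                    @Override
                    public boolean hasNext() {
                        return head != null && Objects.equals(head.getKey(), key);
                    }

                    @Override
                    public V next() {
                        if (!hasNext()) {
                            throw new NoSuchElementException();
                        }
                        V value = head.getValue();
                        advance(); // the value is gone from the stream after this
                        return value;
                    }
                };
                fn.accept(key, values);
                // Skip any values the callback left unread before the next key.
                while (values.hasNext()) {
                    values.next();
                }
            }
        }
    }

    static Map.Entry<String, Integer> entry(String key, int value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    /** Sums the values of each key; needs only one element in memory at a time. */
    static Map<String, Integer> sumPerKey(Iterator<Map.Entry<String, Integer>> sorted) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        new Grouper<String, Integer>(sorted).forEachGroup((key, values) -> {
            int total = 0;
            while (values.hasNext()) {
                total += values.next();
            }
            sums.put(key, total);
        });
        return sums;
    }
}
```

Nothing is buffered per key, so a key's values may exceed memory, but for the same reason there is nothing to replay on a second iteration.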

                                            If multiple
                                            iterations should be
                                            expected by users, we
                                            probably should:

                                             a) include that in
                                            @ValidatesRunner tests

                                             b) store values in
                                            memory on spark,
                                            which will break for
                                            certain pipelines

                                            Because of (b) I
                                            think that it would
                                            be much better to
                                            remove this
                                            "expectation" and
                                            clearly document that
                                            the Iterable is not
                                            supposed to be
                                            iterated multiple times.

                                            Jan

                                            On 9/27/19 9:27 AM,
                                            Jan Lukavský wrote:

                                            I pretty much think
                                            so, because that is
                                            how Spark works. The
                                            Iterable inside is
                                            really an Iterator,
                                            which cannot be
                                            iterated multiple times.

                                            Jan

                                            On 9/27/19 2:00 AM,
                                            Lukasz Cwik wrote:
                                            Jan, in Beam users
                                            expect to be able
                                            to iterate the GBK
                                            output multiple
                                            times even from
                                            within the same ParDo.
                                            Is this something
                                            that Beam on Spark
                                            Runner never supported?

                                            On Thu, Sep 26,
                                            2019 at 6:50 AM Jan
                                            Lukavský
                                            <je...@seznam.cz
                                            <mailto:je...@seznam.cz>>
                                            wrote:

                                                Hi Gershi,

                                                could you
                                                please outline
                                                the pipeline
                                                you are trying
                                                to execute?
                                                Basically, you
                                                cannot iterate
                                                the Iterable
                                                multiple times
                                                in a single
                                                ParDo. It
                                                should be
                                                possible,
                                                though, to
                                                apply multiple
                                                ParDos to
                                                output from
                                                GroupByKey.

                                                Jan

                                                On 9/26/19 3:32
                                                PM, Gershi,
                                                Noam wrote:

                                                Hi,

                                                I want to
                                                iterate
                                                multiple times
                                                on the
                                                Iterable<V>
                                                (the output of
                                                GroupByKey
                                                transformation)

                                                When my Runner
                                                is
                                                SparkRunner, I
                                                get an exception:

                                                Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
                                                    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
                                                    at java.lang.Iterable.spliterator(Iterable.java:101)

                                                I understood I
                                                can branch the
                                                pipeline after
                                                GroupByKey
                                                into multiple
                                                transformations
                                                and iterate in
                                                each of them
                                                once on the
                                                Iterable<V>.

                                                Is there a
                                                better way for
                                                that?

                                                
                                                *Noam Gershi*

                                                Software Developer

                                                *T*:+972 (3)
                                                7405718
                                                <tel:+972%203-740-5718>
