Thinking about it a little more - maybe there could be another systematic approach to described problems. We (currently) have multiple layers of SDK/DSLs. It is imaginable, that each layer can have somewhat different requirements and guarantees - actually, this might be even logical, as each "layer" should serve a somewhat different purpose. I was always thinking about it, as it should be so that SDK should be the most generic, highest level of abstraction. That is implied by that DSLs should be expandable into "primitive" transforms from SDK.

On the other hand, it is absolutely imaginable, that SDK and DSLs be somewhat complementary. It would require DSLs to provide default expansions, but to override them in runners that do not satisfy the requirements (Spark runner in this case).

What I mean by that - in Euphoria, we have a concept called ReduceByKey, which is a essentially a chain of GroupByKey and ParDo. It is already written to use java.util.stream.Stream, therefore cannot confuse users regarding multiple iterations. What would be needed is to directly override this PTransform in SparkRunner. This override would then use the specific optimization, that is currently responsible for not being able to reiterate result from GBK. It would mean, that if user wants to reiterate, or is okay with using the default Spark implementation, which stores grouped elements in memory, it could use raw SDK (GBK + ParDo). If user wanted to make use of the optimization for large keys, he would have to use Euphoria's RBK. It would be change to pipeline, but pretty much a drop-in replacement of chained GBK + ParDo. I think this solves all the issues and can be easily documented and explained to users.

It would be even possible to enable runners to provide interface for injecting these overrides for specific DSLs, which would be cool, because the PTransform definition and associated runner override could be located outside of core and the runner itself. By that we could even solve another issue, that different DSLs can have different guarantees regarding various properties of stream processing - to mention one of the most obvious, sorting by timestamp in stateful processing, although supporting for that in core SDK probably would not be that much intrusive.

Jan

On 9/28/19 9:25 AM, Jan Lukavský wrote:

I understand the concerns. Still, it looks a little like we want to be able to modify behavior of an object from inside a submodule - quite like if my subprogram would accept a Map interface, but internally I would say "hey, this is supposed to be a HashMap, please change it so". Because of how pipeline is constructed, we can do that, the question is if there really isn't a better solution.

What I do not like about the proposed solution:

 1) to specify that the grouped elements are supposed to be iterated only once can be done only on ParDo, although there are other (higher level) PTransforms, that can consume output of GBK

 2) the annontation on ParDo is by definition generic - i.e. can be used on input which is not output of GBK, which makes no sense

 3) we modify the behavior to unlock some optimizations (or change of behavior of the GBK itself), users will not understand that

 4) the annotation somewhat arbitrarily modifies data types passed, that is counter-intuitive and will be source of confusion

I think that a solution that solves the above problems (but brings somoe new, as always :-)), could be to change the output of GBK from PCollection<K, Iterable<V>> to GroupedPCollection<K, V>. That way we can control which operators (and how) consume the grouping, and we can enable these transforms to specify additional parameters (like how they want to consume the grouping). It is obviously a breaking change (although can be probably made backwards compatible) and it would very much likely involve a substantial work. But maybe there are some other not yet discussed options.

Jan

On 9/28/19 6:46 AM, Reuven Lax wrote:
In many cases, the writer of the ParDo has no access to the GBK (e.g. the GBK is hidden inside an upstream PTransform that they cannot modify). This is the same reason why RequiresStableInput was made a property of the ParDo, because the GroupByKey is quite often inaccessible.

The car analogy doesn't quite apply here, because the runner does have a full view of the pipeline so can satisfy all constraints. The car dealership generally cannot read your mind (thankfully!), so you have to specify what you want. Or to put it another way, the various transforms in a Beam pipeline do not live in isolation. The full pipeline graph is what is executed, and the runner already has to analyze the full graph to run the pipeline (as well as to optimize the pipeline).

Reuven

On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

    I'd suggest Stream instead of Iterator, it has the same semantics
    and much better API.

    Still not sure, what is wrong on letting the GBK to decide this.
    I have an analogy - if I decide to buy a car, I have to decide
    *what* car I'm going to buy (by think about how I'm going to use
    it) *before* I buy it. I cannot just buy "a car" and then change
    it from minivan to sport car based on my actual need. Same with
    the GBK - if I want to be able to reiterate the result, then I
    should tell it in advance.

    Jan

    On 9/27/19 10:50 PM, Kenneth Knowles wrote:
    Good point about sibling fusion requiring this.

    The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does
    imply that the output iterable can be iterated arbitrarily many
    times.

    I think this should remain the default for all the reasons
    mentioned.

    We could have opt-in to the weaker KV<K, Iterator<V>> version.
    Agree that this is a property of the ParDo. A particular use of
    a GBK has no idea what is downstream. If you owned the whole
    pipeline, a special ParDo<Iterator<V>, Foo> would work. But to
    make the types line up, this would require changes upstream,
    which is not good.

    Maybe something like this:

    ParDo<Iterable<V>, Foo> {
      @ProcessElement
      void process(@OneShotIterator Iterator<V> iter) {
        ...
      }
    }

    I've described all of this in terms of Java SDK. So we would
    need a portable representation for all this metadata.

    Kenn

    On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <re...@google.com
    <mailto:re...@google.com>> wrote:

        I think the behavior to make explicit is the need to
        reiterate, not the need to handle large results. How
        large of a result can be handled will always be dependent on
        the runner, and each runner will probably have a different
        definition of large keys. Reiteration however is a logical
        difference in the programming API. Therefore I think it
        makes sense to specify the latter. The need to reiterate is
        a property of the downstream ParDo, so it should be
        specified there - not on the GBK.

        Reuven

        On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

            Ok, I think I understand there might be some benefits of
            this. Then I'd propose we make this clear on the GBK. If
            we would support somehing like this:

             PCollection<?> input = ....;

             input.apply(GroupByKey.withLargeKeys());

            then SparkRunner could expand this to
            repartitionAndSortWithinPartitions only on this
            PTransform, and fallback to the default (in memory) in
            other situations. The default expansion of
            LargeGroupByKey (let's say) would be classic GBK, so
            that only runners that need to make sure that they don't
            break reiterations can expand this.

            WDYT?

            Jan

            On 9/27/19 8:56 PM, Reuven Lax wrote:
            As I mentioned above, CoGroupByKey already takes
            advantage of this. Reiterating is not the most common
            use case, but it's definitely one that comes up. Also
            keep in mind that this API has supported reiterating
            for the past five years (since even before the SDK was
            donated to Apache). Therefore you should assume that
            users are relying on it, in ways we might not expect.

            Historically, Google's Flume system had collections
            that did not support reiterating (they were even called
            OneShotCollections to make it clear). This was the
            source of problems and user frustration, which was one
            reason that in the original Dataflow SDK we made sure
            that these iterables could be reiterated. Another
            reason why it's advantageous for a runner to support
            this is allowing for better fusion. If two ParDos A and
            B both read from the same GroupByKey, it is nice to be
            able to fuse them into one logical operator. For this,
            you'll probably need a shuffle implementation that
            allows two independent readers from the same shuffle
            session.

            How easy it is to implement reiterables that don't have
            to fit in memory will depend on the runner.  For
            Dataflow it's possible because the shuffle session is
            logically persistent, so the runner can simply reread
            the shuffle session. For other runners with different
            shuffle implementations, it might be harder to support
            both properties. Maybe we should introduce a
            new @RequiresReiteration annotation on ParDo? That way
            the Spark runner can see this and switch to the
            in-memory version just for groupings consumed by those
            ParDos. Runners that already support reiteration can
            ignore this annotation, so it should be backwards
            compatible.

            Reuven

            On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský
            <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                I'd like to know the use-case. Why would you *need*
                to actually iterate the grouped elements twice? By
                definition the first iteration would have to
                extract some statistic (or subset of elements that
                must fit into memory). This statistic can then be
                used as another input for the second iteration. Why
                not then calculate the statistic in a separate
                branch in the pipeline and feed it then into the
                ParDo as side input? That would be definitely more
                efficient, because the calculation of the statistic
                would be probably combinable (not sure if it is
                absolutely required to be combinable, but it seems
                probable). Even if the calculation would not be
                combinable, it is not less efficient than
                reiterating twice. Why then support multiple
                iterations (besides the fact that output of GBK is
                Iterable). Am I missing something?

                Jan

                On 9/27/19 6:36 PM, Reuven Lax wrote:
                This should be an element in the compatibility
                matrix as well.

                On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles
                <k...@apache.org <mailto:k...@apache.org>> wrote:

                    I am pretty surprised that we do not have
                    a @Category(ValidatesRunner) test
                    in GroupByKeyTest that iterates multiple
                    times. That is a major oversight. We should
                    have this test, and it can be disabled by the
                    SparkRunner's configuration.

                    Kenn

                    On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax
                    <re...@google.com <mailto:re...@google.com>>
                    wrote:

                        The Dataflow version does not spill to
                        disk. However Spark's design might require
                        spilling to disk if you want that to be
                        implemented properly.

                        On Fri, Sep 27, 2019 at 9:08 AM David
                        Morávek <d...@apache.org
                        <mailto:d...@apache.org>> wrote:

                            Hi,

                            Spark's GBK is currently implemented
                            using `sortBy(key and
                            value).mapPartition(...)` for
                            non-merging windowing in order to
                            support large keys and large scale
                            shuffles. Merging windowing is
                            implemented using standard GBK
                            (underlying spark impl. uses
                            ListCombiner + Hash Grouping), which
                            is by design unable to support large keys.

                            As Jan noted, problem with
                            mapPartition is, that its UDF receives
                            an Iterator. Only option here is to
                            wrap this iterator to one that spills
                            to disk once an internal buffer is
                            exceeded (the approach suggested by
                            Reuven). This unfortunately comes with
                            a cost in some cases. The best
                            approach would be to somehow
                            determine, that user wants multiple
                            iterations and than wrap it in
                            "re-iterator" if necessary. Does
                            anyone have any ideas how to approach
                            this?

                            D.

                            On Fri, Sep 27, 2019 at 5:46 PM Reuven
                            Lax <re...@google.com
                            <mailto:re...@google.com>> wrote:

                                The Beam API was written to
                                support multiple iterations, and
                                there are definitely transforms
                                that do so. I believe that
                                CoGroupByKey may do this as well
                                with the resulting iterator.

                                I know that the Dataflow runner is
                                able to handles iterators larger
                                than available memory by paging
                                them in from shuffle, which still
                                allows for reiterating. It sounds
                                like Spark is less flexible here?

                                Reuven

                                On Fri, Sep 27, 2019 at 3:04 AM
                                Jan Lukavský <je...@seznam.cz
                                <mailto:je...@seznam.cz>> wrote:

                                    +dev <dev@beam.apache.org>
                                    <mailto:dev@beam.apache.org>

                                    Lukasz, why do you think that
                                    users expect to be able to
                                    iterate multiple times grouped
                                    elements? Besides that it
                                    obviously suggests the
                                    'Iterable'? The way that spark
                                    behaves is pretty much
                                    analogous to how MapReduce
                                    used to work - in certain
                                    cases it calles
                                    repartitionAndSortWithinPartitions
                                    and then does mapPartition,
                                    which accepts Iterator - that
                                    is because internally it merge
                                    sorts pre sorted segments.
                                    This approach enables to
                                    GroupByKey data sets that are
                                    too big to fit into memory
                                    (per key).

                                    If multiple iterations should
                                    be expected by users, we
                                    probably should:

                                     a) include that in
                                    @ValidatesRunner tests

                                     b) store values in memory on
                                    spark, which will break for
                                    certain pipelines

                                    Because of (b) I think that it
                                    would be much better to remove
                                    this "expectation" and clearly
                                    document that the Iterable is
                                    not supposed to be iterated
                                    multiple times.

                                    Jan

                                    On 9/27/19 9:27 AM, Jan
                                    Lukavský wrote:

                                    I pretty much think so,
                                    because that is how Spark
                                    works. The Iterable inside is
                                    really an Iterator, which
                                    cannot be iterated multiple
                                    times.

                                    Jan

                                    On 9/27/19 2:00 AM, Lukasz
                                    Cwik wrote:
                                    Jan, in Beam users expect to
                                    be able to iterate the GBK
                                    output multiple times even
                                    from within the same ParDo.
                                    Is this something that Beam
                                    on Spark Runner never supported?

                                    On Thu, Sep 26, 2019 at 6:50
                                    AM Jan Lukavský
                                    <je...@seznam.cz
                                    <mailto:je...@seznam.cz>> wrote:

                                        Hi Gershi,

                                        could you please outline
                                        the pipeline you are
                                        trying to execute?
                                        Basically, you cannot
                                        iterate the Iterable
                                        multiple times in single
                                        ParDo. It should be
                                        possible, though, to
                                        apply multiple ParDos to
                                        output from GroupByKey.

                                        Jan

                                        On 9/26/19 3:32 PM,
                                        Gershi, Noam wrote:

                                        Hi,

                                        I want to iterate
                                        multiple times on the
                                        Iterable<V> (the output
                                        of GroupByKey
                                        transformation)

                                        When my Runner is
                                        SparkRunner, I get an
                                        exception:

                                        Caused by:
                                        java.lang.IllegalStateException:
                                        ValueIterator can't be
                                        iterated more than
                                        once,otherwise there
                                        could be data lost

                                        at
                                        
org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)

                                        at
                                        
java.lang.Iterable.spliterator(Iterable.java:101)

                                        I understood I can
                                        branch the pipeline
                                        after GroupByKey into
                                        multiple transformation
                                        and iterate in each of
                                        them once on the
                                        Iterable<V>.

                                        Is there a better way
                                        for that?

                                        citi_logo_mailciti_logo_mail*Noam
                                        Gershi*

                                        Software Developer

                                        *T*:+972 (3) 7405718
                                        <tel:+972%203-740-5718>

                                        Mail_signature_blue

Reply via email to