I'd like to know the use-case. Why would you *need* to actually iterate the grouped elements twice? By definition the first iteration would have to extract some statistic (or subset of elements that must fit into memory). This statistic can then be used as another input for the second iteration. Why not then calculate the statistic in a separate branch in the pipeline and feed it then into the ParDo as side input? That would be definitely more efficient, because the calculation of the statistic would be probably combinable (not sure if it is absolutely required to be combinable, but it seems probable). Even if the calculation would not be combinable, it is not less efficient than reiterating twice. Why then support multiple iterations (besides the fact that output of GBK is Iterable). Am I missing something?

Jan

On 9/27/19 6:36 PM, Reuven Lax wrote:
This should be an element in the compatibility matrix as well.

On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org <mailto:k...@apache.org>> wrote:

    I am pretty surprised that we do not have
    a @Category(ValidatesRunner) test in GroupByKeyTest that iterates
    multiple times. That is a major oversight. We should have this
    test, and it can be disabled by the SparkRunner's configuration.

    Kenn

    On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com
    <mailto:re...@google.com>> wrote:

        The Dataflow version does not spill to disk. However Spark's
        design might require spilling to disk if you want that to be
        implemented properly.

        On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org
        <mailto:d...@apache.org>> wrote:

            Hi,

            Spark's GBK is currently implemented using `sortBy(key and
            value).mapPartition(...)` for non-merging windowing in
            order to support large keys and large scale shuffles.
            Merging windowing is implemented using standard GBK
            (underlying spark impl. uses ListCombiner + Hash
            Grouping), which is by design unable to support large keys.

            As Jan noted, problem with mapPartition is, that its UDF
            receives an Iterator. Only option here is to wrap this
            iterator to one that spills to disk once an internal
            buffer is exceeded (the approach suggested by Reuven).
            This unfortunately comes with a cost in some cases. The
            best approach would be to somehow determine, that user
            wants multiple iterations and than wrap it in
            "re-iterator" if necessary. Does anyone have any ideas how
            to approach this?

            D.

            On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax
            <re...@google.com <mailto:re...@google.com>> wrote:

                The Beam API was written to support multiple
                iterations, and there are definitely transforms that
                do so. I believe that CoGroupByKey may do this as well
                with the resulting iterator.

                I know that the Dataflow runner is able to handles
                iterators larger than available memory by paging them
                in from shuffle, which still allows for reiterating.
                It sounds like Spark is less flexible here?

                Reuven

                On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský
                <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                    +dev <dev@beam.apache.org>
                    <mailto:dev@beam.apache.org>

                    Lukasz, why do you think that users expect to be
                    able to iterate multiple times grouped elements?
                    Besides that it obviously suggests the 'Iterable'?
                    The way that spark behaves is pretty much
                    analogous to how MapReduce used to work - in
                    certain cases it calles
                    repartitionAndSortWithinPartitions and then does
                    mapPartition, which accepts Iterator - that is
                    because internally it merge sorts pre sorted
                    segments. This approach enables to GroupByKey data
                    sets that are too big to fit into memory (per key).

                    If multiple iterations should be expected by
                    users, we probably should:

                     a) include that in @ValidatesRunner tests

                     b) store values in memory on spark, which will
                    break for certain pipelines

                    Because of (b) I think that it would be much
                    better to remove this "expectation" and clearly
                    document that the Iterable is not supposed to be
                    iterated multiple times.

                    Jan

                    On 9/27/19 9:27 AM, Jan Lukavský wrote:

                    I pretty much think so, because that is how Spark
                    works. The Iterable inside is really an Iterator,
                    which cannot be iterated multiple times.

                    Jan

                    On 9/27/19 2:00 AM, Lukasz Cwik wrote:
                    Jan, in Beam users expect to be able to iterate
                    the GBK output multiple times even from within
                    the same ParDo.
                    Is this something that Beam on Spark Runner
                    never supported?

                    On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský
                    <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                        Hi Gershi,

                        could you please outline the pipeline you
                        are trying to execute? Basically, you cannot
                        iterate the Iterable multiple times in
                        single ParDo. It should be possible, though,
                        to apply multiple ParDos to output from
                        GroupByKey.

                        Jan

                        On 9/26/19 3:32 PM, Gershi, Noam wrote:

                        Hi,

                        I want to iterate multiple times on the
                        Iterable<V> (the output of GroupByKey
                        transformation)

                        When my Runner is SparkRunner, I get an
                        exception:

                        Caused by: java.lang.IllegalStateException:
                        ValueIterator can't be iterated more than
                        once,otherwise there could be data lost

                        at
                        
org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)

                        at
                        java.lang.Iterable.spliterator(Iterable.java:101)

                        I understood I can branch the pipeline
                        after GroupByKey into multiple
                        transformation and iterate in each of them
                        once on the Iterable<V>.

                        Is there a better way for that?

                        citi_logo_mailciti_logo_mail*Noam Gershi*

                        Software Developer

                        *T*:+972 (3) 7405718 <tel:+972%203-740-5718>

                        Mail_signature_blue

Reply via email to