> But - does it imply that O(n^2) iterations are actually required

I meant O(n) iterations, O(n^2) operations on elements.

On 9/27/19 8:31 PM, Jan Lukavský wrote:

Okay, the self-join example is understandable. But - does it imply that O(n^2) iterations are actually required (maybe caching can help somehow, but asymptotically the complexity will stay the same)? If so, that seems prohibitively slow for large inputs that don't fit into memory, and actually materializing the cartesian product might help parallelize the process?

On 9/27/19 8:19 PM, Kenneth Knowles wrote:
CoGroupByKey is one example. Performing a CoGroupByKey-based join requires multiple iterations (caching is key to getting performance). You could make up other calculations that require it, most of which would look like a self-join, e.g. "output the largest difference between any two elements for each key". In both of these examples, using multiple iterations avoids materializing a cartesian product while still allowing something like a nested-loop join.

On the philosophical side, an iterable behaves like a "value" (it has well-defined contents, a size, etc.), whereas an iterator is more of a mutating process than a value. So you cannot easily reason about a transform ("for input X, the output is Y") when the output is a construct that cannot be interpreted as a value at all.

Kenn
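
For concreteness, here is a minimal sketch (hypothetical code, not
quoted from Beam or this thread) of a DoFn for the "largest
difference" example. The inner loop re-iterates the same Iterable,
so the runner must support multiple iterations, yet no cartesian
product is ever materialized:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Nested-loop "join" of a key's values with themselves: the inner
// loop starts a fresh iteration of the Iterable for every element.
class LargestDifferenceFn
    extends DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>> {
  @ProcessElement
  public void process(ProcessContext c) {
    Iterable<Integer> values = c.element().getValue();
    int largest = 0;
    for (int a : values) {    // outer pass over the Iterable
      for (int b : values) {  // inner pass re-iterates the same Iterable
        largest = Math.max(largest, Math.abs(a - b));
      }
    }
    c.output(KV.of(c.element().getKey(), largest));
  }
}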

On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz> wrote:

    I'd like to know the use-case. Why would you *need* to actually
    iterate the grouped elements twice? By definition, the first
    iteration would have to extract some statistic (or a subset of
    elements that must fit into memory). That statistic can then be
    used as an input for the second iteration. Why not calculate the
    statistic in a separate branch of the pipeline and feed it into
    the ParDo as a side input? That would definitely be more
    efficient, because the calculation of the statistic would
    probably be combinable (I'm not sure whether it is absolutely
    required to be combinable, but it seems probable). Even if the
    calculation were not combinable, it would be no less efficient
    than iterating twice. Why then support multiple iterations
    (besides the fact that the output of GBK is an Iterable)? Am I
    missing something?

    Jan
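
    A minimal sketch of the side-input pattern described above (the
    per-key maximum statistic and all names here are illustrative
    assumptions, not code from this thread): the combinable statistic
    is computed in a separate branch and fed into the ParDo as a side
    input, so the grouped values are iterated only once.

    import java.util.Map;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.Max;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    class SideInputStatistic {
      static PCollection<String> differencesFromMax(
          PCollection<KV<String, Integer>> input) {
        // Branch 1: the combinable statistic (per-key maximum).
        PCollectionView<Map<String, Integer>> maxPerKey =
            input.apply(Max.integersPerKey())
                 .apply(View.<String, Integer>asMap());

        // Branch 2: a single pass over the grouped elements,
        // reading the statistic from the side input.
        return input
            .apply(GroupByKey.create())
            .apply(ParDo.of(
                new DoFn<KV<String, Iterable<Integer>>, String>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    // Every grouped key has a max, so get() is safe here.
                    int max =
                        c.sideInput(maxPerKey).get(c.element().getKey());
                    for (int v : c.element().getValue()) { // one pass only
                      c.output(c.element().getKey() + ":" + (max - v));
                    }
                  }
                }).withSideInputs(maxPerKey));
      }
    }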

    On 9/27/19 6:36 PM, Reuven Lax wrote:
    This should be an element in the compatibility matrix as well.

    On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org> wrote:

        I am pretty surprised that we do not have
        a @Category(ValidatesRunner) test in GroupByKeyTest that
        iterates multiple times. That is a major oversight. We
        should have this test, and it can be disabled by the
        SparkRunner's configuration.

        Kenn
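
        A minimal sketch of the kind of test being described here
        (hypothetical name and contents, not an existing test in
        GroupByKeyTest): iterate the grouped values twice inside a
        single DoFn and assert that both passes see the full contents.

        import org.apache.beam.sdk.testing.PAssert;
        import org.apache.beam.sdk.testing.TestPipeline;
        import org.apache.beam.sdk.testing.ValidatesRunner;
        import org.apache.beam.sdk.transforms.Create;
        import org.apache.beam.sdk.transforms.DoFn;
        import org.apache.beam.sdk.transforms.GroupByKey;
        import org.apache.beam.sdk.transforms.ParDo;
        import org.apache.beam.sdk.values.KV;
        import org.apache.beam.sdk.values.PCollection;
        import org.junit.Rule;
        import org.junit.Test;
        import org.junit.experimental.categories.Category;

        public class GroupByKeyReiterationTest {
          @Rule public final transient TestPipeline p = TestPipeline.create();

          @Test
          @Category(ValidatesRunner.class)
          public void testGroupByKeyIterateTwice() {
            PCollection<KV<String, Integer>> sums =
                p.apply(Create.of(KV.of("k", 1), KV.of("k", 2), KV.of("k", 3)))
                 .apply(GroupByKey.create())
                 .apply(ParDo.of(
                     new DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
                       @ProcessElement
                       public void process(ProcessContext c) {
                         int first = 0;   // first pass over the values
                         for (int v : c.element().getValue()) { first += v; }
                         int second = 0;  // second pass over the same Iterable
                         for (int v : c.element().getValue()) { second += v; }
                         c.output(KV.of(c.element().getKey(), first + second));
                       }
                     }));

            PAssert.that(sums).containsInAnyOrder(KV.of("k", 12));
            p.run();
          }
        }

        On the current SparkRunner, a test like this would fail with
        the IllegalStateException quoted later in the thread, which
        is what the proposed configuration switch would account for.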

        On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:

            The Dataflow version does not spill to disk. However,
            Spark's design might require spilling to disk if you
            want this implemented properly.

            On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:

                Hi,

                Spark's GBK is currently implemented using
                `sortBy(key and value).mapPartition(...)` for
                non-merging windowing, in order to support large keys
                and large-scale shuffles. Merging windowing is
                implemented using the standard GBK (the underlying
                Spark impl. uses ListCombiner + hash grouping), which
                is by design unable to support large keys.

                As Jan noted, the problem with mapPartition is that
                its UDF receives an Iterator. The only option here is
                to wrap this iterator in one that spills to disk once
                an internal buffer is exceeded (the approach
                suggested by Reuven). That unfortunately comes with a
                cost in some cases. The best approach would be to
                somehow determine that the user wants multiple
                iterations and only then wrap it in a "re-iterator"
                if necessary. Does anyone have ideas on how to
                approach this?

                D.
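
                A toy sketch of such a spilling wrapper (simplifying
                assumptions: Java serialization and non-null
                elements; this is not Beam's or Spark's actual code).
                The first maxInMemory elements stay in a buffer, the
                rest spill to a temp file, and iterator() can then be
                called any number of times:

                import java.io.*;
                import java.util.*;

                class SpillingReiterable<T extends Serializable>
                    implements Iterable<T> {
                  private final Iterator<T> source;
                  private final int maxInMemory;
                  private final List<T> buffer = new ArrayList<>();
                  private File spillFile;  // created on first overflow
                  private boolean materialized = false;

                  SpillingReiterable(Iterator<T> source, int maxInMemory) {
                    this.source = source;
                    this.maxInMemory = maxInMemory;
                  }

                  // Drain the one-shot source exactly once.
                  private void materialize() {
                    if (materialized) { return; }
                    materialized = true;
                    try {
                      ObjectOutputStream out = null;
                      while (source.hasNext()) {
                        T element = source.next();
                        if (buffer.size() < maxInMemory) {
                          buffer.add(element);
                        } else {
                          if (out == null) {
                            spillFile = File.createTempFile("spill", ".bin");
                            spillFile.deleteOnExit();
                            out = new ObjectOutputStream(
                                new FileOutputStream(spillFile));
                          }
                          out.writeObject(element);
                        }
                      }
                      if (out != null) { out.close(); }
                    } catch (IOException e) {
                      throw new UncheckedIOException(e);
                    }
                  }

                  @Override
                  public Iterator<T> iterator() {
                    materialize();
                    Iterator<T> inMemory = buffer.iterator();
                    if (spillFile == null) { return inMemory; }
                    return new Iterator<T>() {
                      private final ObjectInputStream in = open();
                      private T next = advance();

                      private ObjectInputStream open() {
                        try {
                          return new ObjectInputStream(
                              new FileInputStream(spillFile));
                        } catch (IOException e) {
                          throw new UncheckedIOException(e);
                        }
                      }

                      @SuppressWarnings("unchecked")
                      private T advance() {
                        // Serve the memory buffer first, then the file;
                        // null marks end-of-data (elements are non-null).
                        if (inMemory.hasNext()) { return inMemory.next(); }
                        try {
                          return (T) in.readObject();
                        } catch (EOFException e) {
                          return null;
                        } catch (IOException | ClassNotFoundException e) {
                          throw new RuntimeException(e);
                        }
                      }

                      @Override public boolean hasNext() { return next != null; }

                      @Override public T next() {
                        T result = next;
                        next = advance();
                        return result;
                      }
                    };
                  }
                }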

                On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:

                    The Beam API was written to support multiple
                    iterations, and there are definitely transforms
                    that do so. I believe that CoGroupByKey may do
                    this as well with the resulting iterator.

                    I know that the Dataflow runner is able to
                    handle iterators larger than available memory
                    by paging them in from shuffle, which still
                    allows reiterating. It sounds like Spark is
                    less flexible here?

                    Reuven

                    On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:

                        +dev <dev@beam.apache.org>

                        Lukasz, why do you think that users expect
                        to be able to iterate grouped elements
                        multiple times? Besides the fact that the
                        'Iterable' signature obviously suggests it?
                        The way Spark behaves is pretty much
                        analogous to how MapReduce used to work - in
                        certain cases it calls
                        repartitionAndSortWithinPartitions and then
                        does mapPartition, which accepts an Iterator
                        - that is because internally it merge-sorts
                        pre-sorted segments. This approach makes it
                        possible to GroupByKey data sets that are
                        too big to fit into memory (per key).

                        If multiple iterations should be expected by
                        users, we probably should:

                         a) include that in @ValidatesRunner tests

                         b) store values in memory on Spark, which
                        will break certain pipelines

                        Because of (b) I think that it would be much
                        better to remove this "expectation" and
                        clearly document that the Iterable is not
                        supposed to be iterated multiple times.

                        Jan
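
                        A rough sketch of that pattern (assuming the
                        Spark 2.x Java API; names are illustrative):
                        after repartitionAndSortWithinPartitions,
                        records arrive key-sorted, so each group is
                        aggregated in one forward pass over the
                        single-use Iterator - which is exactly why
                        the result cannot be re-iterated.

                        import java.util.ArrayList;
                        import java.util.List;
                        import org.apache.spark.HashPartitioner;
                        import org.apache.spark.api.java.JavaPairRDD;
                        import org.apache.spark.api.java.JavaRDD;
                        import scala.Tuple2;

                        class SortedGrouping {
                          // Sum values per key; only the running
                          // aggregate is held in memory, never a
                          // whole key's values.
                          static JavaRDD<String> sumPerKey(
                              JavaPairRDD<String, Integer> rdd, int parts) {
                            return rdd
                                .repartitionAndSortWithinPartitions(
                                    new HashPartitioner(parts))
                                .mapPartitions(records -> {
                                  List<String> out = new ArrayList<>();
                                  String key = null;
                                  long sum = 0;
                                  while (records.hasNext()) { // one pass only
                                    Tuple2<String, Integer> r = records.next();
                                    if (key != null && !key.equals(r._1())) {
                                      out.add(key + "=" + sum); // key boundary
                                      sum = 0;
                                    }
                                    key = r._1();
                                    sum += r._2();
                                  }
                                  if (key != null) {
                                    out.add(key + "=" + sum);
                                  }
                                  return out.iterator();
                                });
                          }
                        }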

                        On 9/27/19 9:27 AM, Jan Lukavský wrote:

                        I pretty much think so, because that is how
                        Spark works. The Iterable inside is really
                        an Iterator, which cannot be iterated
                        multiple times.

                        Jan

                        On 9/27/19 2:00 AM, Lukasz Cwik wrote:
                        Jan, in Beam users expect to be able to
                        iterate the GBK output multiple times even
                        from within the same ParDo.
                        Is this something that Beam on Spark
                        Runner never supported?

                        On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:

                            Hi Gershi,

                            could you please outline the pipeline
                            you are trying to execute? Basically,
                            you cannot iterate the Iterable
                            multiple times in a single ParDo. It
                            should be possible, though, to apply
                            multiple ParDos to the output of GroupByKey.

                            Jan
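
                            A short sketch of that branching
                            (hypothetical fragment, assuming an
                            existing PCollection<KV<String, Long>>
                            named input): two ParDos read the same
                            grouped collection, each iterating the
                            Iterable exactly once.

                            PCollection<KV<String, Iterable<Long>>> grouped =
                                input.apply(GroupByKey.create());

                            // Branch 1: one pass to sum each key's values.
                            grouped.apply("Sum", ParDo.of(
                                new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
                                  @ProcessElement
                                  public void process(ProcessContext c) {
                                    long sum = 0;
                                    for (long v : c.element().getValue()) {
                                      sum += v;
                                    }
                                    c.output(KV.of(c.element().getKey(), sum));
                                  }
                                }));

                            // Branch 2: an independent single pass to count.
                            grouped.apply("Count", ParDo.of(
                                new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
                                  @ProcessElement
                                  public void process(ProcessContext c) {
                                    long n = 0;
                                    for (long ignored : c.element().getValue()) {
                                      n++;
                                    }
                                    c.output(KV.of(c.element().getKey(), n));
                                  }
                                }));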

                            On 9/26/19 3:32 PM, Gershi, Noam wrote:

                            Hi,

                            I want to iterate multiple times over
                            the Iterable<V> (the output of a
                            GroupByKey transformation).

                            When my Runner is SparkRunner, I get
                            an exception:

                            Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
                                at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
                                at java.lang.Iterable.spliterator(Iterable.java:101)

                            I understand I can branch the pipeline
                            after GroupByKey into multiple
                            transforms and iterate once over the
                            Iterable<V> in each of them.

                            Is there a better way to do that?

                            *Noam Gershi*

                            Software Developer

                            *T*: +972 (3) 7405718

