+dev <dev@beam.apache.org>

Lukasz, why do you think that users expect to be able to iterate the grouped elements multiple times, apart from the fact that the type 'Iterable' obviously suggests it? The way Spark behaves is pretty much analogous to how MapReduce used to work: in certain cases it calls repartitionAndSortWithinPartitions and then mapPartitions, which accepts an Iterator, because internally it merge-sorts pre-sorted segments. This approach makes it possible to GroupByKey data sets that are too big to fit into memory (per key).
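
To illustrate the behaviour described above, here is a minimal plain-Java sketch of an Iterable that is backed by a single Iterator and therefore refuses a second traversal. This is not the Spark runner's actual code; `OneShotDemo`, `OneShotIterable` and `sum` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.Iterator;

public class OneShotDemo {
    /** Hypothetical wrapper: hands out its backing Iterator exactly once. */
    static final class OneShotIterable<T> implements Iterable<T> {
        private final Iterator<T> source;
        private boolean consumed = false;

        OneShotIterable(Iterator<T> source) {
            this.source = source;
        }

        @Override
        public Iterator<T> iterator() {
            if (consumed) {
                // A second pass would see an exhausted stream, so fail loudly instead.
                throw new IllegalStateException("can't be iterated more than once");
            }
            consumed = true;
            return source;
        }
    }

    static int sum(Iterable<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        Iterable<Integer> values =
            new OneShotIterable<>(Arrays.asList(1, 2, 3).iterator());
        System.out.println(sum(values)); // prints 6
        try {
            sum(values); // second pass is rejected
        } catch (IllegalStateException e) {
            System.out.println("second pass rejected: " + e.getMessage());
        }
    }
}
```

Because the values are streamed rather than stored, nothing has to be held in memory per key, at the price of allowing only one pass.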

If multiple iterations should be expected by users, we probably should:

 a) include that in @ValidatesRunner tests

 b) store values in memory on spark, which will break for certain pipelines

Because of (b) I think that it would be much better to remove this "expectation" and clearly document that the Iterable is not supposed to be iterated multiple times.
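
For reference, option (b) amounts to something like the following plain-Java sketch: copy the values into a list on the first pass, then iterate the copy as often as needed. It uses no Beam or Spark APIs, and `MaterializeDemo` and `materialize` are hypothetical names. The cost is that all values of one key must fit in memory, which is exactly the limitation mentioned above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MaterializeDemo {
    /** Copies a possibly single-use Iterable into a list; holds every value in memory. */
    static <T> List<T> materialize(Iterable<T> values) {
        List<T> copy = new ArrayList<>();
        for (T v : values) {
            copy.add(v);
        }
        return copy;
    }

    public static void main(String[] args) {
        // Stand-in for a single-use Iterable: it always returns the same Iterator,
        // so a second traversal of singleUse itself would silently yield nothing.
        Iterator<Integer> it = Arrays.asList(4, 5, 6).iterator();
        Iterable<Integer> singleUse = () -> it;

        List<Integer> copy = materialize(singleUse);

        int sum = 0;
        for (int v : copy) {
            sum += v; // first pass over the copy
        }
        int max = Integer.MIN_VALUE;
        for (int v : copy) {
            max = Math.max(max, v); // second pass over the copy
        }
        System.out.println(sum + " " + max); // prints 15 6
    }
}
```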

Jan

On 9/27/19 9:27 AM, Jan Lukavský wrote:

I pretty much think so, because that is how Spark works. The Iterable inside is really an Iterator, which cannot be iterated multiple times.

Jan

On 9/27/19 2:00 AM, Lukasz Cwik wrote:
Jan, in Beam users expect to be able to iterate the GBK output multiple times even from within the same ParDo.
Is this something that Beam on Spark Runner never supported?

On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hi Gershi,

    could you please outline the pipeline you are trying to execute?
    Basically, you cannot iterate the Iterable multiple times in a
    single ParDo. It should be possible, though, to apply multiple
    ParDos to output from GroupByKey.

    Jan

    On 9/26/19 3:32 PM, Gershi, Noam wrote:

    Hi,

    I want to iterate multiple times over the Iterable<V> (the output
    of the GroupByKey transformation).

    When my Runner is SparkRunner, I get an exception:

    Caused by: java.lang.IllegalStateException: ValueIterator can't
    be iterated more than once,otherwise there could be data lost

                    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)

                    at java.lang.Iterable.spliterator(Iterable.java:101)

    I understand that I can branch the pipeline after GroupByKey into
    multiple transformations and iterate once over the Iterable<V>
    in each of them.

    Is there a better way for that?

    Noam Gershi

    Software Developer

    T: +972 (3) 7405718

