I see two main options here:

1. Create an in-memory Iterable as you do your first iteration (a poor implementation, IMO).
2. Split your iterations into separate DoFns and apply each of them to the PCollection output from the shuffle.

There are many different paths, but finding the most parallel one is probably best.

- Shannon

On Fri, Sep 27, 2019 at 5:04 AM Jan Lukavský <[email protected]> wrote:

> +dev <[email protected]> <[email protected]>
>
> Lukasz, why do you think that users expect to be able to iterate over
> grouped elements multiple times? Besides the fact that the name
> 'Iterable' obviously suggests it?
> The way that Spark behaves is pretty much analogous to how MapReduce used
> to work: in certain cases it calls repartitionAndSortWithinPartitions and
> then does mapPartitions, which accepts an Iterator, because internally
> it merge-sorts pre-sorted segments. This approach makes it possible to
> GroupByKey data sets that are too big to fit into memory (per key).
>
> If multiple iterations should be expected by users, we probably should:
>
> a) include that in @ValidatesRunner tests
>
> b) store values in memory on Spark, which will break for certain pipelines
>
> Because of (b) I think that it would be much better to remove this
> "expectation" and clearly document that the Iterable is not supposed to be
> iterated multiple times.
>
> Jan
>
> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>
> I pretty much think so, because that is how Spark works. The Iterable
> inside is really an Iterator, which cannot be iterated multiple times.
>
> Jan
>
> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>
> Jan, in Beam users expect to be able to iterate the GBK output multiple
> times, even from within the same ParDo.
> Is this something that Beam on the Spark Runner never supported?
>
> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <[email protected]> wrote:
>
>> Hi Gershi,
>>
>> could you please outline the pipeline you are trying to execute?
>> Basically, you cannot iterate the Iterable multiple times in a single ParDo.
>> It should be possible, though, to apply multiple ParDos to the output from
>> GroupByKey.
>>
>> Jan
>>
>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>
>> Hi,
>>
>> I want to iterate multiple times over the Iterable<V> (the output of the
>> GroupByKey transformation).
>>
>> When my Runner is SparkRunner, I get an exception:
>>
>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>> iterated more than once,otherwise there could be data lost
>>     at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>     at java.lang.Iterable.spliterator(Iterable.java:101)
>>
>> I understood I can branch the pipeline after GroupByKey into multiple
>> transformations and iterate once over the Iterable<V> in each of them.
>>
>> Is there a better way to do that?
>>
>> Noam Gershi
>> Software Developer
>> T: +972 (3) 7405718
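
For reference, here is a minimal plain-Java sketch of option 1 from the reply at the top of this thread (copy the values into memory during the first pass). It deliberately uses no Beam or Spark classes: OneShotIterable, materialize, and countAboveMean are hypothetical names invented for illustration, and OneShotIterable only imitates the "iterate once" behaviour described in the exception above. It is not the runner's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class OneShotIterableSketch {

    /** Hypothetical stand-in for a runner-provided Iterable that is really a single Iterator. */
    static final class OneShotIterable<T> implements Iterable<T> {
        private final Iterator<T> source;
        private boolean consumed = false;

        OneShotIterable(Iterator<T> source) {
            this.source = source;
        }

        @Override
        public Iterator<T> iterator() {
            if (consumed) {
                // Imitates the failure mode reported in the thread.
                throw new IllegalStateException("can't be iterated more than once");
            }
            consumed = true;
            return source;
        }
    }

    /** Option 1: copy the values into memory during the first (and only) pass over the source. */
    static <T> List<T> materialize(Iterable<T> values) {
        List<T> copy = new ArrayList<>();
        for (T value : values) {
            copy.add(value);
        }
        return copy;
    }

    /** Needs two passes: first compute the mean, then count the values above it. */
    static long countAboveMean(Iterable<Integer> values) {
        List<Integer> copy = materialize(values); // the source is consumed exactly once here
        if (copy.isEmpty()) {
            return 0;
        }
        long sum = 0;
        for (int v : copy) {
            sum += v; // pass 1 over the in-memory copy
        }
        double mean = (double) sum / copy.size();
        long count = 0;
        for (int v : copy) {
            if (v > mean) {
                count++; // pass 2 over the in-memory copy
            }
        }
        return count;
    }

    public static void main(String[] args) {
        Iterable<Integer> grouped =
            new OneShotIterable<>(Arrays.asList(1, 2, 3, 10).iterator());
        System.out.println(countAboveMean(grouped)); // prints 1
    }
}
```

The copy holds every value for a key in memory at once, which is exactly the drawback Jan raises in (b): it breaks for keys whose values do not fit in memory. Option 2 (separate DoFns applied to the GroupByKey output) avoids that, but depends on the Beam API and is not sketched here.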
