Hi,

Thanks for the reply.

So, re-iteration over grouped elements is runner-dependent: Flink and Dataflow
allow it, while Spark does not.

Since we are also evaluating the runners here, does anyone have a list of which
runners allow re-iteration and which do not?

Noam


From: [apache.org] Kenneth Knowles <[email protected]>
Sent: Friday, September 27, 2019 7:26 PM
To: dev
Cc: user
Subject: Re: Multiple iterations after GroupByKey with SparkRunner

I am pretty surprised that we do not have a @Category(ValidatesRunner) test in 
GroupByKeyTest that iterates multiple times. That is a major oversight. We 
should have this test, and it can be disabled by the SparkRunner's 
configuration.

Kenn

On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <[email protected]> wrote:
The Dataflow version does not spill to disk. However, Spark's design might
require spilling to disk if you want re-iteration to be implemented properly.

On Fri, Sep 27, 2019 at 9:08 AM David Morávek <[email protected]> wrote:
Hi,

Spark's GBK is currently implemented using `sortBy(key and
value).mapPartitions(...)` for non-merging windowing in order to support large
keys and large-scale shuffles. Merging windowing is implemented using the
standard GBK (the underlying Spark implementation uses ListCombiner + hash
grouping), which is by design unable to support large keys.

As Jan noted, the problem with mapPartitions is that its UDF receives an
Iterator. The only option here is to wrap this iterator in one that spills to
disk once an internal buffer is exceeded (the approach suggested by Reuven).
This unfortunately comes with a cost in some cases. The best approach would be
to somehow determine that the user wants multiple iterations and then wrap it in
a "re-iterator" if necessary. Does anyone have any ideas how to approach this?

D.
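
For illustration, the "re-iterator" idea above could be sketched in plain Java
roughly as follows. This is only a minimal sketch under stated assumptions: the
names CachingIterable and CachingDemo are hypothetical and not part of Beam or
Spark, the buffer lives purely in memory (the spill-to-disk step discussed above
is deliberately left out), and the class is not thread-safe.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Demo driver (hypothetical name); declared first so single-file launch finds main. */
final class CachingDemo {
    /** Sums the values; each call asks the Iterable for a new iterator. */
    static int sum(Iterable<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        Iterable<Integer> values =
            new CachingIterable<>(Arrays.asList(1, 2, 3).iterator());
        System.out.println(sum(values)); // first pass pulls from the source
        System.out.println(sum(values)); // second pass replays the buffer
    }
}

/**
 * Hypothetical wrapper that turns a single-pass Iterator into a re-iterable
 * Iterable by remembering every element it hands out.
 * Assumption: all values fit in memory; a real implementation would spill
 * the buffer to disk once it grows past some limit.
 */
final class CachingIterable<T> implements Iterable<T> {
    private final Iterator<T> source;
    private final List<T> buffer = new ArrayList<>();

    CachingIterable(Iterator<T> source) {
        this.source = source;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int position = 0;

            @Override
            public boolean hasNext() {
                // Either a buffered element is left, or the source has more.
                return position < buffer.size() || source.hasNext();
            }

            @Override
            public T next() {
                if (position == buffer.size()) {
                    // Buffer exhausted: pull one more element from the source.
                    if (!source.hasNext()) {
                        throw new NoSuchElementException();
                    }
                    buffer.add(source.next());
                }
                return buffer.get(position++);
            }
        };
    }
}
```

The cost mentioned above shows up here as the buffer: every element is retained
even when the caller only ever iterates once, which is why deciding up front
whether re-iteration is needed would be preferable.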

On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <[email protected]> wrote:
The Beam API was written to support multiple iterations, and there are 
definitely transforms that do so. I believe that CoGroupByKey may do this as 
well with the resulting iterator.

I know that the Dataflow runner is able to handle iterators larger than
available memory by paging them in from shuffle, which still allows for
reiterating. It sounds like Spark is less flexible here?

Reuven

On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <[email protected]> wrote:

+dev <[email protected]>

Lukasz, why do you think that users expect to be able to iterate over grouped
elements multiple times? Besides the fact that the 'Iterable' type obviously
suggests it? The way that Spark behaves is pretty much analogous to how
MapReduce used to work: in certain cases it calls
repartitionAndSortWithinPartitions and then does mapPartitions, which accepts an
Iterator, because internally it merge-sorts pre-sorted segments. This approach
makes it possible to GroupByKey data sets that are too big to fit into memory
(per key).

If multiple iterations should be expected by users, we probably should:

 a) include that in @ValidatesRunner tests

 b) store values in memory on Spark, which will break for certain pipelines

Because of (b) I think that it would be much better to remove this 
"expectation" and clearly document that the Iterable is not supposed to be 
iterated multiple times.

Jan
On 9/27/19 9:27 AM, Jan Lukavský wrote:

I pretty much think so, because that is how Spark works. The Iterable inside is 
really an Iterator, which cannot be iterated multiple times.
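
To illustrate the point, here is a minimal plain-Java sketch of an Iterable that
is backed by a single Iterator and therefore rejects a second pass. The names
OneShotIterable and OneShotDemo are hypothetical; this is not the Spark runner's
actual class, only an assumed simplification of the behaviour described above.

```java
import java.util.Arrays;
import java.util.Iterator;

/** Demo driver (hypothetical name); declared first so single-file launch finds main. */
final class OneShotDemo {
    /** Sums the values; each call asks the Iterable for a new iterator. */
    static int sum(Iterable<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        Iterable<Integer> values =
            new OneShotIterable<>(Arrays.asList(1, 2, 3).iterator());
        System.out.println(sum(values)); // prints 6
        try {
            sum(values); // second pass requests another iterator and fails
        } catch (IllegalStateException e) {
            System.out.println("second pass failed: " + e.getMessage());
        }
    }
}

/** Hypothetical stand-in for a runner-provided value stream. */
final class OneShotIterable<T> implements Iterable<T> {
    private final Iterator<T> source;
    private boolean consumed = false;

    OneShotIterable(Iterator<T> source) {
        this.source = source;
    }

    @Override
    public Iterator<T> iterator() {
        // The underlying Iterator cannot be rewound, so only one pass is allowed.
        if (consumed) {
            throw new IllegalStateException("values can only be iterated once");
        }
        consumed = true;
        return source;
    }
}
```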

Jan
On 9/27/19 2:00 AM, Lukasz Cwik wrote:
Jan, in Beam users expect to be able to iterate the GBK output multiple times 
even from within the same ParDo.
Is this something that Beam on Spark Runner never supported?

On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <[email protected]> wrote:

Hi Gershi,

could you please outline the pipeline you are trying to execute? Basically, you
cannot iterate the Iterable multiple times in a single ParDo. It should be
possible, though, to apply multiple ParDos to the output of GroupByKey.

Jan
On 9/26/19 3:32 PM, Gershi, Noam wrote:
Hi,

I want to iterate multiple times over the Iterable<V> (the output of the
GroupByKey transformation).
When my runner is SparkRunner, I get an exception:

Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
        at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
        at java.lang.Iterable.spliterator(Iterable.java:101)


I understand that I can branch the pipeline after GroupByKey into multiple
transformations and iterate once over the Iterable<V> in each of them.

Is there a better way for that?
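
One option that avoids re-iteration altogether, when the computations do not
depend on each other, is to fold them into a single traversal. The sketch below
is plain Java rather than a Beam DoFn, and the class name SinglePassStats and
its fields are hypothetical; it only shows the shape of the loop that would sit
inside the ParDo.

```java
import java.util.Arrays;
import java.util.Iterator;

/** Hypothetical holder for several aggregates computed in one traversal. */
final class SinglePassStats {
    final long count;
    final long sum;
    final int max;

    private SinglePassStats(long count, long sum, int max) {
        this.count = count;
        this.sum = sum;
        this.max = max;
    }

    /** Walks the values exactly once and updates every aggregate per element. */
    static SinglePassStats of(Iterable<Integer> values) {
        long count = 0;
        long sum = 0;
        int max = Integer.MIN_VALUE; // stays MIN_VALUE for an empty input
        for (int v : values) {
            count++;
            sum += v;
            max = Math.max(max, v);
        }
        return new SinglePassStats(count, sum, max);
    }

    public static void main(String[] args) {
        // Simulate a single-pass source: the lambda always returns the same Iterator.
        Iterator<Integer> once = Arrays.asList(4, 9, 2).iterator();
        Iterable<Integer> values = () -> once;
        SinglePassStats stats = SinglePassStats.of(values);
        System.out.println(stats.count + " " + stats.sum + " " + stats.max);
    }
}
```

When a later computation needs the result of an earlier one, copying the values
into a List on the first pass is the usual fallback, at the price of holding all
values for the key in memory.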


Noam Gershi
Software Developer
T: +972 (3) 7405718
