I'd like to know the use-case. Why would you *need* to actually iterate
the grouped elements twice? By definition the first iteration would have
to extract some statistic (or a subset of elements that must fit into
memory). This statistic can then be used as another input for the second
iteration. Why not calculate the statistic in a separate branch of the
pipeline and then feed it into the ParDo as a side input? That would
definitely be more efficient, because the calculation of the statistic
would probably be combinable (not sure if it is absolutely required to
be combinable, but it seems probable). Even if the calculation were not
combinable, it would be no less efficient than iterating twice. Why then
support multiple iterations (besides the fact that the output of GBK is
an Iterable)? Am I missing something?
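For illustration, a minimal sketch of the branching I have in mind (the
per-key mean and the transform names are just hypothetical placeholders):

```java
import java.util.Map;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;

// Hypothetical sketch: compute a combinable statistic (a per-key mean
// here) in a separate branch and feed it into the ParDo as a side
// input, instead of iterating the grouped values twice.
static void statisticAsSideInput(PCollection<KV<String, Long>> input) {
  PCollectionView<Map<String, Double>> statsView = input
      .apply("ComputeStat", Mean.<String, Long>perKey())
      .apply("AsSideInput", View.asMap());

  input
      .apply(GroupByKey.<String, Long>create())
      .apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, String>() {
        @ProcessElement
        public void process(ProcessContext c) {
          // the result of the "first iteration" is already available
          double stat = c.sideInput(statsView).get(c.element().getKey());
          for (Long value : c.element().getValue()) {
            // single pass over the grouped values, using 'stat'
          }
        }
      }).withSideInputs(statsView));
}
```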
Jan
On 9/27/19 6:36 PM, Reuven Lax wrote:
This should be an element in the compatibility matrix as well.
On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org> wrote:
I am pretty surprised that we do not have
a @Category(ValidatesRunner) test in GroupByKeyTest that iterates
multiple times. That is a major oversight. We should have this
test, and it can be disabled by the SparkRunner's configuration.
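Roughly along these lines (a sketch only, assuming the usual TestPipeline
rule `p` already present in GroupByKeyTest):

```java
@Test
@Category(ValidatesRunner.class)
public void testGroupByKeyMultipleIterations() {
  PCollection<Integer> sums = p
      .apply(Create.of(KV.of("k", 1), KV.of("k", 2), KV.of("k", 3)))
      .apply(GroupByKey.<String, Integer>create())
      .apply(ParDo.of(new DoFn<KV<String, Iterable<Integer>>, Integer>() {
        @ProcessElement
        public void process(ProcessContext c) {
          int sum = 0;
          // iterate the grouped values twice; the second pass
          // must see all elements again
          for (int v : c.element().getValue()) { sum += v; }
          for (int v : c.element().getValue()) { sum += v; }
          c.output(sum);
        }
      }));

  PAssert.that(sums).containsInAnyOrder(12);
  p.run();
}
```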
Kenn
On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:
The Dataflow version does not spill to disk. However, Spark's
design might require spilling to disk if you want that to be
implemented properly.
On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:
Hi,
Spark's GBK is currently implemented using `sortBy(key and
value).mapPartition(...)` for non-merging windowing in
order to support large keys and large-scale shuffles.
Merging windowing is implemented using the standard GBK
(the underlying Spark implementation uses ListCombiner + Hash
Grouping), which is by design unable to support large keys.
As Jan noted, the problem with mapPartition is that its UDF
receives an Iterator. The only option here is to wrap this
iterator in one that spills to disk once an internal
buffer is exceeded (the approach suggested by Reuven).
This unfortunately comes with a cost in some cases. The
best approach would be to somehow determine that the user
wants multiple iterations and then wrap it in a
"re-iterator" if necessary. Does anyone have ideas how
to approach this?
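To make that concrete, a naive sketch of the wrapping (in-memory cache
only; a real implementation would spill the cache to disk once it grows
past a size threshold):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Naive re-iterable wrapper around a one-shot Iterator. Elements are
// cached as they are consumed, so later iterations replay the cache
// before pulling new elements from the source. A production version
// would spill the cache to disk once a size limit is exceeded.
class ReIterable<T> implements Iterable<T> {
  private final Iterator<T> source;
  private final List<T> cache = new ArrayList<>();

  ReIterable(Iterator<T> source) {
    this.source = source;
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      private int pos = 0;

      @Override
      public boolean hasNext() {
        return pos < cache.size() || source.hasNext();
      }

      @Override
      public T next() {
        if (pos < cache.size()) {
          return cache.get(pos++);  // replay already-seen elements
        }
        T element = source.next();  // pull from source, remember it
        cache.add(element);
        pos++;
        return element;
      }
    };
  }
}
```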
D.
On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
The Beam API was written to support multiple
iterations, and there are definitely transforms that
do so. I believe that CoGroupByKey may do this as well
with the resulting iterator.
I know that the Dataflow runner is able to handle
iterators larger than available memory by paging them
in from shuffle, which still allows for reiterating.
It sounds like Spark is less flexible here?
Reuven
On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
+dev <dev@beam.apache.org>
Lukasz, why do you think that users expect to be
able to iterate the grouped elements multiple times?
Besides the fact that the 'Iterable' obviously suggests it?
The way that Spark behaves is pretty much
analogous to how MapReduce used to work - in
certain cases it calls
repartitionAndSortWithinPartitions and then does
mapPartition, which accepts an Iterator - that is
because internally it merge-sorts pre-sorted
segments. This approach makes it possible to GroupByKey data
sets that are too big to fit into memory (per key).
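Roughly this shape on the Spark side (simplified;
groupConsecutiveKeys is a made-up helper that would group runs of
equal keys from the sorted iterator in a single forward pass):

```java
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.beam.sdk.values.KV;

// Simplified shape of the non-merging GBK translation; the real one
// also deals with windows and coders. groupConsecutiveKeys is
// hypothetical, not an actual function in the runner.
static JavaRDD<KV<String, Iterable<byte[]>>> groupByKey(
    JavaPairRDD<String, byte[]> keyedValues, int parallelism) {
  return keyedValues
      // shuffle + merge sort of pre-sorted segments: all values of a
      // key arrive consecutively, no key group has to fit in memory
      .repartitionAndSortWithinPartitions(new HashPartitioner(parallelism))
      // the UDF only receives a one-shot Iterator over the partition
      .mapPartitions(it -> groupConsecutiveKeys(it));
}
```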
If multiple iterations should be expected by
users, we probably should:
a) include that in @ValidatesRunner tests
b) store values in memory on Spark, which will
break for certain pipelines
Because of (b) I think that it would be much
better to remove this "expectation" and clearly
document that the Iterable is not supposed to be
iterated multiple times.
Jan
On 9/27/19 9:27 AM, Jan Lukavský wrote:
I pretty much think so, because that is how Spark
works. The Iterable inside is really an Iterator,
which cannot be iterated multiple times.
Jan
On 9/27/19 2:00 AM, Lukasz Cwik wrote:
Jan, in Beam users expect to be able to iterate
the GBK output multiple times even from within
the same ParDo.
Is this something that Beam on Spark Runner
never supported?
On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi Gershi,
could you please outline the pipeline you
are trying to execute? Basically, you cannot
iterate the Iterable multiple times in a
single ParDo. It should be possible, though,
to apply multiple ParDos to the output of
GroupByKey, as in the sketch below.
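A minimal sketch of the branching (the count and max passes are just
example DoFns, not anything prescribed):

```java
// Sketch: branch after GroupByKey; each ParDo iterates the
// Iterable exactly once.
static void branchAfterGbk(PCollection<KV<String, Integer>> input) {
  PCollection<KV<String, Iterable<Integer>>> grouped =
      input.apply(GroupByKey.<String, Integer>create());

  // first branch: one pass to count the values per key
  grouped.apply("CountPerKey",
      ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, Long>>() {
        @ProcessElement
        public void process(ProcessContext c) {
          long n = 0;
          for (Integer ignored : c.element().getValue()) { n++; }
          c.output(KV.of(c.element().getKey(), n));
        }
      }));

  // second branch: an independent single pass over the same output
  grouped.apply("MaxPerKey",
      ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
        @ProcessElement
        public void process(ProcessContext c) {
          int max = Integer.MIN_VALUE;
          for (Integer v : c.element().getValue()) { max = Math.max(max, v); }
          c.output(KV.of(c.element().getKey(), max));
        }
      }));
}
```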
Jan
On 9/26/19 3:32 PM, Gershi, Noam wrote:
Hi,
I want to iterate multiple times over the
Iterable<V> (the output of the GroupByKey
transformation).
When my Runner is SparkRunner, I get an
exception:
Caused by: java.lang.IllegalStateException:
ValueIterator can't be iterated more than
once,otherwise there could be data lost
at
org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
at
java.lang.Iterable.spliterator(Iterable.java:101)
I understood that I can branch the pipeline
after GroupByKey into multiple
transformations and iterate once on the
Iterable<V> in each of them.
Is there a better way for that?
Noam Gershi
Software Developer
T: +972 (3) 7405718