Okay, the self-join example is understandable. But does it imply
that O(n^2) iterations are actually required (maybe caching can
help somehow, but asymptotically the complexity stays the same)?
If so, that seems prohibitively slow for large inputs that don't
fit into memory, and actually materializing the cartesian product
might help parallelize the process?
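For intuition, here is a plain-Java sketch (illustrative only, not Beam code) of the nested-loop shape being discussed: the loop touches all O(n^2) pairs per key, but nothing proportional to n^2 is ever held in memory. The class and method names are made up for the example.

```java
import java.util.List;

// Illustrative sketch: a per-key self-join written as a nested loop
// over a re-iterable collection of values. Time is O(n^2) per key,
// but only the current pair and a running result are held in memory.
public class NestedLoopSelfJoin {
  // Made-up stand-in for whatever a real join would emit downstream:
  // here we just count the ordered pairs satisfying a predicate.
  public static long countJoinedPairs(Iterable<Integer> values) {
    long count = 0;
    for (int left : values) {      // first iteration
      for (int right : values) {   // re-iteration: needs Iterable semantics
        if (left < right) {
          count++;
        }
      }
    }
    return count;
  }

  public static void main(String[] args) {
    // 4 values -> 6 qualifying pairs, found without materializing all 16
    System.out.println(countJoinedPairs(List.of(1, 2, 3, 4)));
  }
}
```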
On 9/27/19 8:19 PM, Kenneth Knowles wrote:
CoGroupByKey is one example. Performing a CoGroupByKey-based join
requires multiple iterations (caching is key to getting good
performance). You could make up other calculations that require it,
most of which would look like a self-join, such as "output the
largest difference between any two elements for each key". In both
of these examples, multiple iterations avoid materializing a
cartesian product and instead allow something more like a nested
loop join.
On the philosophical side, an iterable behaves like a "value" (it
has well-defined contents, a size, etc.), whereas an iterator is
more of a mutating process than a value. So you cannot easily
reason about a transform as "for input X it has output Y" when the
output is a construct that cannot be interpreted as a value at all.
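The distinction can be made concrete in a few lines of plain Java (an illustrative sketch, not Beam code):

```java
import java.util.Iterator;
import java.util.List;

// Sketch of the "value vs. process" distinction: an Iterable has
// well-defined contents that survive repeated traversal, while an
// Iterator is consumed by its first traversal.
public class IterableVsIterator {
  public static int sum(Iterator<Integer> it) {
    int total = 0;
    while (it.hasNext()) {
      total += it.next();
    }
    return total;
  }

  public static void main(String[] args) {
    List<Integer> value = List.of(1, 2, 3);
    // An Iterable behaves like a value: each traversal sees the same contents.
    System.out.println(sum(value.iterator()) + " " + sum(value.iterator())); // 6 6
    // An Iterator is a mutating process: the second traversal sees nothing.
    Iterator<Integer> process = value.iterator();
    System.out.println(sum(process) + " " + sum(process)); // 6 0
  }
}
```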
Kenn
On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <[email protected]> wrote:
I'd like to know the use-case. Why would you *need* to actually
iterate the grouped elements twice? By definition, the first
iteration would have to extract some statistic (or a subset of
elements that must fit into memory). This statistic can then be
used as another input for the second iteration. Why not calculate
the statistic in a separate branch of the pipeline and feed it
into the ParDo as a side input? That would definitely be more
efficient, because the calculation of the statistic would probably
be combinable (I'm not sure combinability is absolutely required,
but it seems probable). Even if the calculation were not
combinable, it would be no less efficient than iterating twice.
Why then support multiple iterations (besides the fact that the
output of GBK is an Iterable)? Am I missing something?
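As a plain-Java sketch of this side-input pattern (hypothetical helper names, not actual Beam code): the statistic is computed in a separate branch and then consumed as a side input by a single pass over the grouped values. The example task is emitting each element's distance from the per-key maximum.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the two-branch pattern: compute a combinable statistic
// once, then make a single pass that consumes it as a "side input".
public class SideInputPattern {
  // Branch 1: the statistic. Max is combinable, so a runner could use
  // Combine.perKey here instead of a full GroupByKey.
  public static int maxStatistic(Iterable<Integer> values) {
    int max = Integer.MIN_VALUE;
    for (int v : values) {
      max = Math.max(max, v);
    }
    return max;
  }

  // Branch 2: a single iteration that reads the statistic as a side input.
  public static List<Integer> distancesFromMax(Iterable<Integer> values, int max) {
    List<Integer> out = new ArrayList<>();
    for (int v : values) {
      out.add(max - v);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> grouped = List.of(3, 7, 5);
    int max = maxStatistic(grouped);                    // computed in its own branch
    System.out.println(distancesFromMax(grouped, max)); // [4, 0, 2]
  }
}
```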
Jan
On 9/27/19 6:36 PM, Reuven Lax wrote:
This should be an element in the compatibility matrix as well.
On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <[email protected]> wrote:
I am pretty surprised that we do not have
a @Category(ValidatesRunner) test in GroupByKeyTest that
iterates multiple times. That is a major oversight. We should
have this test, and it can be disabled by the SparkRunner's
configuration.
Kenn
On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <[email protected]> wrote:
The Dataflow version does not spill to disk. However, Spark's
design might require spilling to disk if you want this to be
implemented properly.
On Fri, Sep 27, 2019 at 9:08 AM David Morávek <[email protected]> wrote:
Hi,
Spark's GBK is currently implemented using
`sortBy(key and value).mapPartition(...)` for non-merging
windowing, in order to support large keys and large-scale
shuffles. Merging windowing is implemented using the standard
Spark GBK (the underlying implementation uses ListCombiner + hash
grouping), which is by design unable to support large keys.

As Jan noted, the problem with mapPartition is that its UDF
receives an Iterator. The only option here is to wrap this
iterator in one that spills to disk once an internal buffer is
exceeded (the approach suggested by Reuven). This unfortunately
comes with a cost in some cases. The best approach would be to
somehow determine that the user wants multiple iterations and then
wrap it in a "re-iterator" only if necessary. Does anyone have any
ideas how to approach this?
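One possible shape for such a wrapper, as a minimal plain-Java sketch (an assumed design, not the actual Spark runner code): the one-shot Iterator is consumed exactly once, the first `bufferSize` elements stay in memory, and the remainder is spilled to a temp file, so the result can be iterated any number of times.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch: a re-iterable view over a one-shot iterator that spills
// to disk once an in-memory buffer is exceeded.
public class SpillingReiterable implements Iterable<Integer> {
  private final List<Integer> buffer = new ArrayList<>();
  private File spill; // null when everything fit in the in-memory buffer

  public SpillingReiterable(Iterator<Integer> source, int bufferSize) {
    while (source.hasNext() && buffer.size() < bufferSize) {
      buffer.add(source.next());
    }
    if (!source.hasNext()) {
      return;
    }
    try {
      spill = File.createTempFile("gbk-spill", ".bin");
      spill.deleteOnExit();
      try (DataOutputStream out = new DataOutputStream(
          new BufferedOutputStream(new FileOutputStream(spill)))) {
        while (source.hasNext()) {
          out.writeInt(source.next());
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public Iterator<Integer> iterator() {
    // Each iteration replays the buffer, then streams the spill file,
    // so no iteration holds all values in memory. (A production
    // version would also close the stream when exhausted.)
    final Iterator<Integer> inMemory = buffer.iterator();
    if (spill == null) {
      return inMemory;
    }
    try {
      final DataInputStream in = new DataInputStream(
          new BufferedInputStream(new FileInputStream(spill)));
      return new Iterator<Integer>() {
        private Integer next = advance();

        private Integer advance() {
          if (inMemory.hasNext()) {
            return inMemory.next();
          }
          try {
            return in.readInt();
          } catch (EOFException eof) {
            return null; // end of spill file
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        }

        @Override
        public boolean hasNext() {
          return next != null;
        }

        @Override
        public Integer next() {
          Integer current = next;
          next = advance();
          return current;
        }
      };
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The open question from the thread remains: the spill adds I/O cost even for users who only iterate once, which is why detecting the need for re-iteration up front would be preferable.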
D.
On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <[email protected]> wrote:
The Beam API was written to support multiple
iterations, and there are definitely transforms
that do so. I believe that CoGroupByKey may do
this as well with the resulting iterator.
I know that the Dataflow runner is able to handle iterables larger
than available memory by paging them in from shuffle, which still
allows for reiterating. It sounds like Spark is less flexible here?
Reuven
On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <[email protected]> wrote:
+dev <[email protected]>
<mailto:[email protected]>
Lukasz, why do you think that users expect to be able to
iterate the grouped elements multiple times, besides the fact
that the 'Iterable' signature obviously suggests it? The way
Spark behaves is pretty much analogous to how MapReduce used
to work: in certain cases it calls
repartitionAndSortWithinPartitions and then does mapPartition,
which accepts an Iterator, because internally it merge-sorts
pre-sorted segments. This approach makes it possible to
GroupByKey data sets that are too big to fit into memory (per
key).
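A minimal plain-Java sketch (illustrative names, not the Spark or Beam code) of why this sort-based approach naturally hands the UDF an Iterator: with the shuffle output sorted by key, each key's values arrive contiguously, so one forward pass can aggregate per key while holding only a single element and a running total in memory.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: per-key aggregation over a key-sorted one-shot iterator,
// using O(1) working memory per key.
public class SortedGrouping {
  public static Map<String, Integer> sumPerKey(
      Iterator<Map.Entry<String, Integer>> sorted) {
    Map<String, Integer> sums = new LinkedHashMap<>();
    String currentKey = null;
    int total = 0;
    while (sorted.hasNext()) {
      Map.Entry<String, Integer> e = sorted.next();
      if (!e.getKey().equals(currentKey)) {
        if (currentKey != null) {
          sums.put(currentKey, total); // previous key's group is complete
        }
        currentKey = e.getKey();
        total = 0;
      }
      total += e.getValue();
    }
    if (currentKey != null) {
      sums.put(currentKey, total);
    }
    return sums;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> sorted = List.of(
        Map.entry("a", 1), Map.entry("a", 2), Map.entry("b", 5));
    System.out.println(sumPerKey(sorted.iterator())); // {a=3, b=5}
  }
}
```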
If multiple iterations should be expected by users, we probably
should:
a) include that in the @ValidatesRunner tests
b) store values in memory on Spark, which will break certain
pipelines
Because of (b), I think it would be much better to remove this
"expectation" and clearly document that the Iterable is not
supposed to be iterated multiple times.
Jan
On 9/27/19 9:27 AM, Jan Lukavský wrote:
I pretty much think so, because that is how
Spark works. The Iterable inside is really
an Iterator, which cannot be iterated
multiple times.
Jan
On 9/27/19 2:00 AM, Lukasz Cwik wrote:
Jan, in Beam users expect to be able to iterate the GBK output
multiple times, even from within the same ParDo.
Is this something that Beam on the Spark runner never supported?
On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <[email protected]> wrote:
Hi Gershi,
could you please outline the pipeline you are trying to
execute? Basically, you cannot iterate the Iterable multiple
times in a single ParDo. It should be possible, though, to
apply multiple ParDos to the output of GroupByKey.
Jan
On 9/26/19 3:32 PM, Gershi, Noam wrote:
Hi,
I want to iterate multiple times over the Iterable<V> (the
output of the GroupByKey transformation).
When my Runner is SparkRunner, I get
an exception:
Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once, otherwise there could be data lost
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
    at java.lang.Iterable.spliterator(Iterable.java:101)
I understood that I can branch the pipeline after GroupByKey
into multiple transformations and iterate once over the
Iterable<V> in each of them.
Is there a better way to do that?
*Noam Gershi*
Software Developer
*T*: +972 (3) 7405718