> But does that imply it actually requires O(n^2)
I meant O(n) iterations, O(n^2) operations on elements.
On 9/27/19 8:31 PM, Jan Lukavský wrote:
Okay, the self-join example is understandable. But does that imply it
actually requires O(n^2) iterations (maybe caching can help somehow, but
asymptotically the complexity will stay quadratic)? If so, that seems
prohibitively slow for large inputs that don't fit into memory. Might
actually materializing the cartesian product help parallelize the process?
On 9/27/19 8:19 PM, Kenneth Knowles wrote:
CoGroupByKey is one example. Performing a CoGroupByKey-based join
requires multiple iterations (caching is key to getting performance).
You could make up other calculations that require it, most of which
would look like a self-join, like "output the largest difference
between any two elements for each key". In both of these examples,
multiple iterations avoid materializing a cartesian product and instead
allow something more like a nested-loop join.
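For concreteness, a hypothetical DoFn for that second example could look
like the sketch below (the class name is mine, not from Beam); the inner
loop re-iterates the same Iterable once per element of the outer loop, so
the values are traversed O(n) times without the cross product ever being
materialized:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical sketch: applied to the output of a GroupByKey<String, Double>,
// emits the largest absolute difference between any two values of each key.
class LargestDifferenceFn
    extends DoFn<KV<String, Iterable<Double>>, KV<String, Double>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    double largest = 0;
    for (double a : c.element().getValue()) {   // first pass over the values
      for (double b : c.element().getValue()) { // re-iteration, once per element
        largest = Math.max(largest, Math.abs(a - b));
      }
    }
    c.output(KV.of(c.element().getKey(), largest));
  }
}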
On the philosophical side, an iterable behaves like a "value" (it has
well-defined contents, a size, etc.) whereas an iterator is more of a
mutating process than a value. So you cannot easily reason about a
transform's contract "for input X it has output Y" when the output is a
construct that cannot be interpreted as a value at all.
Kenn
On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz> wrote:
I'd like to know the use case. Why would you *need* to actually
iterate the grouped elements twice? By definition, the first
iteration would have to extract some statistic (or a subset of
elements that must fit into memory). That statistic can then be
used as another input for the second iteration. Why not then
calculate the statistic in a separate branch of the pipeline and
feed it into the ParDo as a side input (see the sketch below)? That
would definitely be more efficient, because the calculation of the
statistic would probably be combinable (I'm not sure it is
absolutely required to be combinable, but it seems probable). Even
if the calculation were not combinable, it would be no less
efficient than iterating twice. Why then support multiple
iterations at all (besides the fact that the output of GBK is an
Iterable)? Am I missing something?
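To make the suggestion concrete, a sketch of what I mean, assuming the
statistic is a per-key maximum (all names here are illustrative):

import java.util.Map;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;

// Hypothetical fragment: compute the statistic (here a per-key max) in a
// separate branch, then read it from a map side input during a single pass
// over the grouped values.
static PCollection<String> singlePass(PCollection<KV<String, Double>> input) {
  final PCollectionView<Map<String, Double>> maxPerKey =
      input.apply(Max.doublesPerKey()).apply(View.asMap());
  return input
      .apply(GroupByKey.<String, Double>create())
      .apply(ParDo.of(new DoFn<KV<String, Iterable<Double>>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          double max = c.sideInput(maxPerKey).get(c.element().getKey());
          for (double v : c.element().getValue()) { // single iteration
            c.output(c.element().getKey() + ":" + (max - v));
          }
        }
      }).withSideInputs(maxPerKey));
}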
Jan
On 9/27/19 6:36 PM, Reuven Lax wrote:
This should be an element in the compatibility matrix as well.
On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org> wrote:
I am pretty surprised that we do not have
a @Category(ValidatesRunner) test in GroupByKeyTest that
iterates multiple times. That is a major oversight. We
should have this test, and it can be disabled by the
SparkRunner's configuration.
Kenn
On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:
The Dataflow version does not spill to disk. However, Spark's
design might require spilling to disk if you want this to be
implemented properly.
On Fri, Sep 27, 2019 at 9:08 AM David Morávek
<d...@apache.org <mailto:d...@apache.org>> wrote:
Hi,
Spark's GBK is currently implemented using
`sortBy(key and value).mapPartition(...)` for non-merging windowing,
in order to support large keys and large-scale shuffles. Merging
windowing is implemented using a standard GBK (the underlying Spark
impl. uses ListCombiner + hash grouping), which is by design unable
to support large keys.
As Jan noted, the problem with mapPartition is that its UDF receives
an Iterator. The only option here is to wrap this iterator in one
that spills to disk once an internal buffer is exceeded (the
approach suggested by Reuven). This unfortunately comes with a cost
in some cases. The best approach would be to somehow determine that
the user wants multiple iterations and only then wrap it in a
"re-iterator" if necessary. Does anyone have ideas on how to
approach this?
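To illustrate, a minimal in-memory sketch of such a wrapper (a real
implementation would spill the overflow to disk instead of throwing):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative re-iterable wrapper around a single-pass Iterator. Values are
// cached up to maxBuffered; a production version would spill the remainder
// to disk rather than fail.
class ReIterable<T> implements Iterable<T> {
  private final Iterator<T> source;
  private final List<T> buffer = new ArrayList<>();
  private final int maxBuffered;

  ReIterable(Iterator<T> source, int maxBuffered) {
    this.source = source;
    this.maxBuffered = maxBuffered;
  }

  @Override
  public Iterator<T> iterator() {
    // Drain the source into the buffer on first use, so that later calls
    // can replay the cached values.
    while (source.hasNext()) {
      if (buffer.size() >= maxBuffered) {
        throw new IllegalStateException("buffer exceeded; would spill to disk here");
      }
      buffer.add(source.next());
    }
    return buffer.iterator();
  }
}

Note that even this caching imposes a cost on consumers that only iterate
once, which is exactly why knowing the user's intent up front would help.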
D.
On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
The Beam API was written to support multiple
iterations, and there are definitely transforms
that do so. I believe that CoGroupByKey may do
this as well with the resulting iterator.
I know that the Dataflow runner is able to
handle iterators larger than available memory
by paging them in from shuffle, which still
allows reiterating. It sounds like Spark is
less flexible here?
Reuven
On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
+dev <dev@beam.apache.org>
Lukasz, why do you think users expect to be
able to iterate the grouped elements multiple
times, besides the fact that the 'Iterable'
type obviously suggests it? The way Spark
behaves is pretty much analogous to how
MapReduce used to work: in certain cases it
calls repartitionAndSortWithinPartitions and
then does mapPartition, which accepts an
Iterator, because internally it merge-sorts
pre-sorted segments. This approach makes it
possible to GroupByKey data sets that are too
big to fit into memory (per key).
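For illustration, a minimal sketch of that pattern
using Spark's Java API (not the actual runner code;
the per-key count is just a stand-in aggregation):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Sketch: partition and sort so that each key's values arrive
// contiguously, then stream over the partition Iterator, keeping
// only one key's running aggregate in memory at a time; the
// values themselves are never buffered.
static JavaRDD<String> countPerKey(JavaPairRDD<String, Integer> pairs) {
  return pairs
      .repartitionAndSortWithinPartitions(new HashPartitioner(8))
      .mapPartitions(it -> {
        List<String> out = new ArrayList<>();
        String currentKey = null;
        long count = 0;
        while (it.hasNext()) {
          Tuple2<String, Integer> kv = it.next();
          if (!kv._1().equals(currentKey)) {
            if (currentKey != null) {
              out.add(currentKey + ":" + count);
            }
            currentKey = kv._1();
            count = 0;
          }
          count++;
        }
        if (currentKey != null) {
          out.add(currentKey + ":" + count);
        }
        return out.iterator();
      });
}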
If users should expect multiple iterations,
we probably should:
a) include that in @ValidatesRunner tests
b) store values in memory on Spark, which
will break certain pipelines
Because of (b) I think it would be much
better to remove this "expectation" and
clearly document that the Iterable is not
supposed to be iterated multiple times.
Jan
On 9/27/19 9:27 AM, Jan Lukavský wrote:
I pretty much think so, because that is how
Spark works. The Iterable inside is really
an Iterator, which cannot be iterated
multiple times.
Jan
On 9/27/19 2:00 AM, Lukasz Cwik wrote:
Jan, in Beam users expect to be able to
iterate the GBK output multiple times, even
from within the same ParDo.
Is this something that Beam's Spark runner
never supported?
On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi Gershi,
could you please outline the pipeline
you are trying to execute? Basically,
you cannot iterate the Iterable
multiple times in a single ParDo. It
should be possible, though, to apply
multiple ParDos to the output of a GroupByKey.
Jan
On 9/26/19 3:32 PM, Gershi, Noam wrote:
Hi,
I want to iterate multiple times over
the Iterable<V> (the output of a
GroupByKey transformation).
When my runner is SparkRunner, I get
an exception:
Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
    at java.lang.Iterable.spliterator(Iterable.java:101)
I understand I can branch the pipeline after
GroupByKey into multiple transforms and
iterate over the Iterable<V> once in each of
them, e.g. the fragment below.
Is there a better way to do that?
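For example, something like this hypothetical
fragment, where SumFn and CountFn are
placeholders for my two passes:

import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;

// Hypothetical fragment: apply two independent ParDos to the same
// GBK output; each branch iterates the Iterable once.
PCollection<KV<String, Iterable<Integer>>> grouped =
    input.apply(GroupByKey.<String, Integer>create());
grouped.apply("FirstPass", ParDo.of(new SumFn()));
grouped.apply("SecondPass", ParDo.of(new CountFn()));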
*Noam Gershi*
Software Developer
*T*: +972 (3) 7405718