Good point about sibling fusion requiring this.
The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does
imply that the output iterable can be iterated arbitrarily many
times.
I think this should remain the default for all the reasons
mentioned.
We could make the weaker KV<K, Iterator<V>> version opt-in.
Agree that this is a property of the ParDo. A particular use of
a GBK has no idea what is downstream. If you owned the whole
pipeline, a special ParDo<Iterator<V>, Foo> would work. But to
make the types line up, this would require changes upstream,
which is not good.
Maybe something like this:
ParDo<Iterable<V>, Foo> {
  @ProcessElement
  void process(@OneShotIterator Iterator<V> iter) {
    ...
  }
}
I've described all of this in terms of the Java SDK. So we would
need a portable representation for all this metadata.
Kenn
On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <re...@google.com> wrote:
I think the behavior to make explicit is the need to
reiterate, not the need to handle large results. How
large a result can be handled will always depend on
the runner, and each runner will probably have a different
definition of large keys. Reiteration however is a logical
difference in the programming API. Therefore I think it
makes sense to specify reiteration explicitly. The need to
reiterate is a property of the downstream ParDo, so it should be
specified there - not on the GBK.
Reuven
On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <je...@seznam.cz> wrote:
Ok, I think I understand there might be some benefits of
this. Then I'd propose we make this clear on the GBK. If
we supported something like this:

PCollection<?> input = ...;
input.apply(GroupByKey.withLargeKeys());
then SparkRunner could expand this to
repartitionAndSortWithinPartitions only for this
PTransform, and fall back to the default (in-memory)
implementation in other situations. The default expansion of
LargeGroupByKey (let's say) would be the classic GBK, so
that only runners that need to make sure they don't
break reiteration would expand it differently.
WDYT?
Jan
On 9/27/19 8:56 PM, Reuven Lax wrote:
As I mentioned above, CoGroupByKey already takes
advantage of this. Reiterating is not the most common
use case, but it's definitely one that comes up. Also
keep in mind that this API has supported reiterating
for the past five years (since even before the SDK was
donated to Apache). Therefore you should assume that
users are relying on it, in ways we might not expect.
Historically, Google's Flume system had collections
that did not support reiterating (they were even called
OneShotCollections to make it clear). This was the
source of problems and user frustration, which was one
reason that in the original Dataflow SDK we made sure
that these iterables could be reiterated. Another
reason it's advantageous for a runner to support
this is that it allows for better fusion. If two ParDos A and
B both read from the same GroupByKey, it is nice to be
able to fuse them into one logical operator. For this,
you'll probably need a shuffle implementation that
allows two independent readers from the same shuffle
session.
How easy it is to implement reiterables that don't have
to fit in memory will depend on the runner. For
Dataflow it's possible because the shuffle session is
logically persistent, so the runner can simply reread
the shuffle session. For other runners with different
shuffle implementations, it might be harder to support
both properties. Maybe we should introduce a
new @RequiresReiteration annotation on ParDo? That way
the Spark runner can see this and switch to the
in-memory version just for groupings consumed by those
ParDos. Runners that already support reiteration can
ignore this annotation, so it should be backwards
compatible.
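To make that concrete, here is a sketch of what the opt-in could look like. Note that @RequiresReiteration is hypothetical and does not exist in the SDK today, and the DoFn is just an illustration of a consumer that genuinely needs two passes:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical: @RequiresReiteration is not a real Beam annotation,
// only a sketch of the proposed opt-in.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RequiresReiteration {}

// A DoFn that needs two passes: the first pass computes the mean,
// the second pass the standard deviation.
@RequiresReiteration
static class StdDevFn extends DoFn<KV<String, Iterable<Long>>, KV<String, Double>> {
  @ProcessElement
  public void process(ProcessContext c) {
    long count = 0, sum = 0;
    for (long v : c.element().getValue()) { count++; sum += v; }   // first pass
    double mean = sum / (double) count;
    double sq = 0;
    for (long v : c.element().getValue()) { sq += (v - mean) * (v - mean); } // second pass
    c.output(KV.of(c.element().getKey(), Math.sqrt(sq / count)));
  }
}

A runner could then key its expansion of the upstream GBK off the presence of this annotation on any consumer of the grouped output.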
Reuven
On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz> wrote:
I'd like to know the use case. Why would you *need*
to actually iterate the grouped elements twice? By
definition the first iteration would have to
extract some statistic (or a subset of elements that
must fit into memory). This statistic can then be
used as another input for the second iteration. Why
not then calculate the statistic in a separate
branch of the pipeline and feed it into the
ParDo as a side input? That would definitely be more
efficient, because the calculation of the statistic
would probably be combinable (I'm not sure it is
absolutely required to be combinable, but it seems
probable). Even if the calculation were not
combinable, it would be no less efficient than
iterating twice. Why then support multiple
iterations (besides the fact that the output of GBK is
an Iterable)? Am I missing something?
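For concreteness, a minimal sketch of that branch-plus-side-input pattern using existing Beam Java APIs; the element types and the per-key mean statistic are illustrative assumptions:

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

// Keep only elements above their key's mean: the statistic is computed in a
// combinable branch and fed in as a side input, so no second iteration of
// grouped values is needed.
static PCollection<KV<String, Long>> aboveMean(PCollection<KV<String, Long>> input) {
  final PCollectionView<Map<String, Double>> means =
      input.apply(Mean.<String, Long>perKey()).apply(View.asMap());
  return input.apply(
      ParDo.of(new DoFn<KV<String, Long>, KV<String, Long>>() {
            @ProcessElement
            public void process(ProcessContext c) {
              double mean = c.sideInput(means).get(c.element().getKey());
              if (c.element().getValue() > mean) {
                c.output(c.element());
              }
            }
          })
          .withSideInputs(means));
}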
Jan
On 9/27/19 6:36 PM, Reuven Lax wrote:
This should be an element in the compatibility
matrix as well.
On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org> wrote:
I am pretty surprised that we do not have
a @Category(ValidatesRunner) test
in GroupByKeyTest that iterates multiple
times. That is a major oversight. We should
have this test, and it can be disabled by the
SparkRunner's configuration.
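A sketch of roughly what such a test could look like, assuming the existing GroupByKeyTest fixture with its TestPipeline p; the test name and contents are my assumptions:

// Hypothetical ValidatesRunner test: asserts GBK output can be iterated twice.
@Test
@Category(ValidatesRunner.class)
public void testGroupByKeyIteratedMultipleTimes() {
  PCollection<KV<String, Iterable<Integer>>> grouped =
      p.apply(Create.of(KV.of("k", 1), KV.of("k", 2), KV.of("k", 3)))
          .apply(GroupByKey.create());
  PCollection<Integer> doubleSums =
      grouped.apply(ParDo.of(new DoFn<KV<String, Iterable<Integer>>, Integer>() {
        @ProcessElement
        public void process(ProcessContext c) {
          int sum = 0;
          for (int v : c.element().getValue()) { sum += v; }  // first pass
          for (int v : c.element().getValue()) { sum += v; }  // second pass
          c.output(sum);
        }
      }));
  PAssert.that(doubleSums).containsInAnyOrder(12);  // (1 + 2 + 3) * 2
  p.run();
}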
Kenn
On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:
The Dataflow version does not spill to
disk. However, Spark's design might require
spilling to disk if you want this to be
implemented properly.
On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:
Hi,
Spark's GBK is currently implemented
using `sortBy(key and
value).mapPartition(...)` for
non-merging windowing, in order to
support large keys and large-scale
shuffles. Merging windowing is
implemented using the standard GBK
(the underlying Spark implementation uses
ListCombiner + hash grouping), which
is by design unable to support large keys.
As Jan noted, the problem with
mapPartition is that its UDF receives
an Iterator. The only option here is to
wrap this iterator in one that spills
to disk once an internal buffer is
exceeded (the approach Reuven
suggested). This unfortunately comes with
a cost in some cases. The best
approach would be to somehow
determine that the user wants multiple
iterations and only then wrap it in a
"re-iterator" if necessary. Does
anyone have any ideas how to approach
this?
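To make the wrapping idea concrete, here is a minimal sketch of such a "re-iterator" in plain Java. This version only caches in memory; the spill-to-disk part is left as a comment, since that is exactly the costly piece under discussion:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Wraps a one-shot Iterator so it can be iterated repeatedly: the first pass
// fills a cache, later passes replay it. A real implementation would spill to
// a disk-backed buffer once maxBuffered is exceeded instead of failing.
class ReiterableWrapper<T> implements Iterable<T> {
  private final Iterator<T> source;
  private final List<T> cache = new ArrayList<>();
  private final int maxBuffered;

  ReiterableWrapper(Iterator<T> source, int maxBuffered) {
    this.source = source;
    this.maxBuffered = maxBuffered;
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      private int pos = 0;

      @Override
      public boolean hasNext() {
        return pos < cache.size() || source.hasNext();
      }

      @Override
      public T next() {
        if (pos < cache.size()) {
          return cache.get(pos++);    // replay from the cache
        }
        if (cache.size() >= maxBuffered) {
          // A real version would start spilling to disk at this point.
          throw new IllegalStateException("buffer exceeded; spilling not implemented");
        }
        T element = source.next();    // first pass: pull from the wrapped iterator
        cache.add(element);
        pos++;
        return element;
      }
    };
  }
}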
D.
On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
The Beam API was written to
support multiple iterations, and
there are definitely transforms
that do so. I believe that
CoGroupByKey may do this as well
with the resulting iterator.
I know that the Dataflow runner is
able to handle iterators larger
than available memory by paging
them in from shuffle, which still
allows for reiterating. It sounds
like Spark is less flexible here?
Reuven
On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
+dev <dev@beam.apache.org>
Lukasz, why do you think that
users expect to be able to
iterate grouped elements multiple
times? Besides the fact that the
'Iterable' obviously suggests it?
The way that Spark behaves is
pretty much analogous to how
MapReduce used to work - in
certain cases it calls
repartitionAndSortWithinPartitions
and then does mapPartition,
which accepts an Iterator - that
is because internally it
merge-sorts pre-sorted segments.
This approach makes it possible
to GroupByKey data sets that are
too big to fit into memory
(per key).
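A sketch of that single-pass pattern in plain Java, outside any framework: once input is pre-sorted by key, each key's values are only available as one forward run, which is exactly why the UDF sees an Iterator rather than a re-iterable. (Guava's PeekingIterator is used for lookahead; the helper itself is my illustration, not runner code.)

import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.BiConsumer;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;

// Walks entries pre-sorted by key and hands each key's run of values to the
// consumer as a one-shot Iterator; nothing is buffered, so per-key data can
// exceed memory, but it can never be iterated twice.
class SortedRuns {
  static <K, V> void forEachGroup(
      Iterator<Map.Entry<K, V>> sorted, BiConsumer<K, Iterator<V>> consumer) {
    PeekingIterator<Map.Entry<K, V>> it = Iterators.peekingIterator(sorted);
    while (it.hasNext()) {
      K key = it.peek().getKey();
      Iterator<V> run = new Iterator<V>() {
        @Override public boolean hasNext() {
          return it.hasNext() && it.peek().getKey().equals(key);
        }
        @Override public V next() {
          if (!hasNext()) { throw new NoSuchElementException(); }
          return it.next().getValue();
        }
      };
      consumer.accept(key, run);
      while (run.hasNext()) { run.next(); }  // drain unread tail before the next key
    }
  }
}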
If multiple iterations should be expected by users, we probably should:

a) include that in @ValidatesRunner tests

b) store values in memory on Spark, which will break for certain pipelines

Because of (b) I think that it would be much better to remove
this "expectation" and clearly document that the Iterable is
not supposed to be iterated multiple times.
Jan
On 9/27/19 9:27 AM, Jan
Lukavský wrote:
I pretty much think so,
because that is how Spark
works. The Iterable inside is
really an Iterator, which
cannot be iterated multiple
times.
Jan
On 9/27/19 2:00 AM, Lukasz
Cwik wrote:
Jan, in Beam users expect to
be able to iterate the GBK
output multiple times, even
from within the same ParDo.
Is this something that Beam
on the Spark runner never supported?
On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi Gershi,
could you please outline
the pipeline you are
trying to execute?
Basically, you cannot
iterate the Iterable
multiple times in a single
ParDo. It should be
possible, though, to
apply multiple ParDos to
the output of GroupByKey, as sketched below.
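A minimal sketch of that workaround, assuming an input of type PCollection<KV<String, Long>>; each branch gets its own single pass over the grouped values:

// Branch the GBK output instead of reiterating inside one ParDo: each
// downstream ParDo iterates the Iterable exactly once.
PCollection<KV<String, Iterable<Long>>> grouped = input.apply(GroupByKey.create());

PCollection<KV<String, Long>> sums =
    grouped.apply("Sum", ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
      @ProcessElement
      public void process(ProcessContext c) {
        long sum = 0;
        for (long v : c.element().getValue()) { sum += v; }
        c.output(KV.of(c.element().getKey(), sum));
      }
    }));

PCollection<KV<String, Long>> counts =
    grouped.apply("Count", ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
      @ProcessElement
      public void process(ProcessContext c) {
        long count = 0;
        for (long ignored : c.element().getValue()) { count++; }
        c.output(KV.of(c.element().getKey(), count));
      }
    }));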
Jan
On 9/26/19 3:32 PM,
Gershi, Noam wrote:
Hi,
I want to iterate
multiple times over the
Iterable<V> (the output
of the GroupByKey
transformation).
When my runner is
SparkRunner, I get an
exception:
Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
    at java.lang.Iterable.spliterator(Iterable.java:101)
I understood that I can
branch the pipeline
after GroupByKey into
multiple transformations
and iterate once over the
Iterable<V> in each of
them.
Is there a better way
to do that?
Noam Gershi
Software Developer
T: +972 (3) 7405718