I'd suggest Stream instead of Iterator, it has the same semantics and
much better API.
Still not sure, what is wrong on letting the GBK to decide this. I have
an analogy - if I decide to buy a car, I have to decide *what* car I'm
going to buy (by think about how I'm going to use it) *before* I buy it.
I cannot just buy "a car" and then change it from minivan to sport car
based on my actual need. Same with the GBK - if I want to be able to
reiterate the result, then I should tell it in advance.
Jan
On 9/27/19 10:50 PM, Kenneth Knowles wrote:
Good point about sibling fusion requiring this.
The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does imply
that the output iterable can be iterated arbitrarily many times.
I think this should remain the default for all the reasons mentioned.
We could have opt-in to the weaker KV<K, Iterator<V>> version. Agree
that this is a property of the ParDo. A particular use of a GBK has no
idea what is downstream. If you owned the whole pipeline, a special
ParDo<Iterator<V>, Foo> would work. But to make the types line up,
this would require changes upstream, which is not good.
Maybe something like this:
ParDo<Iterable<V>, Foo> {
@ProcessElement
void process(@OneShotIterator Iterator<V> iter) {
...
}
}
I've described all of this in terms of Java SDK. So we would need a
portable representation for all this metadata.
Kenn
On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <re...@google.com
<mailto:re...@google.com>> wrote:
I think the behavior to make explicit is the need to reiterate,
not the need to handle large results. How large of a result can be
handled will always be dependent on the runner, and each runner
will probably have a different definition of large keys.
Reiteration however is a logical difference in the programming
API. Therefore I think it makes sense to specify the latter. The
need to reiterate is a property of the downstream ParDo, so it
should be specified there - not on the GBK.
Reuven
On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <je...@seznam.cz
<mailto:je...@seznam.cz>> wrote:
Ok, I think I understand there might be some benefits of this.
Then I'd propose we make this clear on the GBK. If we would
support somehing like this:
PCollection<?> input = ....;
input.apply(GroupByKey.withLargeKeys());
then SparkRunner could expand this to
repartitionAndSortWithinPartitions only on this PTransform,
and fallback to the default (in memory) in other situations.
The default expansion of LargeGroupByKey (let's say) would be
classic GBK, so that only runners that need to make sure that
they don't break reiterations can expand this.
WDYT?
Jan
On 9/27/19 8:56 PM, Reuven Lax wrote:
As I mentioned above, CoGroupByKey already takes advantage of
this. Reiterating is not the most common use case, but it's
definitely one that comes up. Also keep in mind that this API
has supported reiterating for the past five years (since even
before the SDK was donated to Apache). Therefore you should
assume that users are relying on it, in ways we might not
expect.
Historically, Google's Flume system had collections that did
not support reiterating (they were even called
OneShotCollections to make it clear). This was the source of
problems and user frustration, which was one reason that in
the original Dataflow SDK we made sure that these iterables
could be reiterated. Another reason why it's advantageous for
a runner to support this is allowing for better fusion. If
two ParDos A and B both read from the same GroupByKey, it is
nice to be able to fuse them into one logical operator. For
this, you'll probably need a shuffle implementation that
allows two independent readers from the same shuffle session.
How easy it is to implement reiterables that don't have to
fit in memory will depend on the runner. For Dataflow it's
possible because the shuffle session is logically persistent,
so the runner can simply reread the shuffle session. For
other runners with different shuffle implementations, it
might be harder to support both properties. Maybe we should
introduce a new @RequiresReiteration annotation on ParDo?
That way the Spark runner can see this and switch to the
in-memory version just for groupings consumed by those
ParDos. Runners that already support reiteration can ignore
this annotation, so it should be backwards compatible.
Reuven
On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský
<je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
I'd like to know the use-case. Why would you *need* to
actually iterate the grouped elements twice? By
definition the first iteration would have to extract some
statistic (or subset of elements that must fit into
memory). This statistic can then be used as another input
for the second iteration. Why not then calculate the
statistic in a separate branch in the pipeline and feed
it then into the ParDo as side input? That would be
definitely more efficient, because the calculation of the
statistic would be probably combinable (not sure if it is
absolutely required to be combinable, but it seems
probable). Even if the calculation would not be
combinable, it is not less efficient than reiterating
twice. Why then support multiple iterations (besides the
fact that output of GBK is Iterable). Am I missing something?
Jan
On 9/27/19 6:36 PM, Reuven Lax wrote:
This should be an element in the compatibility matrix as
well.
On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles
<k...@apache.org <mailto:k...@apache.org>> wrote:
I am pretty surprised that we do not have
a @Category(ValidatesRunner) test in GroupByKeyTest
that iterates multiple times. That is a major
oversight. We should have this test, and it can be
disabled by the SparkRunner's configuration.
Kenn
On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax
<re...@google.com <mailto:re...@google.com>> wrote:
The Dataflow version does not spill to disk.
However Spark's design might require spilling to
disk if you want that to be implemented properly.
On Fri, Sep 27, 2019 at 9:08 AM David Morávek
<d...@apache.org <mailto:d...@apache.org>> wrote:
Hi,
Spark's GBK is currently implemented using
`sortBy(key and value).mapPartition(...)`
for non-merging windowing in order to
support large keys and large scale shuffles.
Merging windowing is implemented using
standard GBK (underlying spark impl. uses
ListCombiner + Hash Grouping), which is by
design unable to support large keys.
As Jan noted, problem with mapPartition is,
that its UDF receives an Iterator. Only
option here is to wrap this iterator to one
that spills to disk once an internal buffer
is exceeded (the approach suggested by
Reuven). This unfortunately comes with a
cost in some cases. The best approach would
be to somehow determine, that user wants
multiple iterations and than wrap it in
"re-iterator" if necessary. Does anyone have
any ideas how to approach this?
D.
On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax
<re...@google.com <mailto:re...@google.com>>
wrote:
The Beam API was written to support
multiple iterations, and there are
definitely transforms that do so. I
believe that CoGroupByKey may do this as
well with the resulting iterator.
I know that the Dataflow runner is able
to handles iterators larger than
available memory by paging them in from
shuffle, which still allows for
reiterating. It sounds like Spark is
less flexible here?
Reuven
On Fri, Sep 27, 2019 at 3:04 AM Jan
Lukavský <je...@seznam.cz
<mailto:je...@seznam.cz>> wrote:
+dev <dev@beam.apache.org>
<mailto:dev@beam.apache.org>
Lukasz, why do you think that users
expect to be able to iterate
multiple times grouped elements?
Besides that it obviously suggests
the 'Iterable'? The way that spark
behaves is pretty much analogous to
how MapReduce used to work - in
certain cases it calles
repartitionAndSortWithinPartitions
and then does mapPartition, which
accepts Iterator - that is because
internally it merge sorts pre sorted
segments. This approach enables to
GroupByKey data sets that are too
big to fit into memory (per key).
If multiple iterations should be
expected by users, we probably should:
a) include that in @ValidatesRunner
tests
b) store values in memory on spark,
which will break for certain pipelines
Because of (b) I think that it would
be much better to remove this
"expectation" and clearly document
that the Iterable is not supposed to
be iterated multiple times.
Jan
On 9/27/19 9:27 AM, Jan Lukavský wrote:
I pretty much think so, because
that is how Spark works. The
Iterable inside is really an
Iterator, which cannot be iterated
multiple times.
Jan
On 9/27/19 2:00 AM, Lukasz Cwik wrote:
Jan, in Beam users expect to be
able to iterate the GBK output
multiple times even from within
the same ParDo.
Is this something that Beam on
Spark Runner never supported?
On Thu, Sep 26, 2019 at 6:50 AM
Jan Lukavský <je...@seznam.cz
<mailto:je...@seznam.cz>> wrote:
Hi Gershi,
could you please outline the
pipeline you are trying to
execute? Basically, you cannot
iterate the Iterable multiple
times in single ParDo. It
should be possible, though, to
apply multiple ParDos to
output from GroupByKey.
Jan
On 9/26/19 3:32 PM, Gershi,
Noam wrote:
Hi,
I want to iterate multiple
times on the Iterable<V> (the
output of GroupByKey
transformation)
When my Runner is
SparkRunner, I get an exception:
Caused by:
java.lang.IllegalStateException:
ValueIterator can't be
iterated more than
once,otherwise there could be
data lost
at
org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
at
java.lang.Iterable.spliterator(Iterable.java:101)
I understood I can branch the
pipeline after GroupByKey
into multiple transformation
and iterate in each of them
once on the Iterable<V>.
Is there a better way for that?
citi_logo_mailciti_logo_mail*Noam
Gershi*
Software Developer
*T*:+972 (3) 7405718
<tel:+972%203-740-5718>
Mail_signature_blue