I understand the concerns. Still, it looks a little like we want to be
able to modify the behavior of an object from inside a submodule - much
as if my subprogram accepted a Map interface, but internally said "hey,
this is supposed to be a HashMap, please change it accordingly".
Because of how the pipeline is constructed, we can do that; the question
is whether there really isn't a better solution.
What I do not like about the proposed solution:
1) specifying that the grouped elements are supposed to be iterated
only once can be done only on ParDo, although there are other
(higher-level) PTransforms that can consume the output of GBK
2) the annotation on ParDo is by definition generic - i.e., it can be
used on input which is not the output of GBK, which makes no sense
3) we modify the behavior to unlock some optimizations (or to change
the behavior of the GBK itself), which users will not understand
4) the annotation somewhat arbitrarily modifies the data types passed,
which is counter-intuitive and will be a source of confusion
I think that a solution that solves the above problems (but brings some
new ones, as always :-)) could be to change the output of GBK from
PCollection<KV<K, Iterable<V>>> to GroupedPCollection<K, V>. That way we
can control which operators consume the grouping (and how), and we can
enable these transforms to specify additional parameters (like how they
want to consume the grouping). It is obviously a breaking change
(although it can probably be made backwards compatible) and it would
very likely involve substantial work. But maybe there are some other not
yet discussed options.
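To make this concrete, a rough sketch of what such a type could offer
(purely illustrative - nothing like this exists in the SDK today):

import java.util.Iterator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical output type of GBK, letting consumers choose how the
// grouping is consumed instead of annotating a downstream ParDo.
public interface GroupedPCollection<K, V> {

  // Default: values may be re-iterated, matching today's
  // PCollection<KV<K, Iterable<V>>> semantics.
  PCollection<KV<K, Iterable<V>>> asReiterable();

  // Opt-in: each group is consumed exactly once, so runners like
  // Spark can stream merge-sorted segments without buffering per key.
  PCollection<KV<K, Iterator<V>>> asOneShot();
}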
Jan
On 9/28/19 6:46 AM, Reuven Lax wrote:
In many cases, the writer of the ParDo has no access to the GBK (e.g.
the GBK is hidden inside an upstream PTransform that they cannot
modify). This is the same reason RequiresStableInput was made a
property of the ParDo: the GroupByKey is quite often inaccessible.
The car analogy doesn't quite apply here, because the runner does have
a full view of the pipeline, so it can satisfy all constraints. The car
dealership generally cannot read your mind (thankfully!), so you
have to specify what you want. Or to put it another way, the various
transforms in a Beam pipeline do not live in isolation. The full
pipeline graph is what is executed, and the runner already has to
analyze the full graph to run the pipeline (as well as to optimize the
pipeline).
Reuven
On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <je...@seznam.cz> wrote:
I'd suggest Stream instead of Iterator; it has the same semantics
and a much better API.
Still not sure what is wrong with letting the GBK decide this. I
have an analogy - if I decide to buy a car, I have to decide
*what* car I'm going to buy (by thinking about how I'm going to use
it) *before* I buy it. I cannot just buy "a car" and then change
it from a minivan to a sports car based on my actual need. Same with
the GBK - if I want to be able to reiterate the result, then I
should say so in advance.
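For illustration, java.util.stream.Stream already enforces exactly
these one-shot semantics, with a much richer API than a bare Iterator:

import java.util.stream.Stream;

public class OneShotStreamDemo {
  public static void main(String[] args) {
    Stream<String> values = Stream.of("a", "b", "a");
    // Rich API compared to a bare Iterator:
    long distinct = values.distinct().count();  // -> 2
    // A second terminal operation fails, matching one-shot semantics:
    // values.count();  // throws IllegalStateException: "stream has
    //                  // already been operated upon or closed"
  }
}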
Jan
On 9/27/19 10:50 PM, Kenneth Knowles wrote:
Good point about sibling fusion requiring this.
The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does
imply that the output iterable can be iterated arbitrarily many
times.
I think this should remain the default for all the reasons mentioned.
We could have opt-in to the weaker KV<K, Iterator<V>> version.
Agree that this is a property of the ParDo. A particular use of a
GBK has no idea what is downstream. If you owned the whole
pipeline, a special ParDo<Iterator<V>, Foo> would work. But to
make the types line up, this would require changes upstream,
which is not good.
Maybe something like this:

ParDo<Iterable<V>, Foo> {
  @ProcessElement
  void process(@OneShotIterator Iterator<V> iter) {
    ...
  }
}
I've described all of this in terms of Java SDK. So we would need
a portable representation for all this metadata.
Kenn
On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <re...@google.com> wrote:
I think the behavior to make explicit is the need to
reiterate, not the need to handle large results. How large a
result can be handled will always depend on the
runner, and each runner will probably have a different
definition of large keys. Reiteration, however, is a logical
difference in the programming API. Therefore I think it makes
sense to specify reiteration explicitly. The need to reiterate is a
property of the downstream ParDo, so it should be specified
there - not on the GBK.
Reuven
On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <je...@seznam.cz> wrote:
Ok, I think I understand there might be some benefits to
this. Then I'd propose we make this clear on the GBK. If
we were to support something like this:

PCollection<?> input = ....;
input.apply(GroupByKey.withLargeKeys());

then SparkRunner could expand this to
repartitionAndSortWithinPartitions only for this
PTransform, and fall back to the default (in-memory)
implementation in other situations. The default expansion of
LargeGroupByKey (let's say) would be the classic GBK, so that
only runners that need to make sure they don't break
reiterations would expand it differently.
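A minimal sketch of that default expansion (hypothetical; the names
are purely illustrative):

import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical composite; expands to a classic GBK unless a runner
// (e.g. SparkRunner) overrides it with an implementation based on
// repartitionAndSortWithinPartitions.
public class LargeGroupByKey<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {

  public static <K, V> LargeGroupByKey<K, V> create() {
    return new LargeGroupByKey<>();
  }

  @Override
  public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
    // Default expansion: plain GroupByKey, so nothing changes for
    // runners that support large keys with reiteration natively.
    return input.apply(GroupByKey.create());
  }
}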
WDYT?
Jan
On 9/27/19 8:56 PM, Reuven Lax wrote:
As I mentioned above, CoGroupByKey already takes
advantage of this. Reiterating is not the most common
use case, but it's definitely one that comes up. Also
keep in mind that this API has supported reiterating for
the past five years (since even before the SDK was
donated to Apache). Therefore you should assume that
users are relying on it, in ways we might not expect.

Historically, Google's Flume system had collections that
did not support reiterating (they were even called
OneShotCollections to make it clear). This was a
source of problems and user frustration, which was one
reason that in the original Dataflow SDK we made sure
these iterables could be reiterated. Another reason
why it's advantageous for a runner to support this is
that it allows for better fusion. If two ParDos A and B both
read from the same GroupByKey, it is nice to be able to
fuse them into one logical operator. For this, you'll
probably need a shuffle implementation that allows two
independent readers of the same shuffle session.

How easy it is to implement reiterables that don't have
to fit in memory will depend on the runner. For
Dataflow it's possible because the shuffle session is
logically persistent, so the runner can simply reread
the shuffle session. For other runners with different
shuffle implementations, it might be harder to support
both properties. Maybe we should introduce a
new @RequiresReiteration annotation on ParDo? That way
the Spark runner can see this and switch to the
in-memory version just for groupings consumed by those
ParDos. Runners that already support reiteration can
ignore this annotation, so it should be backwards
compatible.
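A sketch of how this could look on a DoFn (the annotation itself is
hypothetical, modeled on how @RequiresStableInput is attached to the
processing method):

new DoFn<KV<String, Iterable<Integer>>, String>() {
  @RequiresReiteration  // hypothetical - does not exist today
  @ProcessElement
  public void process(@Element KV<String, Iterable<Integer>> e,
                      OutputReceiver<String> out) {
    long count = 0;
    for (Integer v : e.getValue()) {
      count++;                          // first pass: compute a statistic
    }
    for (Integer v : e.getValue()) {    // second pass needs re-iteration
      out.output(e.getKey() + ":" + v + "/" + count);
    }
  }
};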
Reuven
On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz> wrote:
I'd like to know the use-case. Why would you *need*
to actually iterate the grouped elements twice? By
definition, the first iteration would have to extract
some statistic (or a subset of elements that must fit
into memory). That statistic can then be used as
another input for the second iteration. Why not then
calculate the statistic in a separate branch of the
pipeline and feed it into the ParDo as a side
input? That would definitely be more efficient,
because the calculation of the statistic would
probably be combinable (not sure if it is absolutely
required to be combinable, but it seems probable).
Even if the calculation were not combinable, it
would be no less efficient than iterating twice. Why
then support multiple iterations (besides the fact
that the output of GBK is Iterable)? Am I missing something?
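To illustrate the side-input shape I mean (a minimal sketch;
Count.perKey stands in for whatever statistic the first pass would
compute):

PCollection<KV<String, Integer>> input = ...;

// Branch 1: the combinable statistic, e.g. per-key counts.
final PCollectionView<Map<String, Long>> counts =
    input.apply(Count.perKey()).apply(View.asMap());

// Branch 2: a single iteration, with the statistic as a side input.
input
    .apply(GroupByKey.create())
    .apply(ParDo.of(new DoFn<KV<String, Iterable<Integer>>, String>() {
      @ProcessElement
      public void process(ProcessContext c) {
        long count = c.sideInput(counts).get(c.element().getKey());
        for (Integer v : c.element().getValue()) {  // one pass only
          c.output(c.element().getKey() + ":" + v + "/" + count);
        }
      }
    }).withSideInputs(counts));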
Jan
On 9/27/19 6:36 PM, Reuven Lax wrote:
This should be an element in the compatibility
matrix as well.
On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org> wrote:
I am pretty surprised that we do not have a @Category(ValidatesRunner)
test in GroupByKeyTest that iterates multiple times. That is a major
oversight. We should have this test, and it can be disabled by the
SparkRunner's configuration.
Kenn
On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:
The Dataflow version does not spill to disk. However, Spark's design
might require spilling to disk if you want that to be implemented
properly.
On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:
Hi,
Spark's GBK is currently implemented
using `sortBy(key and
value).mapPartition(...)` for
non-merging windowing, in order to
support large keys and large-scale
shuffles. Merging windowing is
implemented using the standard GBK
(the underlying Spark implementation uses
ListCombiner + hash grouping), which is
by design unable to support large keys.

As Jan noted, the problem with mapPartition
is that its UDF receives an Iterator.
The only option here is to wrap this
iterator in one that spills to disk
once an internal buffer is exceeded
(the approach suggested by Reuven).
This unfortunately comes with a cost in
some cases. The best approach would be
to somehow determine that the user wants
multiple iterations and then wrap it in a
"re-iterator" if necessary. Does anyone
have any ideas how to approach this?
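For reference, the minimal form of such a wrapper (buffering in memory
only; a real implementation would spill the buffer to disk past a size
threshold):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch: caches elements on the first pass so later iterations
// replay from the buffer. Single-threaded use only.
class ReiterableWrapper<T> implements Iterable<T> {
  private final Iterator<T> source;
  private final List<T> buffer = new ArrayList<>();

  ReiterableWrapper(Iterator<T> source) {
    this.source = source;
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      private int pos = 0;

      @Override
      public boolean hasNext() {
        return pos < buffer.size() || source.hasNext();
      }

      @Override
      public T next() {
        if (pos == buffer.size()) {
          buffer.add(source.next());  // first reader fills the buffer
        }
        return buffer.get(pos++);
      }
    };
  }
}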
D.
On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
The Beam API was written to support
multiple iterations, and there are
definitely transforms that do so. I
believe that CoGroupByKey may do
this as well with the resulting
iterator.

I know that the Dataflow runner is
able to handle iterators larger
than available memory by paging
them in from shuffle, which still
allows for reiterating. It sounds
like Spark is less flexible here?
Reuven
On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
+dev <dev@beam.apache.org>
Lukasz, why do you think that
users expect to be able to
iterate the grouped elements
multiple times - besides the fact
that the type 'Iterable'
obviously suggests it? The way
Spark behaves is pretty much
analogous to how MapReduce used
to work - in certain cases it
calls
repartitionAndSortWithinPartitions
and then does mapPartition,
which accepts an Iterator - that
is because internally it
merge-sorts pre-sorted segments.
This approach makes it possible
to GroupByKey data sets that are
too big to fit into memory (per key).
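In Spark's Java API the pattern looks roughly like this (a simplified
sketch; assumes an existing JavaPairRDD<String, Integer> named
'pairs', and the real Beam translation also encodes window information
into the sort key):

import java.util.ArrayList;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;

JavaPairRDD<String, Integer> sorted =
    pairs.repartitionAndSortWithinPartitions(new HashPartitioner(16));

sorted.mapPartitions(it -> {
  // 'it' is a one-shot Iterator over key-sorted Tuple2 entries; runs
  // of equal keys are contiguous, so each group can be streamed
  // without materializing it in memory.
  ArrayList<String> out = new ArrayList<>();
  String currentKey = null;
  while (it.hasNext()) {
    scala.Tuple2<String, Integer> kv = it.next();
    if (!kv._1().equals(currentKey)) {
      currentKey = kv._1();  // a new group starts here
    }
    out.add(currentKey + ":" + kv._2());
  }
  return out.iterator();
});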
If multiple iterations should
be expected by users, we
probably should:
a) include that in
@ValidatesRunner tests
b) store values in memory on
Spark, which will break
certain pipelines

Because of (b) I think that it
would be much better to remove
this "expectation" and clearly
document that the Iterable is
not supposed to be iterated
multiple times.
Jan
On 9/27/19 9:27 AM, Jan Lukavský wrote:

I pretty much think so, because that is how Spark works. The Iterable
inside is really an Iterator, which cannot be iterated multiple times.
Jan
On 9/27/19 2:00 AM, Lukasz Cwik wrote:

Jan, in Beam users expect to be able to iterate the GBK output
multiple times, even from within the same ParDo. Is this something
that Beam on Spark Runner never supported?
On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi Gershi,

could you please outline the pipeline you are trying to execute?
Basically, you cannot iterate the Iterable multiple times in a single
ParDo. It should be possible, though, to apply multiple ParDos to the
output of GroupByKey.
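I.e. something like this (a minimal sketch; FirstPassFn and
SecondPassFn stand in for your per-pass logic):

PCollection<KV<String, Iterable<Integer>>> grouped =
    input.apply(GroupByKey.create());

// Each branch iterates the grouped values once:
grouped.apply("FirstPass", ParDo.of(new FirstPassFn()));
grouped.apply("SecondPass", ParDo.of(new SecondPassFn()));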
Jan
On 9/26/19 3:32 PM, Gershi, Noam wrote:

Hi,

I want to iterate multiple times over the Iterable<V> (the output of
the GroupByKey transformation). When my Runner is SparkRunner, I get
an exception:

Caused by: java.lang.IllegalStateException: ValueIterator can't be
iterated more than once,otherwise there could be data lost
at
org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
at
java.lang.Iterable.spliterator(Iterable.java:101)
I understood that I can branch the pipeline after GroupByKey into
multiple transformations and iterate once over the Iterable<V> in each
of them. Is there a better way to do that?
*Noam Gershi*
Software Developer
*T*: +972 (3) 7405718