On 10/2/19 4:30 AM, Reuven Lax wrote:
On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský <je...@seznam.cz
<mailto:je...@seznam.cz>> wrote:
> The fact that the annotation on the ParDo "changes" the
GroupByKey implementation is very specific to the Spark runner
implementation.
I don't quite agree. It is not specific to Spark; it is specific to all
runners that produce grouped elements in a way that is not reiterable.
That is the key property. The example you gave with HDFS does not
satisfy this condition (files on HDFS are certainly reiterable), and
that's why no change to the GBK is needed there (it already has the
required property). A quick look at the FlinkRunner (at least the
non-portable one) shows that it implements GBK by reducing elements
into a List. That is going to crash on big PCollections, which is even
nicely documented:
* <p>For internal use to translate {@link GroupByKey}. For a large
{@link PCollection} this is
* expected to crash!
If this is fixed, it is likely to start behaving the same as Spark. So
I actually think the opposite is true - Dataflow is the special case,
because of how its internal shuffle service works.
I think you misunderstood - I was not trying to dish on the Spark
runner. Rather my point is that whether the GroupByKey implementation
is affected or not is runner dependent. In some runners it is and in
others it isn't. However in all cases the /semantics/ of the ParDo is
affected. Since Beam tries as much as possible to be runner agnostic,
we should default to making the change where there is an obvious
semantic difference.
I understand that, but I just don't think that semantics should be
affected by this. If the outcome of GBK is an Iterable, then it should
be reiterable - that is how Iterable works - so I now lean more towards
the conclusion that the current behavior of the Spark runner simply
breaks this contract. A solution would be to introduce the proposed
GroupByKeyOneShot or Reduce(By|Per)Key.
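For illustration, here is a hedged sketch (the class name is mine, not a proposed Beam API) of what the one-shot contract looks like as a type: an Iterable whose iterator() may be called only once, which is effectively what the Spark runner's grouped output provides today:

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A minimal sketch of the "one-shot" contract: an Iterable that hands out
 * its underlying Iterator exactly once, as a hypothetical GroupByKeyOneShot
 * output would. A second call to iterator() fails loudly instead of
 * silently losing data.
 */
public class OneShotIterable<V> implements Iterable<V> {
  private final Iterator<V> source;
  private final AtomicBoolean consumed = new AtomicBoolean(false);

  public OneShotIterable(Iterator<V> source) {
    this.source = source;
  }

  @Override
  public Iterator<V> iterator() {
    if (consumed.getAndSet(true)) {
      throw new IllegalStateException("This grouping can only be iterated once");
    }
    return source;
  }
}
```

A consumer that iterates once works normally; a second iteration throws, making the weaker guarantee explicit in the API rather than a runner-specific surprise.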
> In general I sympathize with the worry about non-local effects.
Beam is already full of them (e.g. a Window.into statement effects
downstream GroupByKeys). In each case where they were added there
was extensive debate and discussion (Windowing semantics were
debated for many months), exactly because there was concern over
adding these non-local effects. In every case, no other good
solution could be found. For the case of windowing for example, it
was often easy to propose simple local APIs (e.g. just pass the
window fn as a parameter to GroupByKey), however all of these
local solutions ended up not working for important use cases when
we analyzed them more deeply.
That is very interesting. Could you elaborate more on some examples of
the use cases which didn't work? I'd like to try to match them against
how Euphoria is structured - it should be more resistant to these
non-local effects, because it very often bundles multiple of Beam's
primitives into a single transform. ReduceByKey is one example of
this; it is actually a mix of Window.into() + GBK + ParDo. Although it
might look like this transform is not primitive because it can be
broken down into something else (Euphoria has no native equivalent of
GBK itself), it has several other nice implications - namely that
Combine becomes a special case of RBK. It then becomes only a question
of where and how you can "run" the reduce function; the logic is
absolutely the same. This can be worked out in more detail to show
that even Combine and RBK can be described by a more general stateful
operation (ReduceStateByKey), so that in the end Euphoria has only two
really "primitive" operations - FlatMap (basically a stateless ParDo)
and RSBK. As I already mentioned in another thread, once stateful
ParDo supports merging windows, it can be shown that both Combine and
GBK become special cases of this.
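As a toy, non-Euphoria illustration of that claim (all names here are hypothetical, not Euphoria's actual API): a single reduce-state-by-key operation, parameterized only by the state type and the update function, yields both a Combine-style aggregation and a GBK-style grouping:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Toy reduce-state-by-key: fold each (key, value) pair into per-key state. */
public class Rsbk {
  public static <K, V, S> Map<K, S> reduceStateByKey(
      Iterable<Map.Entry<K, V>> input,
      Supplier<S> initialState,
      BiFunction<S, V, S> update) {
    Map<K, S> states = new HashMap<>();
    for (Map.Entry<K, V> e : input) {
      S state = states.containsKey(e.getKey()) ? states.get(e.getKey()) : initialState.get();
      states.put(e.getKey(), update.apply(state, e.getValue()));
    }
    return states;
  }

  /** Combine (here: sum per key) is RSBK with an accumulator as the state. */
  public static Map<String, Integer> sumPerKey(Iterable<Map.Entry<String, Integer>> in) {
    return reduceStateByKey(in, () -> 0, (acc, v) -> acc + v);
  }

  /** GBK (collect per key) is RSBK with a list as the state. */
  public static Map<String, List<Integer>> groupPerKey(Iterable<Map.Entry<String, Integer>> in) {
    return reduceStateByKey(in, ArrayList::new, (list, v) -> { list.add(v); return list; });
  }
}
```

The two derived operations differ only in the state and update function, which is the sense in which Combine and GBK can be seen as special cases of one primitive.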
> As you mentioned below, I do think it's perfectly reasonable for a
DSL to impose its own semantics. Scio already does this - the raw
Beam API is used by a DSL as a substrate, but the DSL does not
need to blindly mirror the semantics of the raw Beam API - at
least in my opinion!
Sure, but currently there is no way for a DSL to "hook" into the
runner, so it has to use the raw Beam SDK, and it will therefore fail
in cases like this - where Beam actually gives stronger guarantees
than the DSL requires. It would be cool if we could find a way to do
that - this pretty much aligns with another question raised on the ML
about the possibility to override the default implementation of a
PTransform for a specific pipeline.
Jan
On 9/29/19 7:46 PM, Reuven Lax wrote:
Jan,
The fact that the annotation on the ParDo "changes" the
GroupByKey implementation is very specific to the Spark runner
implementation. You can imagine another runner that simply writes
out files to HDFS to implement a GroupByKey - this GroupByKey
implementation is agnostic to whether the result will be reiterated
or not; in this case it is very much the ParDo implementation
that changes to implement a reiterable. I think you don't like
the fact that an annotation on the ParDo has a non-local
effect on the implementation of the GroupByKey upstream. However,
arguably the non-local effect is just a quirk of how the Spark
runner is implemented - other runners might have a local effect.
In general I sympathize with the worry about non-local effects.
Beam is already full of them (e.g. a Window.into statement
affects downstream GroupByKeys). In each case where they were
added there was extensive debate and discussion (Windowing
semantics were debated for many months), exactly because there
was concern over adding these non-local effects. In every case,
no other good solution could be found. For the case of windowing
for example, it was often easy to propose simple local APIs (e.g.
just pass the window fn as a parameter to GroupByKey), however
all of these local solutions ended up not working for important
use cases when we analyzed them more deeply.
As you mentioned below, I do think it's perfectly reasonable for
a DSL to impose its own semantics. Scio already does this - the
raw Beam API is used by a DSL as a substrate, but the DSL does
not need to blindly mirror the semantics of the raw Beam API - at
least in my opinion!
Reuven
On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský <je...@seznam.cz
<mailto:je...@seznam.cz>> wrote:
I understand the concerns. Still, it looks a little like we
want to be able to modify the behavior of an object from inside a
submodule - quite as if my subprogram accepted a Map
interface, but internally said "hey, this is supposed
to be a HashMap, please change it". Because of how a
pipeline is constructed, we can do that; the question is whether
there really isn't a better solution.
What I do not like about the proposed solution:
1) specifying that the grouped elements are supposed to be
iterated only once can be done only on a ParDo, although there
are other (higher-level) PTransforms that can consume the output
of GBK
2) the annotation on ParDo is by definition generic - i.e. it
can be used on input which is not the output of a GBK, which makes
no sense
3) we modify the behavior to unlock some optimizations (or
change the behavior of the GBK itself), which users will not
understand
4) the annotation somewhat arbitrarily modifies the data types
passed, which is counter-intuitive and will be a source of confusion
I think a solution that solves the above problems (but
brings some new ones, as always :-)) could be to change the
output of GBK from PCollection<KV<K, Iterable<V>>> to
GroupedPCollection<K, V>. That way we can control which
operators consume the grouping (and how), and we can enable
these transforms to specify additional parameters (like how
they want to consume the grouping). It is obviously a
breaking change (although it can probably be made backwards
compatible) and it would very likely involve substantial
work. But maybe there are other not yet discussed options.
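To make the idea concrete, a hedged toy sketch (all names hypothetical, not a Beam API proposal) of what such a grouped type could look like - the consumer declares whether it needs one-shot or reiterable access, so the runner can pick the cheapest implementation:

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/**
 * Toy sketch (hypothetical names): a grouped collection that makes the
 * consumption mode explicit instead of always exposing a reiterable Iterable.
 */
public interface GroupedValues<V> {
  /** One-shot consumption: the runner may stream values without buffering. */
  void consumeOnce(Consumer<Iterator<V>> fn);

  /** Reiterable consumption: the runner must materialize or re-read the group. */
  Iterable<V> materialize();
}

/** Trivial in-memory implementation, standing in for a runner's grouped output. */
class InMemoryGroupedValues<V> implements GroupedValues<V> {
  private final List<V> values;

  InMemoryGroupedValues(List<V> values) {
    this.values = values;
  }

  @Override
  public void consumeOnce(Consumer<Iterator<V>> fn) {
    fn.accept(values.iterator());
  }

  @Override
  public Iterable<V> materialize() {
    return values;
  }
}
```

With a type like this the choice lives at the consumption site, so no non-local annotation is needed on the ParDo.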
Jan
On 9/28/19 6:46 AM, Reuven Lax wrote:
In many cases, the writer of the ParDo has no access to the
GBK (e.g. the GBK is hidden inside an upstream PTransform
that they cannot modify). This is the same reason why
RequiresStableInput was made a property of the ParDo,
because the GroupByKey is quite often inaccessible.
The car analogy doesn't quite apply here, because the runner
does have a full view of the pipeline so can satisfy all
constraints. The car dealership generally cannot read your
mind (thankfully!), so you have to specify what you want. Or
to put it another way, the various transforms in a Beam
pipeline do not live in isolation. The full pipeline graph
is what is executed, and the runner already has to analyze
the full graph to run the pipeline (as well as to optimize
the pipeline).
Reuven
On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský
<je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
I'd suggest Stream instead of Iterator - it has the same
semantics and a much better API.
Still not sure what is wrong with letting the GBK
decide this. I have an analogy - if I decide to buy a
car, I have to decide *what* car I'm going to buy (by
thinking about how I'm going to use it) *before* I buy it.
I cannot just buy "a car" and then change it from a
minivan to a sports car based on my actual need. Same with
the GBK - if I want to be able to reiterate the result,
then I should say so in advance.
Jan
On 9/27/19 10:50 PM, Kenneth Knowles wrote:
Good point about sibling fusion requiring this.
The type PTransform<KV<K, V>, KV<K, Iterable<V>>>
already does imply that the output iterable can be
iterated arbitrarily many times.
I think this should remain the default for all the
reasons mentioned.
We could have opt-in to the weaker KV<K, Iterator<V>>
version. Agree that this is a property of the ParDo. A
particular use of a GBK has no idea what is downstream.
If you owned the whole pipeline, a special
ParDo<Iterator<V>, Foo> would work. But to make the
types line up, this would require changes upstream,
which is not good.
Maybe something like this:
ParDo<Iterable<V>, Foo> {
  @ProcessElement
  void process(@OneShotIterator Iterator<V> iter) {
    ...
  }
}
I've described all of this in terms of Java SDK. So we
would need a portable representation for all this metadata.
Kenn
On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax
<re...@google.com <mailto:re...@google.com>> wrote:
I think the behavior to make explicit is the need
to reiterate, not the need to handle large results.
How large of a result can be handled will always be
dependent on the runner, and each runner will
probably have a different definition of large keys.
Reiteration however is a logical difference in the
programming API. Therefore I think it makes sense
to specify the latter. The need to reiterate is a
property of the downstream ParDo, so it should be
specified there - not on the GBK.
Reuven
On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský
<je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
Ok, I think I understand there might be some
benefits to this. Then I'd propose we make this
clear on the GBK. If we supported something
like this:
PCollection<?> input = ....;
input.apply(GroupByKey.withLargeKeys());
then SparkRunner could expand this to
repartitionAndSortWithinPartitions only for this
PTransform, and fall back to the default (in-memory)
expansion in other situations. The default
expansion of LargeGroupByKey (let's say) would
be the classic GBK, so that only runners that need
to make sure they don't break reiterations
would expand it specially.
WDYT?
Jan
On 9/27/19 8:56 PM, Reuven Lax wrote:
As I mentioned above, CoGroupByKey already
takes advantage of this. Reiterating is not
the most common use case, but it's definitely
one that comes up. Also keep in mind that this
API has supported reiterating for the past
five years (since even before the SDK was
donated to Apache). Therefore you should
assume that users are relying on it, in ways
we might not expect.
Historically, Google's Flume system had
collections that did not support reiterating
(they were even called OneShotCollections to
make it clear). This was the source of
problems and user frustration, which was one
reason that in the original Dataflow SDK we
made sure that these iterables could be
reiterated. Another reason why it's
advantageous for a runner to support this is
allowing for better fusion. If two ParDos A
and B both read from the same GroupByKey, it
is nice to be able to fuse them into one
logical operator. For this, you'll probably
need a shuffle implementation that allows two
independent readers from the same shuffle session.
How easy it is to implement reiterables that
don't have to fit in memory will depend on the
runner. For Dataflow it's possible because
the shuffle session is logically persistent,
so the runner can simply reread the shuffle
session. For other runners with different
shuffle implementations, it might be harder to
support both properties. Maybe we should
introduce a new @RequiresReiteration
annotation on ParDo? That way the Spark runner
can see this and switch to the in-memory
version just for groupings consumed by those
ParDos. Runners that already support
reiteration can ignore this annotation, so it
should be backwards compatible.
Reuven
On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský
<je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
I'd like to know the use-case. Why would
you *need* to actually iterate the grouped
elements twice? By definition the first
iteration would have to extract some
statistic (or a subset of elements that must
fit into memory). This statistic can then
be used as another input for the second
iteration. Why not then calculate the
statistic in a separate branch of the
pipeline and feed it into the ParDo
as a side input? That would definitely be
more efficient, because the calculation of
the statistic would probably be combinable
(not sure if it is absolutely required to
be combinable, but it seems probable).
Even if the calculation were not
combinable, it would be no less efficient than
iterating twice. Why then support
multiple iterations (besides the fact that
the output of GBK is an Iterable)? Am I missing
something?
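A minimal, non-Beam sketch of that pattern (names are mine): the combinable statistic - here a max - is computed in its own branch over its own copy of the group, and the main pass then touches its one-shot iterator exactly once, using the statistic as a "side input":

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SideInputPattern {
  /** Branch 1: a combinable statistic (here: max) over the grouped values. */
  public static int maxOf(Iterator<Integer> values) {
    int max = Integer.MIN_VALUE;
    while (values.hasNext()) {
      max = Math.max(max, values.next());
    }
    return max;
  }

  /**
   * Branch 2: the main pass consumes its one-shot iterator exactly once,
   * scaling each value by the precomputed maximum.
   */
  public static List<Double> normalize(Iterator<Integer> values, int max) {
    List<Double> out = new ArrayList<>();
    while (values.hasNext()) {
      out.add(values.next() / (double) max);
    }
    return out;
  }
}
```

In a real pipeline each branch would get its own grouped input from the runner, so neither branch ever needs to re-iterate a single iterator.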
Jan
On 9/27/19 6:36 PM, Reuven Lax wrote:
This should be an element in the
compatibility matrix as well.
On Fri, Sep 27, 2019 at 9:26 AM Kenneth
Knowles <k...@apache.org
<mailto:k...@apache.org>> wrote:
I am pretty surprised that we do not
have a @Category(ValidatesRunner)
test in GroupByKeyTest that iterates
multiple times. That is a major
oversight. We should have this test,
and it can be disabled by the
SparkRunner's configuration.
Kenn
On Fri, Sep 27, 2019 at 9:24 AM
Reuven Lax <re...@google.com
<mailto:re...@google.com>> wrote:
The Dataflow version does not
spill to disk. However Spark's
design might require spilling to
disk if you want that to be
implemented properly.
On Fri, Sep 27, 2019 at 9:08 AM
David Morávek <d...@apache.org
<mailto:d...@apache.org>> wrote:
Hi,
Spark's GBK is currently
implemented using `sortBy(key
and value).mapPartition(...)`
for non-merging windowing in
order to support large keys
and large-scale shuffles.
Merging windowing is
implemented using the standard
GBK (the underlying Spark impl.
uses ListCombiner + Hash
Grouping), which is by design
unable to support large keys.
As Jan noted, the problem with
mapPartition is that its UDF
receives an Iterator. The only
option here is to wrap this
iterator in one that spills
to disk once an internal
buffer is exceeded (the
approach suggested by
Reuven). This unfortunately
comes with a cost in some
cases. The best approach
would be to somehow
determine that the user wants
multiple iterations and only
then wrap it in a
"re-iterator" if necessary.
Does anyone have any ideas how to approach this?
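A rough sketch of such a spilling wrapper (all names and thresholds are mine): values are kept in an in-memory buffer up to a limit and written to a temp file beyond it, so the resulting Iterable can be re-iterated at the cost of disk I/O. Java serialization stands in here for what a runner would do with an element coder, and a real implementation would also need proper stream cleanup:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/**
 * Wraps a one-shot Iterator into a reiterable Iterable by buffering up to
 * maxInMemory elements and spilling the rest to a temp file. Sketch only.
 */
public class SpillingIterable<V extends Serializable> implements Iterable<V> {
  private final List<V> buffer = new ArrayList<>();
  private File spillFile;
  private int spilledCount = 0;

  public SpillingIterable(Iterator<V> source, int maxInMemory) throws IOException {
    ObjectOutputStream out = null;
    while (source.hasNext()) {
      V value = source.next();
      if (buffer.size() < maxInMemory) {
        buffer.add(value);
      } else {
        if (out == null) {
          spillFile = File.createTempFile("spill", ".bin");
          spillFile.deleteOnExit();
          out = new ObjectOutputStream(new FileOutputStream(spillFile));
        }
        out.writeObject(value);
        spilledCount++;
      }
    }
    if (out != null) out.close();
  }

  @Override
  public Iterator<V> iterator() {
    final Iterator<V> inMemory = buffer.iterator();
    return new Iterator<V>() {
      int readFromDisk = 0;
      ObjectInputStream in;

      @Override
      public boolean hasNext() {
        return inMemory.hasNext() || readFromDisk < spilledCount;
      }

      @SuppressWarnings("unchecked")
      @Override
      public V next() {
        if (inMemory.hasNext()) return inMemory.next();
        if (readFromDisk >= spilledCount) throw new NoSuchElementException();
        try {
          // Each fresh iteration re-reads the spill file from the start.
          if (in == null) in = new ObjectInputStream(new FileInputStream(spillFile));
          readFromDisk++;
          return (V) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }
}
```

The cost David mentions is visible here: every element past the buffer is serialized once and deserialized on every iteration, which is wasted work for consumers that only ever iterate once.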
D.
On Fri, Sep 27, 2019 at 5:46
PM Reuven Lax
<re...@google.com
<mailto:re...@google.com>> wrote:
The Beam API was written
to support multiple
iterations, and there are
definitely transforms
that do so. I believe
that CoGroupByKey may do
this as well with the
resulting iterator.
I know that the Dataflow
runner is able to handle
iterators larger than
available memory by
paging them in from
shuffle, which still
allows for reiterating.
It sounds like Spark is
less flexible here?
Reuven
On Fri, Sep 27, 2019 at
3:04 AM Jan Lukavský
<je...@seznam.cz
<mailto:je...@seznam.cz>>
wrote:
+dev
<dev@beam.apache.org>
<mailto:dev@beam.apache.org>
Lukasz, why do you
think that users
expect to be able to
iterate grouped
elements multiple
times? Besides the
fact that the
'Iterable' obviously
suggests it? The way
that Spark behaves is
pretty much analogous
to how MapReduce used
to work - in certain
cases it calls
repartitionAndSortWithinPartitions
and then does
mapPartition, which
accepts an Iterator -
that is because
internally it
merge-sorts pre-sorted
segments. This
approach makes it
possible to GroupByKey
data sets that are too
big to fit into memory
(per key).
If multiple
iterations should be
expected by users, we
probably should:
a) include that in
@ValidatesRunner tests
b) store values in
memory on Spark,
which will break for
certain pipelines
Because of (b) I
think that it would
be much better to
remove this
"expectation" and
clearly document that
the Iterable is not
supposed to be
iterated multiple times.
Jan
On 9/27/19 9:27 AM,
Jan Lukavský wrote:
I pretty much think
so, because that is
how Spark works. The
Iterable inside is
really an Iterator,
which cannot be
iterated multiple times.
Jan
On 9/27/19 2:00 AM,
Lukasz Cwik wrote:
Jan, in Beam, users
expect to be able
to iterate the GBK
output multiple
times even from
within the same ParDo.
Is this something
that Beam on Spark
Runner never supported?
On Thu, Sep 26,
2019 at 6:50 AM Jan
Lukavský
<je...@seznam.cz
<mailto:je...@seznam.cz>>
wrote:
Hi Gershi,
could you
please outline
the pipeline
you are trying
to execute?
Basically, you
cannot iterate
the Iterable
multiple times
in a single
ParDo. It
should be
possible,
though, to
apply multiple
ParDos to the
output of
GroupByKey.
Jan
On 9/26/19 3:32
PM, Gershi,
Noam wrote:
Hi,
I want to
iterate
multiple times
over the
Iterable<V>
(the output of
the GroupByKey
transformation).
When my Runner
is
SparkRunner, I
get an exception:
Caused by:
java.lang.IllegalStateException:
ValueIterator
can't be
iterated more
than
once,otherwise
there could be
data lost
at
org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
at
java.lang.Iterable.spliterator(Iterable.java:101)
I understood I
can branch the
pipeline after
GroupByKey
into multiple
transformations
and iterate
once on the
Iterable<V> in
each of them.
Is there a
better way to
do that?
*Noam Gershi*
Software Developer
*T*: +972 (3) 7405718