Ok, I think I understand there might be some benefits to this. Then I'd
propose we make this explicit on the GBK. If we supported something like
this:
PCollection<?> input = ....;
input.apply(GroupByKey.withLargeKeys());
then SparkRunner could expand this to repartitionAndSortWithinPartitions
for this PTransform only, and fall back to the default (in-memory)
grouping in all other situations. The default expansion of
LargeGroupByKey (let's say) would be the classic GBK, so that only
runners that need to make sure they don't break reiterations would
expand it differently.
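A minimal sketch of what that default expansion could look like
(LargeGroupByKey is purely hypothetical at this point, names invented
just to make the idea concrete):

import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical transform - nothing like this exists in Beam today.
public class LargeGroupByKey<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {

  @Override
  public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
    // Default expansion: the classic GBK. SparkRunner would override this
    // transform with repartitionAndSortWithinPartitions, keeping the
    // in-memory grouping for plain GroupByKey everywhere else.
    return input.apply(GroupByKey.<K, V>create());
  }
}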
WDYT?
Jan
On 9/27/19 8:56 PM, Reuven Lax wrote:
As I mentioned above, CoGroupByKey already takes advantage of this.
Reiterating is not the most common use case, but it's definitely one
that comes up. Also keep in mind that this API has supported
reiterating for the past five years (since even before the SDK was
donated to Apache). Therefore you should assume that users are relying
on it, in ways we might not expect.
Historically, Google's Flume system had collections that did not
support reiterating (they were even called OneShotCollections to make
it clear). This was the source of problems and user frustration, which
was one reason that in the original Dataflow SDK we made sure that
these iterables could be reiterated. Another reason why it's
advantageous for a runner to support this is allowing for better
fusion. If two ParDos A and B both read from the same GroupByKey, it
is nice to be able to fuse them into one logical operator. For this,
you'll probably need a shuffle implementation that allows two
independent readers from the same shuffle session.
How easy it is to implement reiterables that don't have to fit in
memory will depend on the runner. For Dataflow it's possible because
the shuffle session is logically persistent, so the runner can simply
reread the shuffle session. For other runners with different shuffle
implementations, it might be harder to support both properties. Maybe
we should introduce a new @RequiresReiteration annotation on ParDo?
That way the Spark runner can see this and switch to the in-memory
version just for groupings consumed by those ParDos. Runners that
already support reiteration can ignore this annotation, so it should
be backwards compatible.
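Usage could look roughly like this (the annotation is hypothetical,
sketched only to illustrate the idea):

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical marker annotation - not part of the SDK today.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RequiresReiteration {}

// A DoFn that declares it will iterate the grouped values twice.
class TwoPassFn extends DoFn<KV<String, Iterable<Long>>, String> {
  @RequiresReiteration  // runner must hand us a re-iterable Iterable
  @ProcessElement
  public void process(ProcessContext c) {
    long count = 0;
    for (Long v : c.element().getValue()) {  // first pass: count
      count++;
    }
    for (Long v : c.element().getValue()) {  // second pass: reiterate
      c.output(c.element().getKey() + "/" + v + " of " + count);
    }
  }
}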
Reuven
On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz> wrote:
I'd like to know the use-case. Why would you *need* to actually
iterate the grouped elements twice? By definition the first
iteration would have to extract some statistic (or a subset of
elements that must fit into memory). This statistic can then be
used as another input for the second iteration. Why not then
calculate the statistic in a separate branch of the pipeline and
feed it into the ParDo as a side input (rough sketch below)? That
would definitely be more efficient, because the calculation of the
statistic would probably be combinable (not sure if it is
absolutely required to be combinable, but it seems probable). Even
if the calculation were not combinable, it would be no less
efficient than reiterating twice. Why then support multiple
iterations (besides the fact that the output of GBK is an
Iterable)? Am I missing something?
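Rough sketch of what I mean, with Count standing in for whatever
statistic the first iteration would have computed (the rest is the
standard Beam SDK API):

import java.util.Map;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

PCollection<KV<String, Long>> input = ...;

// Branch 1: the (combinable) statistic, computed without grouping.
PCollectionView<Map<String, Long>> countsView =
    input.apply(Count.perKey()).apply(View.asMap());

// Branch 2: a single pass over the grouped values; the statistic
// arrives via side input instead of via a first iteration.
input
    .apply(GroupByKey.<String, Long>create())
    .apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, String>() {
      @ProcessElement
      public void process(ProcessContext c) {
        long count = c.sideInput(countsView).get(c.element().getKey());
        for (Long v : c.element().getValue()) {  // single iteration
          c.output(c.element().getKey() + "/" + v + " of " + count);
        }
      }
    }).withSideInputs(countsView));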
Jan
On 9/27/19 6:36 PM, Reuven Lax wrote:
This should be an element in the compatibility matrix as well.
On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org> wrote:
I am pretty surprised that we do not have
a @Category(ValidatesRunner) test in GroupByKeyTest that
iterates multiple times. That is a major oversight. We should
have this test, and it can be disabled by the SparkRunner's
configuration.
Kenn
On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:
The Dataflow version does not spill to disk. However, Spark's
design might require spilling to disk if you want that to be
implemented properly.
On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:
Hi,
Spark's GBK is currently implemented using
`sortBy(key and value).mapPartition(...)` for
non-merging windowing, in order to support large keys
and large-scale shuffles. Merging windowing is
implemented using the standard GBK (the underlying
Spark implementation uses ListCombiner + Hash
Grouping), which is by design unable to support large keys.
As Jan noted, the problem with mapPartition is that its
UDF receives an Iterator. The only option here is to
wrap this iterator in one that spills to disk once an
internal buffer is exceeded (the approach suggested
by Reuven). This unfortunately comes with a cost in
some cases. The best approach would be to somehow
determine that the user wants multiple iterations and
then wrap it in a "re-iterator" if necessary. Does
anyone have any ideas how to approach this?
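Just for reference, the naive in-memory shape of such a
wrapper would be something like this (a real version would
spill the cache to disk once it grows past some threshold):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch: caches elements as they are first consumed, so the Iterable
// can be iterated again. Not thread-safe; spilling to disk would
// replace the ArrayList in a real implementation.
class ReiterableWrapper<T> implements Iterable<T> {
  private final Iterator<T> source;
  private final List<T> cache = new ArrayList<>();

  ReiterableWrapper(Iterator<T> source) {
    this.source = source;
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      private int pos = 0;

      @Override
      public boolean hasNext() {
        return pos < cache.size() || source.hasNext();
      }

      @Override
      public T next() {
        if (pos == cache.size()) {
          cache.add(source.next());  // pull one more element, remember it
        }
        return cache.get(pos++);
      }
    };
  }
}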
D.
On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
The Beam API was written to support multiple
iterations, and there are definitely transforms
that do so. I believe that CoGroupByKey may do
this as well with the resulting iterator.
I know that the Dataflow runner is able to
handle iterables larger than available memory by
paging them in from shuffle, which still allows
for reiterating. It sounds like Spark is less
flexible here?
Reuven
On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
+dev <dev@beam.apache.org>
Lukasz, why do you think that users expect to
be able to iterate grouped elements multiple
times? Besides the fact that the 'Iterable'
type obviously suggests it? The way Spark
behaves is pretty much analogous to how
MapReduce used to work - in certain cases it
calls repartitionAndSortWithinPartitions and
then does mapPartition, which accepts an
Iterator - that is because internally it
merge-sorts pre-sorted segments. This approach
makes it possible to GroupByKey data sets that
are too big to fit into memory (per key); the
sketch below shows the pattern.
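Roughly, in Spark's Java API (counting values per key here,
just to show how a whole group is consumed in a single
forward pass):

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

JavaPairRDD<String, Integer> pairs = ...;

// Hash-partition by key and merge-sort each partition by key, so a
// group is a run of consecutive equal keys in the iterator.
JavaRDD<String> counts = pairs
    .repartitionAndSortWithinPartitions(new HashPartitioner(16))
    .mapPartitions(it -> {
      List<String> out = new ArrayList<>();  // collected only for brevity
      String currentKey = null;
      long count = 0;
      while (it.hasNext()) {  // single forward pass, no reiteration
        Tuple2<String, Integer> kv = it.next();
        if (!kv._1().equals(currentKey)) {
          if (currentKey != null) {
            out.add(currentKey + ": " + count);
          }
          currentKey = kv._1();
          count = 0;
        }
        count++;
      }
      if (currentKey != null) {
        out.add(currentKey + ": " + count);
      }
      return out.iterator();
    });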
If multiple iterations are to be expected by
users, we probably should:
a) include that in @ValidatesRunner tests
b) store values in memory on Spark, which
will break for certain pipelines
Because of (b) I think it would be much
better to remove this "expectation" and
clearly document that the Iterable is not
supposed to be iterated multiple times.
Jan
On 9/27/19 9:27 AM, Jan Lukavský wrote:
I pretty much think so, because that is how
Spark works. The Iterable inside is really
an Iterator, which cannot be iterated
multiple times.
Jan
On 9/27/19 2:00 AM, Lukasz Cwik wrote:
Jan, in Beam users expect to be able to
iterate the GBK output multiple times, even
from within the same ParDo.
Is this something that Beam on the Spark
runner has never supported?
On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi Gershi,
could you please outline the pipeline
you are trying to execute? Basically,
you cannot iterate the Iterable
multiple times in a single ParDo. It
should be possible, though, to apply
multiple ParDos to the output from
GroupByKey, e.g. as in the sketch below.
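Something like this, with FirstPassFn / SecondPassFn standing
in for your two iterations:

// Both branches read the same grouped PCollection; each DoFn
// iterates the values exactly once. FirstPassFn/SecondPassFn
// are placeholders for your own DoFns.
PCollection<KV<String, Iterable<Integer>>> grouped =
    input.apply(GroupByKey.<String, Integer>create());

grouped.apply("FirstPass", ParDo.of(new FirstPassFn()));
grouped.apply("SecondPass", ParDo.of(new SecondPassFn()));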
Jan
On 9/26/19 3:32 PM, Gershi, Noam wrote:
Hi,
I want to iterate multiple times on
the Iterable<V> (the output of
GroupByKey transformation)
When my Runner is SparkRunner, I get
an exception:
Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once, otherwise there could be data lost
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
    at java.lang.Iterable.spliterator(Iterable.java:101)
I understood I can branch the pipeline
after GroupByKey into multiple
transformations and iterate once on the
Iterable<V> in each of them.
Is there a better way to do that?
*Noam Gershi*
Software Developer
*T*: +972 (3) 7405718