Ok - now I see what you're talking about. You are focusing on the Java
types in the Java SDK, where the output of GBK is an Iterable type (which
should always be reiterable). I was talking more abstractly about the
programming model, i.e. the portability representation of the graph.

In this case I think that Robert's suggestion for the Java SDK is the right
one. Create a new transform, and have runners optimize it away if necessary.

On Wed, Oct 2, 2019 at 2:19 AM Jan Lukavský <je...@seznam.cz> wrote:

>
> On 10/2/19 4:30 AM, Reuven Lax wrote:
>
>
>
> On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> > The fact that the annotation on the ParDo "changes" the GroupByKey
>> implementation is very specific to the Spark runner implementation.
>>
>> I don't quite agree. It is not very specific to Spark, it is specific to
>> generally all runners, that produce grouped elements in a way that is not
>> reiterable. That is the key property. The example you gave with HDFS does
>> not satisfy this condition (files on HDFS are certainly reiterable), and
>> that's why no change to the GBK is needed (it actually already has the
>> required property). A quick look at what FlinkRunner (at least non portable
>> does) is that it implements GBK using reducing elements into List. That is
>> going to crash on big PCollection, which is even nicely documented:
>>
>>    * <p>For internal use to translate {@link GroupByKey}. For a large {@link 
>> PCollection} this is
>>    * expected to crash!
>>
>>
>> If this is fixed, then it is likely to start behave the same as Spark. So
>> actually I think the opposite is true - Dataflow is a special case, because
>> of how its internal shuffle service works.
>>
>
> I think you misunderstood - I was not trying to dish on the Spark runner.
> Rather my point is that whether the GroupByKey implementation is affected
> or not is runner dependent. In some runners it is and in others it isn't.
> However in all cases the *semantics* of the ParDo is affected. Since Beam
> tries as much as possible to be runner agnostic, we should default to
> making the change where there is an obvious semantic difference.
>
>
> I understand that, but I just don't think, that a semantics should be
> affected by this. If outcome of GBK is Iterable, then it should be
> reiterable, that is how Iterable works, so I now lean more towards a
> conclusion, that the current behavior of Spark runner simply breaks this
> contract. Solution of this would be to introduce the proposed
> GroupByKeyOneShot or Reduce(By|Per)Key.
>
>
>
> > In general I sympathize with the worry about non-local effects. Beam is
>> already full of them (e.g. a Window.into statement effects downstream
>> GroupByKeys). In each case where they were added there was extensive debate
>> and discussion (Windowing semantics were debated for many months), exactly
>> because there was concern over adding these non-local effects. In every
>> case, no other good solution could be found. For the case of windowing for
>> example, it was often easy to propose simple local APIs (e.g. just pass the
>> window fn as a parameter to GroupByKey), however all of these local
>> solutions ended up not working for important use cases when we analyzed
>> them more deeply.
>>
>> That is very interesting. Could you elaborate more about some examples of
>> the use cases which didn't work? I'd like to try to match it against how
>> Euphoria is structured, it should be more resistant to this non-local
>> effects, because it very often bundles together multiple Beam's primitives
>> to single transform - ReduceByKey is one example of this, if is actually
>> mix of Window.into() + GBK + ParDo, Although it might look like if this
>> transform can be broken down to something else, then it is not primitive
>> (euphoria has no native equivalent of GBK itself), but it has several other
>> nice implications - that is that Combine now becomes a special case of RBK.
>> It now becomes only a question of where and how you can "run" the reduce
>> function. The logic is absolutely equal. This can be worked in more detail
>> and actually show, that even Combine and RBK can be decribed by a more
>> general stateful operation (ReduceStateByKey), and so finally Euphoria
>> actually has only two really "primitive" operations - these are FlatMap
>> (basically stateless ParDo) and RSBK. As I already mentioned on some other
>> thread, when stateful ParDo would support merging windows, it can be shown
>> that both Combine and GBK become special cases of this.
>>
>> > As you mentioned below, I do think it's perfectly reasonable for a DSL
>> to impose its own semantics. Scio already does this - the raw Beam API is
>> used by a DSL as a substrate, but the DSL does not need to blindly mirror
>> the semantics of the raw Beam API - at least in my opinion!
>>
>> Sure, but currently, there is no way for DSL to "hook" into runner, so it
>> has to use raw Beam SDK, and so this will fail in cases like this - where
>> Beam actually has stronger guarantees than it is required by the DSL. It
>> would be cool if we could find a way to do that - this pretty much aligns
>> with another question raised on ML, about the possibility to override a
>> default implementation of a PTransform for specific pipeline.
>>
>> Jan
>>
>>
>> On 9/29/19 7:46 PM, Reuven Lax wrote:
>>
>> Jan,
>>
>> The fact that the annotation on the ParDo "changes" the GroupByKey
>> implementation is very specific to the Spark runner implementation. You can
>> imagine another runner that simply writes out files in HDFS to implement a
>> GroupByKey - this GroupByKey implementation is agnostic whether the result
>> will be reiterated or not; in this case it is very much the ParDo
>> implementation that changes to implement a reiterable. vI think you don't
>> like the fact that an annotation on the ParDo will have a non-local effect
>> on the implementation of the GroupByKey upstream. However arguably the
>> non-local effect is just a quirk of how the Spark runner is implemented -
>> other runners might have a local effect.
>>
>> In general I sympathize with the worry about non-local effects. Beam is
>> already full of them (e.g. a Window.into statement effects downstream
>> GroupByKeys). In each case where they were added there was extensive debate
>> and discussion (Windowing semantics were debated for many months), exactly
>> because there was concern over adding these non-local effects. In every
>> case, no other good solution could be found. For the case of windowing for
>> example, it was often easy to propose simple local APIs (e.g. just pass the
>> window fn as a parameter to GroupByKey), however all of these local
>> solutions ended up not working for important use cases when we analyzed
>> them more deeply.
>>
>> As you mentioned below, I do think it's perfectly reasonable for a DSL to
>> impose its own semantics. Scio already does this - the raw Beam API is used
>> by a DSL as a substrate, but the DSL does not need to blindly mirror the
>> semantics of the raw Beam API - at least in my opinion!
>>
>> Reuven
>>
>> On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> I understand the concerns. Still, it looks a little like we want to be
>>> able to modify behavior of an object from inside a submodule - quite like
>>> if my subprogram would accept a Map interface, but internally I would say
>>> "hey, this is supposed to be a HashMap, please change it so". Because of
>>> how pipeline is constructed, we can do that, the question is if there
>>> really isn't a better solution.
>>>
>>> What I do not like about the proposed solution:
>>>
>>>  1) to specify that the grouped elements are supposed to be iterated
>>> only once can be done only on ParDo, although there are other (higher
>>> level) PTransforms, that can consume output of GBK
>>>
>>>  2) the annontation on ParDo is by definition generic - i.e. can be used
>>> on input which is not output of GBK, which makes no sense
>>>
>>>  3) we modify the behavior to unlock some optimizations (or change of
>>> behavior of the GBK itself), users will not understand that
>>>
>>>  4) the annotation somewhat arbitrarily modifies data types passed, that
>>> is counter-intuitive and will be source of confusion
>>>
>>> I think that a solution that solves the above problems (but brings somoe
>>> new, as always :-)), could be to change the output of GBK from
>>> PCollection<K, Iterable<V>> to GroupedPCollection<K, V>. That way we can
>>> control which operators (and how) consume the grouping, and we can enable
>>> these transforms to specify additional parameters (like how they want to
>>> consume the grouping). It is obviously a breaking change (although can be
>>> probably made backwards compatible) and it would very much likely involve a
>>> substantial work. But maybe there are some other not yet discussed options.
>>>
>>> Jan
>>> On 9/28/19 6:46 AM, Reuven Lax wrote:
>>>
>>> In many cases, the writer of the ParDo has no access to the GBK (e.g.
>>> the GBK is hidden inside an upstream PTransform that they cannot modify).
>>> This is the same reason why RequiresStableInput was made a property of the
>>> ParDo, because the GroupByKey is quite often inaccessible.
>>>
>>> The car analogy doesn't quite apply here, because the runner does have a
>>> full view of the pipeline so can satisfy all constraints. The car
>>> dealership generally cannot read your mind (thankfully!), so you have to
>>> specify what you want. Or to put it another way, the various transforms in
>>> a Beam pipeline do not live in isolation. The full pipeline graph is what
>>> is executed, and the runner already has to analyze the full graph to run
>>> the pipeline (as well as to optimize the pipeline).
>>>
>>> Reuven
>>>
>>> On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> I'd suggest Stream instead of Iterator, it has the same semantics and
>>>> much better API.
>>>>
>>>> Still not sure, what is wrong on letting the GBK to decide this. I have
>>>> an analogy - if I decide to buy a car, I have to decide *what* car I'm
>>>> going to buy (by think about how I'm going to use it) *before* I buy it. I
>>>> cannot just buy "a car" and then change it from minivan to sport car based
>>>> on my actual need. Same with the GBK - if I want to be able to reiterate
>>>> the result, then I should tell it in advance.
>>>>
>>>> Jan
>>>> On 9/27/19 10:50 PM, Kenneth Knowles wrote:
>>>>
>>>> Good point about sibling fusion requiring this.
>>>>
>>>> The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does imply
>>>> that the output iterable can be iterated arbitrarily many times.
>>>>
>>>> I think this should remain the default for all the reasons mentioned.
>>>>
>>>> We could have opt-in to the weaker KV<K, Iterator<V>> version. Agree
>>>> that this is a property of the ParDo. A particular use of a GBK has no idea
>>>> what is downstream. If you owned the whole pipeline, a special
>>>> ParDo<Iterator<V>, Foo> would work. But to make the types line up, this
>>>> would require changes upstream, which is not good.
>>>>
>>>> Maybe something like this:
>>>>
>>>> ParDo<Iterable<V>, Foo> {
>>>>   @ProcessElement
>>>>   void process(@OneShotIterator Iterator<V> iter) {
>>>>     ...
>>>>   }
>>>> }
>>>>
>>>> I've described all of this in terms of Java SDK. So we would need a
>>>> portable representation for all this metadata.
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> I think the behavior to make explicit is the need to reiterate, not
>>>>> the need to handle large results. How large of a result can be handled 
>>>>> will
>>>>> always be dependent on the runner, and each runner will probably have a
>>>>> different definition of large keys. Reiteration however is a logical
>>>>> difference in the programming API. Therefore I think it makes sense to
>>>>> specify the latter. The need to reiterate is a property of the downstream
>>>>> ParDo, so it should be specified there - not on the GBK.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> Ok, I think I understand there might be some benefits of this. Then
>>>>>> I'd propose we make this clear on the GBK. If we would support somehing
>>>>>> like this:
>>>>>>
>>>>>>  PCollection<?> input = ....;
>>>>>>
>>>>>>  input.apply(GroupByKey.withLargeKeys());
>>>>>>
>>>>>> then SparkRunner could expand this to
>>>>>> repartitionAndSortWithinPartitions only on this PTransform, and fallback 
>>>>>> to
>>>>>> the default (in memory) in other situations. The default expansion of
>>>>>> LargeGroupByKey (let's say) would be classic GBK, so that only runners 
>>>>>> that
>>>>>> need to make sure that they don't break reiterations can expand this.
>>>>>>
>>>>>> WDYT?
>>>>>>
>>>>>> Jan
>>>>>> On 9/27/19 8:56 PM, Reuven Lax wrote:
>>>>>>
>>>>>> As I mentioned above, CoGroupByKey already takes advantage of this.
>>>>>> Reiterating is not the most common use case, but it's definitely one that
>>>>>> comes up. Also keep in mind that this API has supported reiterating for 
>>>>>> the
>>>>>> past five years (since even before the SDK was donated to Apache).
>>>>>> Therefore you should assume that users are relying on it, in ways we 
>>>>>> might
>>>>>> not expect.
>>>>>>
>>>>>> Historically, Google's Flume system had collections that did not
>>>>>> support reiterating (they were even called OneShotCollections to make it
>>>>>> clear). This was the source of problems and user frustration, which was 
>>>>>> one
>>>>>> reason that in the original Dataflow SDK we made sure that these 
>>>>>> iterables
>>>>>> could be reiterated. Another reason why it's advantageous for a runner to
>>>>>> support this is allowing for better fusion. If two ParDos A and B both 
>>>>>> read
>>>>>> from the same GroupByKey, it is nice to be able to fuse them into one
>>>>>> logical operator. For this, you'll probably need a shuffle implementation
>>>>>> that allows two independent readers from the same shuffle session.
>>>>>>
>>>>>> How easy it is to implement reiterables that don't have to fit in
>>>>>> memory will depend on the runner.  For Dataflow it's possible because the
>>>>>> shuffle session is logically persistent, so the runner can simply reread
>>>>>> the shuffle session. For other runners with different shuffle
>>>>>> implementations, it might be harder to support both properties. Maybe we
>>>>>> should introduce a new @RequiresReiteration annotation on ParDo? That way
>>>>>> the Spark runner can see this and switch to the in-memory version just 
>>>>>> for
>>>>>> groupings consumed by those ParDos. Runners that already support
>>>>>> reiteration can ignore this annotation, so it should be backwards
>>>>>> compatible.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz>
>>>>>> wrote:
>>>>>>
>>>>>>> I'd like to know the use-case. Why would you *need* to actually
>>>>>>> iterate the grouped elements twice? By definition the first iteration 
>>>>>>> would
>>>>>>> have to extract some statistic (or subset of elements that must fit into
>>>>>>> memory). This statistic can then be used as another input for the second
>>>>>>> iteration. Why not then calculate the statistic in a separate branch in 
>>>>>>> the
>>>>>>> pipeline and feed it then into the ParDo as side input? That would be
>>>>>>> definitely more efficient, because the calculation of the statistic 
>>>>>>> would
>>>>>>> be probably combinable (not sure if it is absolutely required to be
>>>>>>> combinable, but it seems probable). Even if the calculation would not be
>>>>>>> combinable, it is not less efficient than reiterating twice. Why then
>>>>>>> support multiple iterations (besides the fact that output of GBK is
>>>>>>> Iterable). Am I missing something?
>>>>>>>
>>>>>>> Jan
>>>>>>> On 9/27/19 6:36 PM, Reuven Lax wrote:
>>>>>>>
>>>>>>> This should be an element in the compatibility matrix as well.
>>>>>>>
>>>>>>> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I am pretty surprised that we do not have
>>>>>>>> a @Category(ValidatesRunner) test in GroupByKeyTest that iterates 
>>>>>>>> multiple
>>>>>>>> times. That is a major oversight. We should have this test, and it can 
>>>>>>>> be
>>>>>>>> disabled by the SparkRunner's configuration.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> The Dataflow version does not spill to disk. However Spark's
>>>>>>>>> design might require spilling to disk if you want that to be 
>>>>>>>>> implemented
>>>>>>>>> properly.
>>>>>>>>>
>>>>>>>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Spark's GBK is currently implemented using `sortBy(key and
>>>>>>>>>> value).mapPartition(...)` for non-merging windowing in order to 
>>>>>>>>>> support
>>>>>>>>>> large keys and large scale shuffles. Merging windowing is 
>>>>>>>>>> implemented using
>>>>>>>>>> standard GBK (underlying spark impl. uses ListCombiner + Hash 
>>>>>>>>>> Grouping),
>>>>>>>>>> which is by design unable to support large keys.
>>>>>>>>>>
>>>>>>>>>> As Jan noted, problem with mapPartition is, that its UDF receives
>>>>>>>>>> an Iterator. Only option here is to wrap this iterator to one that 
>>>>>>>>>> spills
>>>>>>>>>> to disk once an internal buffer is exceeded (the approach suggested 
>>>>>>>>>> by
>>>>>>>>>> Reuven). This unfortunately comes with a cost in some cases. The best
>>>>>>>>>> approach would be to somehow determine, that user wants multiple 
>>>>>>>>>> iterations
>>>>>>>>>> and than wrap it in "re-iterator" if necessary. Does anyone have any 
>>>>>>>>>> ideas
>>>>>>>>>> how to approach this?
>>>>>>>>>>
>>>>>>>>>> D.
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> The Beam API was written to support multiple iterations, and
>>>>>>>>>>> there are definitely transforms that do so. I believe that 
>>>>>>>>>>> CoGroupByKey may
>>>>>>>>>>> do this as well with the resulting iterator.
>>>>>>>>>>>
>>>>>>>>>>> I know that the Dataflow runner is able to handles iterators
>>>>>>>>>>> larger than available memory by paging them in from shuffle, which 
>>>>>>>>>>> still
>>>>>>>>>>> allows for reiterating. It sounds like Spark is less flexible here?
>>>>>>>>>>>
>>>>>>>>>>> Reuven
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> +dev <dev@beam.apache.org> <dev@beam.apache.org>
>>>>>>>>>>>>
>>>>>>>>>>>> Lukasz, why do you think that users expect to be able to
>>>>>>>>>>>> iterate multiple times grouped elements? Besides that it obviously 
>>>>>>>>>>>> suggests
>>>>>>>>>>>> the 'Iterable'? The way that spark behaves is pretty much 
>>>>>>>>>>>> analogous to how
>>>>>>>>>>>> MapReduce used to work - in certain cases it calles
>>>>>>>>>>>> repartitionAndSortWithinPartitions and then does mapPartition, 
>>>>>>>>>>>> which
>>>>>>>>>>>> accepts Iterator - that is because internally it merge sorts pre 
>>>>>>>>>>>> sorted
>>>>>>>>>>>> segments. This approach enables to GroupByKey data sets that are 
>>>>>>>>>>>> too big to
>>>>>>>>>>>> fit into memory (per key).
>>>>>>>>>>>>
>>>>>>>>>>>> If multiple iterations should be expected by users, we probably
>>>>>>>>>>>> should:
>>>>>>>>>>>>
>>>>>>>>>>>>  a) include that in @ValidatesRunner tests
>>>>>>>>>>>>
>>>>>>>>>>>>  b) store values in memory on spark, which will break for
>>>>>>>>>>>> certain pipelines
>>>>>>>>>>>>
>>>>>>>>>>>> Because of (b) I think that it would be much better to remove
>>>>>>>>>>>> this "expectation" and clearly document that the Iterable is not 
>>>>>>>>>>>> supposed
>>>>>>>>>>>> to be iterated multiple times.
>>>>>>>>>>>>
>>>>>>>>>>>> Jan
>>>>>>>>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> I pretty much think so, because that is how Spark works. The
>>>>>>>>>>>> Iterable inside is really an Iterator, which cannot be iterated 
>>>>>>>>>>>> multiple
>>>>>>>>>>>> times.
>>>>>>>>>>>>
>>>>>>>>>>>> Jan
>>>>>>>>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Jan, in Beam users expect to be able to iterate the GBK output
>>>>>>>>>>>> multiple times even from within the same ParDo.
>>>>>>>>>>>> Is this something that Beam on Spark Runner never supported?
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Gershi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> could you please outline the pipeline you are trying to
>>>>>>>>>>>>> execute? Basically, you cannot iterate the Iterable multiple 
>>>>>>>>>>>>> times in
>>>>>>>>>>>>> single ParDo. It should be possible, though, to apply multiple 
>>>>>>>>>>>>> ParDos to
>>>>>>>>>>>>> output from GroupByKey.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jan
>>>>>>>>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> I want to iterate multiple times on the Iterable<V> (the
>>>>>>>>>>>>> output of GroupByKey transformation)
>>>>>>>>>>>>>
>>>>>>>>>>>>> When my Runner is SparkRunner, I get an exception:
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator
>>>>>>>>>>>>> can't be iterated more than once,otherwise there could be data 
>>>>>>>>>>>>> lost
>>>>>>>>>>>>>
>>>>>>>>>>>>>                 at
>>>>>>>>>>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>>>>>>>>
>>>>>>>>>>>>>                 at
>>>>>>>>>>>>> java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> I understood I can branch the pipeline after GroupByKey into
>>>>>>>>>>>>> multiple transformation and iterate in each of them once on the 
>>>>>>>>>>>>> Iterable<V>.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is there a better way for that?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> [image: citi_logo_mail][image: citi_logo_mail]*Noam Gershi*
>>>>>>>>>>>>>
>>>>>>>>>>>>> Software Developer
>>>>>>>>>>>>>
>>>>>>>>>>>>> *T*: +972 (3) 7405718 <+972%203-740-5718>
>>>>>>>>>>>>>
>>>>>>>>>>>>> [image: Mail_signature_blue]
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>

Reply via email to