The DoFnSignature is where the information "this ParDo only needs a
one-shot iterator" would be recorded. This is what enables a runner to use
the GBKOneShot in place of a full GBK.
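
As a rough sketch of how a runner-side check could look
(DoFnSignatures.getSignature is the existing reflective entry point; the
one-shot property and the translate helpers here are hypothetical):

DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
// Hypothetical property; today DoFnSignature records no one-shot flag.
if (signature.processElement().requiresOneShotIterator()) {
  translateGbkOneShot(transform);  // streamed, non-reiterable grouping
} else {
  translateFullGbk(transform);     // materialized, reiterable grouping
}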

Kenn

On Fri, Oct 4, 2019 at 1:13 AM Reuven Lax <[email protected]> wrote:

> Yes - this approach puts compatibility checking on the user. However we
> could provide another way for the ParDo to "advertise" the set of states it
> will access. This is similar to what Kenn proposed: today there is a
> DoFnSignature object that is inferred
> reflectively based on annotations. However, if there were an API to modify
> the DoFnSignature, a DSL could simply use that API to list a set of
> state readers.
>
> Reuven
>
>
>
> On Fri, Oct 4, 2019 at 1:03 AM Jan Lukavský <[email protected]> wrote:
>
>> +1
>>
>> But I'd warn a little against this kind of absolute freedom for the
>> process() method. It should probably remain the case that all states are
>> declared before any element passes in, because otherwise it would be hard
>> (if not impossible) to do any compatibility checking of state upon pipeline
>> upgrades.
>>
>> Jan
>> On 10/4/19 9:47 AM, Reuven Lax wrote:
>>
>> IMO the fact that Stateful ParDo requires compile-time annotations isn't
>> the biggest problem - it's that it requires a static set of them, one for
>> each state. This is fine for specific user code, but we really should add
>> the ability to pass in a StateAccessor object to a DoFn that allows the
>> DoFn to dynamically create different state objects. Something like the
>> following:
>>
>> public void process(StateAccessor stateAccessor, ...) {
>>    stateAccessor.getValueState("state1", TypeDescriptors.integers()).get();
>>    stateAccessor.getMapState("state2", TypeDescriptors.strings(),
>>        TypeDescriptors.integers()).put(key, value);
>>    etc.
>> }
>>
>> This would be a bit less type-safe than the current approach (someone
>> could try to fetch the same state twice with different types). However it
>> would be much friendlier to DSLs, and indeed to any "generic" PTransform
>> that does not statically know all of its states at compile time.
>>
>> I think we need similar functionality for timers.
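>>
>> A hypothetical analog for timers might look like this (TimerAccessor is an
>> invented name mirroring the StateAccessor sketch above; TimeDomain is
>> Beam's existing enum):
>>
>> public void process(TimerAccessor timerAccessor, ...) {
>>   timerAccessor.getTimer("timer1", TimeDomain.EVENT_TIME)
>>       .set(window.maxTimestamp());  // assumes a window variable in scope
>>   etc.
>> }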
>>
>> Reuven
>>
>> On Fri, Oct 4, 2019 at 12:36 AM Jan Lukavský <[email protected]> wrote:
>>
>>> > So to me the interesting part is that there is a DSL that wants to
>>> support primitives that are strictly weaker than Beam's, in order to *only*
>>> allow the oneshot path. Annotations are quite annoying for DSLs, as you may
>>> have noticed for state & timers, so that is not a good fit. But the
>>> concepts still work. I would suggest pivoting this thread into how to allow
>>> a DSL builder to directly provide a DoFnInvoker with a DoFnSignature in
>>> order to programmatically provide the same information that the annotations
>>> are used for. Essentially exposing an IR to DSL authors rather than forcing
>>> them to work with the source language meant for end users. Do you already
>>> have a solution for this today?
>>>
>>> We have been talking about how this would be useful - it is mostly
>>> because stateful ParDo requires compile-time annotations that Euphoria
>>> lacks stateful processing support. For that, we need exactly what you say:
>>> we need to be able to provide the runner directly with a DoFnSignature.
>>> Other solutions would be kind of hackish.
>>>
>>> On the other hand, this isn't directly related to the discussion about
>>> reiterations in GBK, is it? I think DoFnSignatures cannot help us there,
>>> because we need to affect the way the GBK is translated in the runner, not
>>> the ParDo. So this quite naturally leads to RBK, or a "streamed GBK". If
>>> we have a consensus on that, I can create JIRAs and move it forward.
>>>
>>> Jan
>>> On 10/3/19 7:19 PM, Kenneth Knowles wrote:
>>>
>>> On Tue, Oct 1, 2019 at 5:35 PM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> For this specific use case, I would suggest this be done via
>>>> PTransform URNs. E.g. one could have a GroupByKeyOneShot whose
>>>> implementation is
>>>>
>>>> input
>>>>     .apply(GroupByKey.create())
>>>>     .apply(kv -> KV.of(kv.getKey(), kv.getValue().iterator()))
>>>>
>>>
>>> This is dual to what I clumsily was trying to say in my last paragraph.
>>> But I agree that ReduceByKey is better, if we were to add any new primitive
>>> transform. I very much dislike PCollection<Iterator> for just the reasons
>>> you also mention.
>>>
>>> I think the annotation route where @ProcessElement can accept a
>>> different type of element seems less intrusive and more flexible.
>>>
>>>
>>>> On Tue, Oct 1, 2019 at 2:16 AM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> The car analogy was meant to say that in the real world you have to
>>>>> make a decision before you take any action. There is no retroactivity
>>>>> possible.
>>>>>
>>>>> Reuven pointed out that it is possible (although it seems a little
>>>>> weird to me, but that is the only thing I can say against it :-)) that
>>>>> the way a grouped PCollection is produced might be out of the control of
>>>>> a consuming operator. One example of this might be that the grouping is
>>>>> produced in a submodule (some library), but the consumer still wants to
>>>>> be able to specify whether or not it wants reiterations.
>>>>>
>>> Exactly. The person choosing to GroupByKey and the person writing the
>>> one-shot ParDo must be assumed to be different people, in general.
>>>
>>> FWIW I always think of the pipeline as a program and the runner as a
>>> planner/optimizer, so it is always responsible for reordering and making
>>> physical planning decisions like whether to create an iterable
>>> materialization or just some streamed iterator.
>>>
>>>>> If we move on, our next option might be to specify the annotation on
>>>>> the consumer (as suggested), but that has all the "not really nice"
>>>>> properties of being counter-intuitive, ignoring strong types, etc., for
>>>>> which reason I think this should be ruled out as well.
>>>>>
>>> In Beam we have taken the position that type checking the graph after
>>> it is constructed is an early enough place to catch type errors (speaking
>>> for Java). The validation that ParDo does on the DoFn is basically
>>> lightweight, local type checking. This is how we detect and type check
>>> stateful ParDo transforms as well as splittable ParDo transforms. We also
>>> catch errors that are not expressible in Java's type system.
>>>
>>> If we were discussing just this Spark limitation/optimization this is a
>>> very natural fit with what we already have: Give the runner all the
>>> information about the nature of the transforms and user functions, and let
>>> it make the best plan it can.
>>>
>>> So to me the interesting part is that there is a DSL that wants to
>>> support primitives that are strictly weaker than Beam's, in order to *only*
>>> allow the oneshot path. Annotations are quite annoying for DSLs, as you may
>>> have noticed for state & timers, so that is not a good fit. But the
>>> concepts still work. I would suggest pivoting this thread into how to allow
>>> a DSL builder to directly provide a DoFnInvoker with a DoFnSignature in
>>> order to programmatically provide the same information that the annotations
>>> are used for. Essentially exposing an IR to DSL authors rather than forcing
>>> them to work with the source language meant for end users. Do you already
>>> have a solution for this today?
>>>
>>> Kenn
>>>
>>>>> This leaves us with a single option (at least I have not figured out
>>>>> any other) - we can bundle the GBK and the associated ParDo into an
>>>>> atomic PTransform, which can then be overridden by runners that need
>>>>> special handling of this situation - that is, all runners that need to
>>>>> buffer data in memory in order to support reiterations (Spark and Flink;
>>>>> note that this problem arises only in the batch case, because in the
>>>>> streaming case one can reasonably assume that the data resides in a
>>>>> state that supports reiterations). But we already have this PTransform
>>>>> in Euphoria - it is called ReduceByKey and has all the required
>>>>> properties (technically, it is not a PTransform now, but that is a minor
>>>>> detail and can be changed trivially).
>>>>>
>>>>> So, the direction I was trying to take this discussion was - what
>>>>> could be the best way for a runner to natively support a PTransform from a
>>>>> DSL? I can imagine several options:
>>>>>
>>>>>  a) support it directly and let runners depend on the DSL (a
>>>>> compileOnly dependency might suffice, because users will include the DSL
>>>>> in their code to be able to use it)
>>>>>
>>>>>  b) create an interface in runners for user code to be able to provide
>>>>> translations for user-specified operators (this could be absolutely
>>>>> generic; DSLs might just use this feature the same way any user could) -
>>>>> after all, runners already use the concept of a Translator, but that is
>>>>> pretty much copy-pasted between them, not abstracted into a
>>>>> general-purpose one
>>>>>
>>>>>  c) move the operators that need to be translated into core
>>>>>
>>>>> Option (c) then leaves open related questions - if we wanted to move
>>>>> other operators to core, would this be the right time to ask whether our
>>>>> current set of "core" operators is the ideal one? Or could this be
>>>>> optimized?
>>>>>
>>>>> Jan
>>>>> On 10/1/19 12:32 AM, Kenneth Knowles wrote:
>>>>>
>>>>> In the car analogy, you have something this:
>>>>>
>>>>>     Iterable: car
>>>>>     Iterator: taxi ride
>>>>>
>>>>> They are related, but not as variations of a common concept.
>>>>>
>>>>> In the discussion of Combine vs RSBK, if the reducer is required to be
>>>>> an associative and commutative operator, then it is the same thing under a
>>>>> different name. If the reducer can be non-associative or non-commutative,
>>>>> then it admits fewer transformations/optimizations.
>>>>>
>>>>> If you introduce a GroupIteratorsByKey and implement GroupByKey as a
>>>>> transform that combines the iterator by concatenation, I think you do get
>>>>> an internally consistent system. To execute efficiently, you need to 
>>>>> always
>>>>> identify and replace the GroupByKey operation with a primitive one. It 
>>>>> does
>>>>> make some sense to expose the weakest primitives for the sake of DSLs. But
>>>>> they are very poorly suited for end-users, and for GBK on most runners you
>>>>> get the more powerful one for free.
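>>>>>
>>>>> (A rough sketch of that layering - GroupIteratorsByKey is the
>>>>> hypothetical weaker primitive, and the derived GBK simply materializes
>>>>> each one-shot iterator into a list:)
>>>>>
>>>>> input
>>>>>     .apply(new GroupIteratorsByKey<K, V>())  // yields KV<K, Iterator<V>>
>>>>>     .apply(ParDo.of(new DoFn<KV<K, Iterator<V>>, KV<K, Iterable<V>>>() {
>>>>>       @ProcessElement
>>>>>       public void process(ProcessContext c) {
>>>>>         List<V> values = new ArrayList<>();
>>>>>         c.element().getValue().forEachRemaining(values::add);
>>>>>         c.output(KV.of(c.element().getKey(), values));
>>>>>       }
>>>>>     }));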
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> > The fact that the annotation on the ParDo "changes" the GroupByKey
>>>>>> implementation is very specific to the Spark runner implementation.
>>>>>>
>>>>>> I don't quite agree. It is not very specific to Spark; it applies
>>>>>> generally to all runners that produce grouped elements in a way that is
>>>>>> not reiterable. That is the key property. The example you gave with HDFS
>>>>>> does not satisfy this condition (files on HDFS are certainly reiterable),
>>>>>> and that's why no change to the GBK is needed (it actually already has
>>>>>> the required property). A quick look at the FlinkRunner (at least the
>>>>>> non-portable one) shows that it implements GBK by reducing elements into
>>>>>> a List. That is going to crash on a big PCollection, which is even nicely
>>>>>> documented:
>>>>>>
>>>>>>    * <p>For internal use to translate {@link GroupByKey}. For a large 
>>>>>> {@link PCollection} this is
>>>>>>    * expected to crash!
>>>>>>
>>>>>>
>>>>>> If this is fixed, then it is likely to start behaving the same as
>>>>>> Spark. So actually I think the opposite is true - Dataflow is a special
>>>>>> case, because of how its internal shuffle service works.
>>>>>>
>>>>>> > In general I sympathize with the worry about non-local effects.
>>>>>> Beam is already full of them (e.g. a Window.into statement affects
>>>>>> downstream GroupByKeys). In each case where they were added there was
>>>>>> extensive debate and discussion (Windowing semantics were debated for 
>>>>>> many
>>>>>> months), exactly because there was concern over adding these non-local
>>>>>> effects. In every case, no other good solution could be found. For the 
>>>>>> case
>>>>>> of windowing for example, it was often easy to propose simple local APIs
>>>>>> (e.g. just pass the window fn as a parameter to GroupByKey), however all 
>>>>>> of
>>>>>> these local solutions ended up not working for important use cases when 
>>>>>> we
>>>>>> analyzed them more deeply.
>>>>>>
>>>>>> That is very interesting. Could you elaborate on some examples of the
>>>>>> use cases which didn't work? I'd like to try to match them against how
>>>>>> Euphoria is structured; it should be more resistant to these non-local
>>>>>> effects, because it very often bundles multiple Beam primitives into a
>>>>>> single transform - ReduceByKey is one example of this, as it is actually
>>>>>> a mix of Window.into() + GBK + ParDo. It might look like this transform
>>>>>> can be broken down into something else and is therefore not primitive
>>>>>> (Euphoria has no native equivalent of GBK itself), but it has several
>>>>>> other nice implications - namely that Combine now becomes a special case
>>>>>> of RBK. It then becomes only a question of where and how you can "run"
>>>>>> the reduce function; the logic is absolutely equal. This can be worked
>>>>>> out in more detail to show that even Combine and RBK can be described by
>>>>>> a more general stateful operation (ReduceStateByKey), and so finally
>>>>>> Euphoria actually has only two really "primitive" operations - FlatMap
>>>>>> (basically a stateless ParDo) and RSBK. As I already mentioned on
>>>>>> another thread, once stateful ParDo supports merging windows, it can be
>>>>>> shown that both Combine and GBK become special cases of this.
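>>>>>>
>>>>>> (For illustration, a Combine-style sum written as Euphoria's
>>>>>> ReduceByKey - the builder method names follow the Euphoria extension,
>>>>>> though exact signatures may differ:)
>>>>>>
>>>>>> PCollection<KV<String, Long>> sums =
>>>>>>     ReduceByKey.of(input)           // input: PCollection<KV<String, Long>>
>>>>>>         .keyBy(KV::getKey)
>>>>>>         .valueBy(KV::getValue)
>>>>>>         .combineBy(Sums.ofLongs())  // associative & commutative reducer
>>>>>>         .output();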
>>>>>>
>>>>>> > As you mentioned below, I do think it's perfectly reasonable for a
>>>>>> DSL to impose its own semantics. Scio already does this - the raw Beam 
>>>>>> API
>>>>>> is used by a DSL as a substrate, but the DSL does not need to blindly
>>>>>> mirror the semantics of the raw Beam API - at least in my opinion!
>>>>>>
>>>>>> Sure, but currently there is no way for a DSL to "hook" into a runner,
>>>>>> so it has to use the raw Beam SDK, and so it will fail in cases like
>>>>>> this - where Beam actually has stronger guarantees than are required by
>>>>>> the DSL. It would be cool if we could find a way to do that - this
>>>>>> pretty much aligns with another question raised on the mailing list,
>>>>>> about the possibility of overriding the default implementation of a
>>>>>> PTransform for a specific pipeline.
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>>
>>>>>> On 9/29/19 7:46 PM, Reuven Lax wrote:
>>>>>>
>>>>>> Jan,
>>>>>>
>>>>>> The fact that the annotation on the ParDo "changes" the GroupByKey
>>>>>> implementation is very specific to the Spark runner implementation. You
>>>>>> can imagine another runner that simply writes out files to HDFS to
>>>>>> implement a GroupByKey - that GroupByKey implementation is agnostic to
>>>>>> whether the result will be reiterated or not; in this case it is very
>>>>>> much the ParDo implementation that changes to implement a reiterable. I
>>>>>> think you don't like the fact that an annotation on the ParDo will have
>>>>>> a non-local effect on the implementation of the GroupByKey upstream.
>>>>>> However, arguably the non-local effect is just a quirk of how the Spark
>>>>>> runner is implemented - other runners might have a local effect.
>>>>>>
>>>>>> In general I sympathize with the worry about non-local effects. Beam
>>>>>> is already full of them (e.g. a Window.into statement affects downstream
>>>>>> GroupByKeys). In each case where they were added there was extensive
>>>>>> debate
>>>>>> and discussion (Windowing semantics were debated for many months), 
>>>>>> exactly
>>>>>> because there was concern over adding these non-local effects. In every
>>>>>> case, no other good solution could be found. For the case of windowing 
>>>>>> for
>>>>>> example, it was often easy to propose simple local APIs (e.g. just pass 
>>>>>> the
>>>>>> window fn as a parameter to GroupByKey), however all of these local
>>>>>> solutions ended up not working for important use cases when we analyzed
>>>>>> them more deeply.
>>>>>>
>>>>>> As you mentioned below, I do think it's perfectly reasonable for a
>>>>>> DSL to impose its own semantics. Scio already does this - the raw Beam 
>>>>>> API
>>>>>> is used by a DSL as a substrate, but the DSL does not need to blindly
>>>>>> mirror the semantics of the raw Beam API - at least in my opinion!
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> I understand the concerns. Still, it looks a little like we want to
>>>>>>> be able to modify the behavior of an object from inside a submodule -
>>>>>>> quite like if my subprogram accepted a Map interface, but internally
>>>>>>> said "hey, this is supposed to be a HashMap, please change it".
>>>>>>> Because of how the pipeline is constructed, we can do that; the
>>>>>>> question is whether there really isn't a better solution.
>>>>>>>
>>>>>>> What I do not like about the proposed solution:
>>>>>>>
>>>>>>>  1) specifying that the grouped elements are supposed to be iterated
>>>>>>> only once can be done only on a ParDo, although there are other
>>>>>>> (higher-level) PTransforms that can consume the output of GBK
>>>>>>>
>>>>>>>  2) the annotation on ParDo is by definition generic - i.e. it can be
>>>>>>> used on input which is not the output of a GBK, which makes no sense
>>>>>>>
>>>>>>>  3) we modify the behavior to unlock some optimizations (or change the
>>>>>>> behavior of the GBK itself), and users will not understand that
>>>>>>>
>>>>>>>  4) the annotation somewhat arbitrarily modifies the data types passed,
>>>>>>> which is counter-intuitive and will be a source of confusion
>>>>>>>
>>>>>>> I think a solution that solves the above problems (but brings some new
>>>>>>> ones, as always :-)) could be to change the output of GBK from
>>>>>>> PCollection<KV<K, Iterable<V>>> to GroupedPCollection<K, V>. That way
>>>>>>> we can control which operators consume the grouping (and how), and we
>>>>>>> can enable these transforms to specify additional parameters (like how
>>>>>>> they want to consume the grouping). It is obviously a breaking change
>>>>>>> (although it can probably be made backwards compatible) and it would
>>>>>>> very likely involve substantial work. But maybe there are other options
>>>>>>> not yet discussed.
>>>>>>>
>>>>>>> Jan
>>>>>>> On 9/28/19 6:46 AM, Reuven Lax wrote:
>>>>>>>
>>>>>>> In many cases, the writer of the ParDo has no access to the GBK
>>>>>>> (e.g. the GBK is hidden inside an upstream PTransform that they cannot
>>>>>>> modify). This is the same reason why RequiresStableInput was made a
>>>>>>> property of the ParDo, because the GroupByKey is quite often 
>>>>>>> inaccessible.
>>>>>>>
>>>>>>> The car analogy doesn't quite apply here, because the runner does
>>>>>>> have a full view of the pipeline and so can satisfy all constraints. The car
>>>>>>> dealership generally cannot read your mind (thankfully!), so you have to
>>>>>>> specify what you want. Or to put it another way, the various transforms 
>>>>>>> in
>>>>>>> a Beam pipeline do not live in isolation. The full pipeline graph is 
>>>>>>> what
>>>>>>> is executed, and the runner already has to analyze the full graph to run
>>>>>>> the pipeline (as well as to optimize the pipeline).
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'd suggest Stream instead of Iterator; it has the same semantics
>>>>>>>> and a much better API.
>>>>>>>>
>>>>>>>> Still not sure what is wrong with letting the GBK decide this. I
>>>>>>>> have an analogy - if I decide to buy a car, I have to decide *what*
>>>>>>>> car I'm going to buy (by thinking about how I'm going to use it)
>>>>>>>> *before* I buy it. I cannot just buy "a car" and then change it from
>>>>>>>> a minivan to a sports car based on my actual need. Same with the GBK
>>>>>>>> - if I want to be able to reiterate the result, then I should say so
>>>>>>>> in advance.
>>>>>>>>
>>>>>>>> Jan
>>>>>>>> On 9/27/19 10:50 PM, Kenneth Knowles wrote:
>>>>>>>>
>>>>>>>> Good point about sibling fusion requiring this.
>>>>>>>>
>>>>>>>> The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does
>>>>>>>> imply that the output iterable can be iterated arbitrarily many times.
>>>>>>>>
>>>>>>>> I think this should remain the default for all the reasons
>>>>>>>> mentioned.
>>>>>>>>
>>>>>>>> We could have opt-in to the weaker KV<K, Iterator<V>> version.
>>>>>>>> Agree that this is a property of the ParDo. A particular use of a GBK 
>>>>>>>> has
>>>>>>>> no idea what is downstream. If you owned the whole pipeline, a special
>>>>>>>> ParDo<Iterator<V>, Foo> would work. But to make the types line up, this
>>>>>>>> would require changes upstream, which is not good.
>>>>>>>>
>>>>>>>> Maybe something like this:
>>>>>>>>
>>>>>>>> ParDo<Iterable<V>, Foo> {
>>>>>>>>   @ProcessElement
>>>>>>>>   void process(@OneShotIterator Iterator<V> iter) {
>>>>>>>>     ...
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> I've described all of this in terms of Java SDK. So we would need a
>>>>>>>> portable representation for all this metadata.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I think the behavior to make explicit is the need to reiterate,
>>>>>>>>> not the need to handle large results. How large a result can be
>>>>>>>>> handled will always be dependent on the runner, and each runner will
>>>>>>>>> probably have a different definition of large keys. Reiteration,
>>>>>>>>> however, is a logical difference in the programming API, so that is
>>>>>>>>> what should be specified. The need to reiterate is a property of the
>>>>>>>>> downstream ParDo, so it should be specified there - not on the GBK.
>>>>>>>>>
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Ok, I think I understand there might be some benefits to this.
>>>>>>>>>> Then I'd propose we make this clear on the GBK. If we supported
>>>>>>>>>> something like this:
>>>>>>>>>>
>>>>>>>>>>  PCollection<?> input = ...;
>>>>>>>>>>
>>>>>>>>>>  input.apply(GroupByKey.withLargeKeys());
>>>>>>>>>>
>>>>>>>>>> then the SparkRunner could expand this to
>>>>>>>>>> repartitionAndSortWithinPartitions only for this PTransform, and
>>>>>>>>>> fall back to the default (in-memory) expansion in other situations.
>>>>>>>>>> The default expansion of LargeGroupByKey (let's say) would be the
>>>>>>>>>> classic GBK, so that only runners that need to make sure they don't
>>>>>>>>>> break reiterations have to expand it specially.
>>>>>>>>>>
>>>>>>>>>> WDYT?
>>>>>>>>>>
>>>>>>>>>> Jan
>>>>>>>>>> On 9/27/19 8:56 PM, Reuven Lax wrote:
>>>>>>>>>>
>>>>>>>>>> As I mentioned above, CoGroupByKey already takes advantage of
>>>>>>>>>> this. Reiterating is not the most common use case, but it's 
>>>>>>>>>> definitely one
>>>>>>>>>> that comes up. Also keep in mind that this API has supported 
>>>>>>>>>> reiterating
>>>>>>>>>> for the past five years (since even before the SDK was donated to 
>>>>>>>>>> Apache).
>>>>>>>>>> Therefore you should assume that users are relying on it, in ways we 
>>>>>>>>>> might
>>>>>>>>>> not expect.
>>>>>>>>>>
>>>>>>>>>> Historically, Google's Flume system had collections that did not
>>>>>>>>>> support reiterating (they were even called OneShotCollections to 
>>>>>>>>>> make it
>>>>>>>>>> clear). This was the source of problems and user frustration, which 
>>>>>>>>>> was one
>>>>>>>>>> reason that in the original Dataflow SDK we made sure that these 
>>>>>>>>>> iterables
>>>>>>>>>> could be reiterated. Another reason why it's advantageous for a 
>>>>>>>>>> runner to
>>>>>>>>>> support this is allowing for better fusion. If two ParDos A and B 
>>>>>>>>>> both read
>>>>>>>>>> from the same GroupByKey, it is nice to be able to fuse them into one
>>>>>>>>>> logical operator. For this, you'll probably need a shuffle 
>>>>>>>>>> implementation
>>>>>>>>>> that allows two independent readers from the same shuffle session.
>>>>>>>>>>
>>>>>>>>>> How easy it is to implement reiterables that don't have to fit in
>>>>>>>>>> memory will depend on the runner.  For Dataflow it's possible 
>>>>>>>>>> because the
>>>>>>>>>> shuffle session is logically persistent, so the runner can simply 
>>>>>>>>>> reread
>>>>>>>>>> the shuffle session. For other runners with different shuffle
>>>>>>>>>> implementations, it might be harder to support both properties. 
>>>>>>>>>> Maybe we
>>>>>>>>>> should introduce a new @RequiresReiteration annotation on ParDo? 
>>>>>>>>>> That way
>>>>>>>>>> the Spark runner can see this and switch to the in-memory version 
>>>>>>>>>> just for
>>>>>>>>>> groupings consumed by those ParDos. Runners that already support
>>>>>>>>>> reiteration can ignore this annotation, so it should be backwards
>>>>>>>>>> compatible.
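>>>>>>>>>>
>>>>>>>>>> (A sketch of what that could look like - @RequiresReiteration is
>>>>>>>>>> hypothetical and does not exist in Beam today:)
>>>>>>>>>>
>>>>>>>>>> new DoFn<KV<String, Iterable<Long>>, Long>() {
>>>>>>>>>>   @RequiresReiteration  // hypothetical: grouped values read twice
>>>>>>>>>>   @ProcessElement
>>>>>>>>>>   public void process(ProcessContext c) {
>>>>>>>>>>     long max = Long.MIN_VALUE;
>>>>>>>>>>     for (long v : c.element().getValue()) max = Math.max(max, v);
>>>>>>>>>>     for (long v : c.element().getValue()) c.output(max - v);
>>>>>>>>>>   }
>>>>>>>>>> }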
>>>>>>>>>>
>>>>>>>>>> Reuven
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'd like to know the use case. Why would you *need* to actually
>>>>>>>>>>> iterate the grouped elements twice? By definition, the first
>>>>>>>>>>> iteration would have to extract some statistic (or a subset of
>>>>>>>>>>> elements that must fit into memory). This statistic can then be
>>>>>>>>>>> used as another input for the second iteration. Why not calculate
>>>>>>>>>>> the statistic in a separate branch of the pipeline and then feed it
>>>>>>>>>>> into the ParDo as a side input? That would definitely be more
>>>>>>>>>>> efficient, because the calculation of the statistic would probably
>>>>>>>>>>> be combinable (not sure if it is absolutely required to be
>>>>>>>>>>> combinable, but it seems probable). Even if the calculation were
>>>>>>>>>>> not combinable, it would be no less efficient than iterating twice.
>>>>>>>>>>> Why then support multiple iterations (besides the fact that the
>>>>>>>>>>> output of GBK is an Iterable)? Am I missing something?
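>>>>>>>>>>>
>>>>>>>>>>> (To make the alternative concrete, a minimal sketch - the per-key
>>>>>>>>>>> maximum stands in for "some statistic"; all names are
>>>>>>>>>>> illustrative:)
>>>>>>>>>>>
>>>>>>>>>>> PCollectionView<Map<String, Long>> maxPerKey =
>>>>>>>>>>>     input.apply(Max.longsPerKey()).apply(View.asMap());
>>>>>>>>>>> input
>>>>>>>>>>>     .apply(GroupByKey.create())
>>>>>>>>>>>     .apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, Long>() {
>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>       public void process(ProcessContext c) {
>>>>>>>>>>>         long max = c.sideInput(maxPerKey).get(c.element().getKey());
>>>>>>>>>>>         for (long v : c.element().getValue()) {
>>>>>>>>>>>           c.output(max - v);  // single pass over the grouped values
>>>>>>>>>>>         }
>>>>>>>>>>>       }
>>>>>>>>>>>     }).withSideInputs(maxPerKey));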
>>>>>>>>>>>
>>>>>>>>>>> Jan
>>>>>>>>>>> On 9/27/19 6:36 PM, Reuven Lax wrote:
>>>>>>>>>>>
>>>>>>>>>>> This should be an element in the compatibility matrix as well.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I am pretty surprised that we do not have
>>>>>>>>>>>> a @Category(ValidatesRunner) test in GroupByKeyTest that iterates 
>>>>>>>>>>>> multiple
>>>>>>>>>>>> times. That is a major oversight. We should have this test, and it 
>>>>>>>>>>>> can be
>>>>>>>>>>>> disabled by the SparkRunner's configuration.
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> The Dataflow version does not spill to disk. However Spark's
>>>>>>>>>>>>> design might require spilling to disk if you want that to be 
>>>>>>>>>>>>> implemented
>>>>>>>>>>>>> properly.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <[email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Spark's GBK is currently implemented using `sortBy(key and
>>>>>>>>>>>>>> value).mapPartition(...)` for non-merging windowing in order to
>>>>>>>>>>>>>> support large keys and large-scale shuffles. Merging windowing
>>>>>>>>>>>>>> is implemented using the standard GBK (the underlying Spark
>>>>>>>>>>>>>> impl. uses ListCombiner + Hash Grouping), which is by design
>>>>>>>>>>>>>> unable to support large keys.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As Jan noted, the problem with mapPartition is that its UDF
>>>>>>>>>>>>>> receives an Iterator. The only option here is to wrap this
>>>>>>>>>>>>>> iterator in one that spills to disk once an internal buffer is
>>>>>>>>>>>>>> exceeded (the approach suggested by Reuven). This unfortunately
>>>>>>>>>>>>>> comes with a cost in some cases. The best approach would be to
>>>>>>>>>>>>>> somehow determine that the user wants multiple iterations and
>>>>>>>>>>>>>> then wrap it in a "re-iterator" if necessary. Does anyone have
>>>>>>>>>>>>>> any ideas how to approach this?
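>>>>>>>>>>>>>>
>>>>>>>>>>>>>> (For reference, the wrapping approach might start from something
>>>>>>>>>>>>>> like this minimal in-memory sketch - a real version would spill
>>>>>>>>>>>>>> the cache to disk past a buffer threshold:)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> class ReiterableView<T> implements Iterable<T> {
>>>>>>>>>>>>>>   private final Iterator<T> source;
>>>>>>>>>>>>>>   private final List<T> cache = new ArrayList<>();
>>>>>>>>>>>>>>   ReiterableView(Iterator<T> source) { this.source = source; }
>>>>>>>>>>>>>>   @Override public Iterator<T> iterator() {
>>>>>>>>>>>>>>     return new Iterator<T>() {
>>>>>>>>>>>>>>       private int pos = 0;
>>>>>>>>>>>>>>       @Override public boolean hasNext() {
>>>>>>>>>>>>>>         return pos < cache.size() || source.hasNext();
>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>       @Override public T next() {
>>>>>>>>>>>>>>         if (pos == cache.size()) { cache.add(source.next()); }
>>>>>>>>>>>>>>         return cache.get(pos++);  // replay cache, then drain source
>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>     };
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>> }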
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> D.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <[email protected]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The Beam API was written to support multiple iterations, and
>>>>>>>>>>>>>>> there are definitely transforms that do so. I believe that 
>>>>>>>>>>>>>>> CoGroupByKey may
>>>>>>>>>>>>>>> do this as well with the resulting iterator.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I know that the Dataflow runner is able to handle iterators
>>>>>>>>>>>>>>> larger than available memory by paging them in from shuffle, 
>>>>>>>>>>>>>>> which still
>>>>>>>>>>>>>>> allows for reiterating. It sounds like Spark is less flexible 
>>>>>>>>>>>>>>> here?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Reuven
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> +dev <[email protected]>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Lukasz, why do you think that users expect to be able to
>>>>>>>>>>>>>>>> iterate grouped elements multiple times? Besides the fact that
>>>>>>>>>>>>>>>> the 'Iterable' type obviously suggests it? The way that Spark
>>>>>>>>>>>>>>>> behaves is pretty much analogous to how MapReduce used to work
>>>>>>>>>>>>>>>> - in certain cases it calls repartitionAndSortWithinPartitions
>>>>>>>>>>>>>>>> and then does mapPartition, which accepts an Iterator - that
>>>>>>>>>>>>>>>> is because internally it merge-sorts pre-sorted segments. This
>>>>>>>>>>>>>>>> approach makes it possible to GroupByKey data sets that are
>>>>>>>>>>>>>>>> too big (per key) to fit into memory.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If multiple iterations should be expected by users, we
>>>>>>>>>>>>>>>> probably should:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  a) include that in @ValidatesRunner tests
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  b) store values in memory on Spark, which will break for
>>>>>>>>>>>>>>>> certain pipelines
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Because of (b) I think that it would be much better to
>>>>>>>>>>>>>>>> remove this "expectation" and clearly document that the 
>>>>>>>>>>>>>>>> Iterable is not
>>>>>>>>>>>>>>>> supposed to be iterated multiple times.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jan
>>>>>>>>>>>>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I pretty much think so, because that is how Spark works.
>>>>>>>>>>>>>>>> The Iterable inside is really an Iterator, which cannot be 
>>>>>>>>>>>>>>>> iterated
>>>>>>>>>>>>>>>> multiple times.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jan
>>>>>>>>>>>>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jan, in Beam users expect to be able to iterate the GBK
>>>>>>>>>>>>>>>> output multiple times, even from within the same ParDo.
>>>>>>>>>>>>>>>> Is this something that Beam on the Spark runner never supported?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Gershi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> could you please outline the pipeline you are trying to
>>>>>>>>>>>>>>>>> execute? Basically, you cannot iterate the Iterable multiple
>>>>>>>>>>>>>>>>> times in a single ParDo. It should be possible, though, to
>>>>>>>>>>>>>>>>> apply multiple ParDos to the output from GroupByKey.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Jan
>>>>>>>>>>>>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I want to iterate multiple times over the Iterable<V> (the
>>>>>>>>>>>>>>>>> output of the GroupByKey transformation).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When my Runner is SparkRunner, I get an exception:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator
>>>>>>>>>>>>>>>>> can't be iterated more than once,otherwise there could be 
>>>>>>>>>>>>>>>>> data lost
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>                 at
>>>>>>>>>>>>>>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>                 at
>>>>>>>>>>>>>>>>> java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I understood I can branch the pipeline after GroupByKey
>>>>>>>>>>>>>>>>> into multiple transformation and iterate in each of them once 
>>>>>>>>>>>>>>>>> on the
>>>>>>>>>>>>>>>>> Iterable<V>.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is there a better way for that?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Noam Gershi*
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Software Developer
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *T*: +972 (3) 7405718
