The DoFnSignature is where the information "this ParDo only needs a one-shot" would be recorded. This is what enables a runner to use the GBKOneShot in place of a full GBK.

Kenn

On Fri, Oct 4, 2019 at 1:13 AM Reuven Lax <[email protected]> wrote:

Yes - this approach puts compatibility checking on the user. However we could provide another way for the ParDo to "advertise" the set of states it will access. This is similar to what Kenn proposed: today there is a DoFnSignature object that is inferred reflectively based on annotations. However, if there were an API to modify the DoFnSignature, then a DSL could simply use that API to list a set of state readers.

Reuven

On Fri, Oct 4, 2019 at 1:03 AM Jan Lukavský <[email protected]> wrote:

+1

But I'd warn a little against this kind of absolute freedom for the process() method. It should probably remain the case that all states are created before any element passes in, because otherwise it would be hard (if not impossible) to do any compatibility checking of state upon pipeline upgrades.

Jan

On 10/4/19 9:47 AM, Reuven Lax wrote:

IMO the fact that stateful ParDo requires compile-time annotations isn't the biggest problem - it's that it requires a static set of them, one for each state. This is fine for specific user code, but we really should add the ability to pass a StateAccessor object into a DoFn that allows the DoFn to dynamically create different state objects. Something like the following:

  public void process(StateAccessor stateAccessor, ...) {
    stateAccessor.getValueState("state1", TypeDescriptors.ints()).get();
    stateAccessor.getMapState("state2", TypeDescriptors.strings(), TypeDescriptors.ints()).put(...);
    // etc.
  }

This would be a bit less type safe than the current approach (someone could try to fetch the same state twice with different types). However, it would be much friendlier to DSLs, and indeed to any "generic" PTransform that does not statically know all of its states at compile time.

I think we need similar functionality for timers.

Reuven

On Fri, Oct 4, 2019 at 12:36 AM Jan Lukavský <[email protected]> wrote:

> So to me the interesting part is that there is a DSL that wants to support primitives that are strictly weaker than Beam's, in order to *only* allow the one-shot path. Annotations are quite annoying for DSLs, as you may have noticed for state & timers, so that is not a good fit. But the concepts still work. I would suggest pivoting this thread into how to allow a DSL builder to directly provide a DoFnInvoker with a DoFnSignature, in order to programmatically provide the same information that annotations are used for. Essentially exposing an IR to DSL authors rather than forcing them to work with the source language meant for end users. Do you already have a solution for this today?

We have been talking about how this would be useful - it is mostly due to the fact that stateful ParDo requires annotations (compile time) that Euphoria lacks stateful processing support. For that, we need exactly what you say: we need to be able to provide the runner directly with a DoFnSignature. Other solutions would be kind of hackish.

On the other hand, this isn't directly related to the discussion about reiterations in GBK, is it? I think DoFnSignatures cannot help us here, because we need to affect the way the GBK is translated by the runner, not the ParDo. So it quite naturally leads to the RBK, or "streamed GBK". If we have a consensus on that, I can create JIRAs and move it forward.

Jan
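A rough sketch of what such a programmatic signature API could look like. Everything below is hypothetical - Beam's DoFnSignature is inferred reflectively today and has no public builder - so the names (builderFor, declareValueState, declareMapState) are illustrative only:

  // Hypothetical builder API - nothing like this exists in Beam today.
  // A DSL would declare up front exactly which states its generic DoFn
  // accesses, so the runner can still perform compatibility checks on
  // pipeline upgrade even though no @StateId annotations are present.
  DoFnSignature signature =
      DoFnSignature.builderFor(myGenericDoFn)
          .declareValueState("state1", TypeDescriptors.ints())
          .declareMapState("state2",
              TypeDescriptors.strings(), TypeDescriptors.ints())
          .build();

Note that this keeps Jan's constraint intact: the set of states is still fixed before any element is processed; it is just declared through an API instead of annotations.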
On 10/3/19 7:19 PM, Kenneth Knowles wrote:

On Tue, Oct 1, 2019 at 5:35 PM Robert Bradshaw <[email protected]> wrote:

> For this specific use case, I would suggest this be done via PTransform URNs. E.g. one could have a GroupByKeyOneShot whose implementation is
>
>   input
>       .apply(GroupByKey.of())
>       .apply(kv -> KV.of(kv.key(), kv.iterator()))

This is dual to what I clumsily was trying to say in my last paragraph. But I agree that ReduceByKey is better, if we were to add any new primitive transform. I very much dislike PCollection<Iterator> for just the reasons you also mention.

I think the annotation route, where @ProcessElement can accept a different type of element, seems less intrusive and more flexible.

On Tue, Oct 1, 2019 at 2:16 AM Jan Lukavský <[email protected]> wrote:

> The car analogy was meant to say that in the real world you have to make a decision before you take any action. There is no retroactivity possible.
>
> Reuven pointed out that it is possible (although it seems a little weird to me, but that is the only thing I can tell against it :-)) that the way a grouped PCollection is produced might be out of the control of a consuming operator. One example of this might be that the grouping is produced in a submodule (some library), but still, the consumer wants to be able to specify whether or not he wants reiterations.

Exactly. The person choosing to GroupByKey and the person writing the one-shot ParDo must be assumed to be different people, in general.

FWIW I always think of the pipeline as a program and the runner as a planner/optimizer. So it is always responsible for reordering and for making physical planning decisions, like whether to create an iterable materialization or just some streamed iterator.

> If we move on, our next option might be to specify the annotation on the consumer (as suggested), but that has all the "not really nice" properties of being counter-intuitive, ignoring strong types, etc., etc., for which reason I think that this should be ruled out as well.

In Beam we have taken the position that type checking the graph after it is constructed is an early enough place to catch type errors (speaking for Java). The validation that ParDo does on the DoFn is basically lightweight, local type checking. This is how we detect and type check stateful ParDo transforms as well as splittable ParDo transforms. We also catch errors that are not expressible in Java's type system.

If we were discussing just this Spark limitation/optimization, this is a very natural fit with what we already have: give the runner all the information about the nature of the transforms and user functions, and let it make the best plan it can.

So to me the interesting part is that there is a DSL that wants to support primitives that are strictly weaker than Beam's, in order to *only* allow the one-shot path. Annotations are quite annoying for DSLs, as you may have noticed for state & timers, so that is not a good fit. But the concepts still work. I would suggest pivoting this thread into how to allow a DSL builder to directly provide a DoFnInvoker with a DoFnSignature, in order to programmatically provide the same information that annotations are used for. Essentially exposing an IR to DSL authors rather than forcing them to work with the source language meant for end users. Do you already have a solution for this today?

Kenn

> This leaves us with a single option (at least I have not figured out any other) - we can bundle the GBK and its associated ParDo into an atomic PTransform, which can then be overridden by runners that need special handling of this situation - these are all runners that need to buffer data in memory in order to support reiterations (Spark and Flink; note that this problem arises only in the batch case, because in the streaming case one can reasonably assume that the data resides in a state that supports reiterations). But - we already have this PTransform in Euphoria, it is called ReduceByKey, and it has all the required properties (technically, it is not a PTransform now, but that is a minor detail and can be changed trivially).
>
> So, the direction I was trying to take this discussion was - what could be the best way for a runner to natively support a PTransform from a DSL? I can imagine several options:
>
> a) support it directly and let runners depend on the DSL (a compileOnly dependency might suffice, because users will include the DSL in their code to be able to use it)
>
> b) create an interface in runners for user code to be able to provide translations for user-specified operators (this could be absolutely generic; DSLs might just use this feature the same way any user could) - after all, runners already use the concept of a Translator, but it is pretty much copy-pasted, not abstracted into a general-purpose one
>
> c) move the operators that need to be translated into core
>
> Option (c) then leaves open questions of its own - if we wanted to move other operators into core, would this be the right time to ask whether our current set of "core" operators is the ideal one? Or could it be optimized?
>
> Jan

On 10/1/19 12:32 AM, Kenneth Knowles wrote:

In the car analogy, you have something like this:

  Iterable: car
  Iterator: taxi ride

They are related, but not as variations of a common concept.

In the discussion of Combine vs. RSBK: if the reducer is required to be an associative and commutative operator, then it is the same thing under a different name. If the reducer can be non-associative or non-commutative, then it admits fewer transformations/optimizations.

If you introduce a GroupIteratorsByKey and implement GroupByKey as a transform that combines the iterators by concatenation, I think you do get an internally consistent system. To execute efficiently, you need to always identify and replace the GroupByKey operation with a primitive one. It does make some sense to expose the weakest primitives for the sake of DSLs. But they are very poorly suited for end users, and for GBK on most runners you get the more powerful one for free.

Kenn
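A sketch of the GroupByKeyOneShot composite Robert suggests above (the weaker primitive Kenn describes). The transform name and the URN-based replacement are hypothetical, and a real version would also need a coder story for the Iterator-valued output:

  // Hypothetical composite: its default expansion is an ordinary GroupByKey
  // followed by a downgrade of the re-iterable Iterable to a one-shot
  // Iterator. A runner that recognizes the composite (e.g. by URN) could
  // replace it with a streamed grouping that never materializes per-key data.
  public class GroupByKeyOneShot<K, V>
      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterator<V>>>> {

    @Override
    public PCollection<KV<K, Iterator<V>>> expand(PCollection<KV<K, V>> input) {
      return input
          .apply(GroupByKey.<K, V>create())
          .apply(ParDo.of(
              new DoFn<KV<K, Iterable<V>>, KV<K, Iterator<V>>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  // Hand out a single-shot view of the grouped values.
                  c.output(KV.of(c.element().getKey(),
                                 c.element().getValue().iterator()));
                }
              }));
    }
  }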
On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský <[email protected]> wrote:

> The fact that the annotation on the ParDo "changes" the GroupByKey implementation is very specific to the Spark runner implementation.

I don't quite agree. It is not very specific to Spark; it is specific to generally all runners that produce grouped elements in a way that is not reiterable. That is the key property. The example you gave with HDFS does not satisfy this condition (files on HDFS are certainly reiterable), and that's why no change to the GBK is needed (it actually already has the required property). A quick look at what the FlinkRunner does (at least the non-portable one) shows that it implements GBK by reducing elements into a List. That is going to crash on a big PCollection, which is even nicely documented:

  * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this is
  * expected to crash!

If this is fixed, it is likely to start behaving the same as Spark. So actually I think the opposite is true - Dataflow is the special case, because of how its internal shuffle service works.

> In general I sympathize with the worry about non-local effects. Beam is already full of them (e.g. a Window.into statement affects downstream GroupByKeys). In each case where they were added there was extensive debate and discussion (windowing semantics were debated for many months), exactly because there was concern over adding these non-local effects. In every case, no other good solution could be found. For the case of windowing, for example, it was often easy to propose simple local APIs (e.g. just pass the window fn as a parameter to GroupByKey); however, all of these local solutions ended up not working for important use cases when we analyzed them more deeply.

That is very interesting. Could you elaborate on some examples of the use cases which didn't work? I'd like to try to match them against how Euphoria is structured; it should be more resistant to these non-local effects, because it very often bundles multiple Beam primitives into a single transform. ReduceByKey is one example of this: it is actually a mix of Window.into() + GBK + ParDo. Although it might look like this transform can be broken down into something else and is therefore not primitive (Euphoria has no native equivalent of GBK itself), it has several other nice implications - namely that Combine becomes a special case of RBK. It then becomes only a question of where and how you can "run" the reduce function; the logic is absolutely equal. This can be worked out in more detail to show that even Combine and RBK can be described by a more general stateful operation (ReduceStateByKey), and so Euphoria finally has only two really "primitive" operations - FlatMap (basically a stateless ParDo) and RSBK. As I already mentioned in another thread, if stateful ParDo supported merging windows, it could be shown that both Combine and GBK become special cases of it.

> As you mentioned below, I do think it's perfectly reasonable for a DSL to impose its own semantics. Scio already does this - the raw Beam API is used by a DSL as a substrate, but the DSL does not need to blindly mirror the semantics of the raw Beam API - at least in my opinion!

Sure, but currently there is no way for a DSL to "hook" into the runner, so it has to use the raw Beam SDK, and so this will fail in cases like this one, where Beam actually has stronger guarantees than the DSL requires. It would be cool if we could find a way to do that - this pretty much aligns with another question raised on the ML, about the possibility to override the default implementation of a PTransform for a specific pipeline.

Jan

On 9/29/19 7:46 PM, Reuven Lax wrote:

Jan,

The fact that the annotation on the ParDo "changes" the GroupByKey implementation is very specific to the Spark runner implementation. You can imagine another runner that simply writes out files in HDFS to implement a GroupByKey - this GroupByKey implementation is agnostic to whether the result will be reiterated or not; in this case it is very much the ParDo implementation that changes to implement a reiterable. I think you don't like the fact that an annotation on the ParDo will have a non-local effect on the implementation of the GroupByKey upstream. However, arguably the non-local effect is just a quirk of how the Spark runner is implemented - other runners might have a local effect.

In general I sympathize with the worry about non-local effects. Beam is already full of them (e.g. a Window.into statement affects downstream GroupByKeys). In each case where they were added there was extensive debate and discussion (windowing semantics were debated for many months), exactly because there was concern over adding these non-local effects. In every case, no other good solution could be found. For the case of windowing, for example, it was often easy to propose simple local APIs (e.g. just pass the window fn as a parameter to GroupByKey); however, all of these local solutions ended up not working for important use cases when we analyzed them more deeply.

As you mentioned below, I do think it's perfectly reasonable for a DSL to impose its own semantics. Scio already does this - the raw Beam API is used by a DSL as a substrate, but the DSL does not need to blindly mirror the semantics of the raw Beam API - at least in my opinion!

Reuven
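Jan's description above of Euphoria's ReduceByKey as a bundle of Window.into() + GBK + ParDo can be sketched as a Beam composite roughly like the following. The class is illustrative (it is not Euphoria's actual API); the point is that the grouped values are consumed exactly once, inside the composite, so a runner overriding it never needs a re-iterable grouping:

  // Illustrative composite in the spirit of Euphoria's ReduceByKey:
  // windowing, grouping, and reduction bundled into one overridable unit.
  public class ReduceByKeySketch<K, V, OutT>
      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, OutT>>> {

    private final WindowFn<Object, ?> windowFn;
    private final SerializableFunction<Iterable<V>, OutT> reducer;

    public ReduceByKeySketch(
        WindowFn<Object, ?> windowFn,
        SerializableFunction<Iterable<V>, OutT> reducer) {
      this.windowFn = windowFn;
      this.reducer = reducer;
    }

    @Override
    public PCollection<KV<K, OutT>> expand(PCollection<KV<K, V>> input) {
      return input
          .apply(Window.<KV<K, V>>into(windowFn))
          .apply(GroupByKey.<K, V>create())
          .apply(ParDo.of(
              new DoFn<KV<K, Iterable<V>>, KV<K, OutT>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  // The reducer sees the grouped values exactly once.
                  c.output(KV.of(c.element().getKey(),
                                 reducer.apply(c.element().getValue())));
                }
              }));
    }
  }

If the reducer happens to be associative and commutative, this collapses to Combine.perKey - which is Jan's point that Combine is a special case of RBK.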
On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský <[email protected]> wrote:

I understand the concerns. Still, it looks a little like we want to be able to modify the behavior of an object from inside a submodule - quite like if my subprogram accepted a Map interface, but internally said "hey, this is supposed to be a HashMap, please change it". Because of how the pipeline is constructed, we can do that; the question is whether there really isn't a better solution.

What I do not like about the proposed solution:

1) specifying that the grouped elements are supposed to be iterated only once can be done only on a ParDo, although there are other (higher-level) PTransforms that can consume the output of a GBK

2) the annotation on ParDo is by definition generic - i.e. it can be used on input which is not the output of a GBK, which makes no sense

3) we modify the behavior to unlock some optimizations (or change the behavior of the GBK itself); users will not understand that

4) the annotation somewhat arbitrarily modifies the data types passed, which is counter-intuitive and will be a source of confusion

I think a solution that solves the above problems (but brings some new ones, as always :-)) could be to change the output of GBK from PCollection<KV<K, Iterable<V>>> to GroupedPCollection<K, V>. That way we can control which operators consume the grouping (and how), and we can enable these transforms to specify additional parameters (like how they want to consume the grouping). It is obviously a breaking change (although it can probably be made backwards compatible) and it would very likely involve substantial work. But maybe there are some other not-yet-discussed options.

Jan
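A purely hypothetical sketch of what Jan's GroupedPCollection could look like; nothing like this exists in Beam. The grouped collection would not expose an Iterable directly - each consumer declares how it intends to consume the grouping, and the runner picks a materialization accordingly:

  // Hypothetical type - illustrates the proposal only.
  public abstract class GroupedPCollection<K, V> {

    // One-shot consumption: the runner is free to stream the values
    // without buffering them per key.
    public abstract <OutT> PCollection<OutT> consumeOnce(
        PTransform<PCollection<KV<K, Iterator<V>>>, PCollection<OutT>> consumer);

    // Re-iterable consumption: the runner must provide a grouping that can
    // be iterated repeatedly, e.g. by materializing it or re-reading a
    // persistent shuffle.
    public abstract <OutT> PCollection<OutT> consumeReiterable(
        PTransform<PCollection<KV<K, Iterable<V>>>, PCollection<OutT>> consumer);
  }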
On 9/28/19 6:46 AM, Reuven Lax wrote:

In many cases, the writer of the ParDo has no access to the GBK (e.g. the GBK is hidden inside an upstream PTransform that they cannot modify). This is the same reason why RequiresStableInput was made a property of the ParDo: the GroupByKey is quite often inaccessible.

The car analogy doesn't quite apply here, because the runner does have a full view of the pipeline and so can satisfy all constraints. The car dealership generally cannot read your mind (thankfully!), so you have to specify what you want. Or to put it another way, the various transforms in a Beam pipeline do not live in isolation. The full pipeline graph is what is executed, and the runner already has to analyze the full graph to run the pipeline (as well as to optimize it).

Reuven

On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <[email protected]> wrote:

I'd suggest Stream instead of Iterator; it has the same semantics and a much better API.

Still not sure what is wrong with letting the GBK decide this. I have an analogy - if I decide to buy a car, I have to decide *what* car I'm going to buy (by thinking about how I'm going to use it) *before* I buy it. I cannot just buy "a car" and then change it from a minivan to a sports car based on my actual need. Same with the GBK - if I want to be able to reiterate the result, then I should say so in advance.

Jan

On 9/27/19 10:50 PM, Kenneth Knowles wrote:

Good point about sibling fusion requiring this.

The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does imply that the output iterable can be iterated arbitrarily many times.

I think this should remain the default, for all the reasons mentioned.

We could have opt-in to the weaker KV<K, Iterator<V>> version. Agree that this is a property of the ParDo. A particular use of a GBK has no idea what is downstream. If you owned the whole pipeline, a special ParDo<Iterator<V>, Foo> would work. But to make the types line up, this would require changes upstream, which is not good.

Maybe something like this:

  ParDo<Iterable<V>, Foo> {
    @ProcessElement
    void process(@OneShotIterator Iterator<V> iter) {
      ...
    }
  }

I've described all of this in terms of the Java SDK, so we would need a portable representation for all this metadata.

Kenn

On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <[email protected]> wrote:

I think the behavior to make explicit is the need to reiterate, not the need to handle large results. How large a result can be handled will always be runner-dependent, and each runner will probably have a different definition of large keys. Reiteration, however, is a logical difference in the programming API. Therefore I think it makes sense to specify the latter. The need to reiterate is a property of the downstream ParDo, so it should be specified there - not on the GBK.

Reuven

On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <[email protected]> wrote:

Ok, I think I understand there might be some benefits to this. Then I'd propose we make this clear on the GBK. If we supported something like this:

  PCollection<?> input = ...;
  input.apply(GroupByKey.withLargeKeys());

then the SparkRunner could expand this to repartitionAndSortWithinPartitions only for this PTransform, and fall back to the default (in-memory) behavior in other situations. The default expansion of LargeGroupByKey (let's say) would be a classic GBK, so that only runners that need to make sure they don't break reiterations would expand it specially.

WDYT?

Jan

On 9/27/19 8:56 PM, Reuven Lax wrote:

As I mentioned above, CoGroupByKey already takes advantage of this. Reiterating is not the most common use case, but it's definitely one that comes up. Also keep in mind that this API has supported reiterating for the past five years (since even before the SDK was donated to Apache), so you should assume that users are relying on it, in ways we might not expect.

Historically, Google's Flume system had collections that did not support reiterating (they were even called OneShotCollections to make this clear). This was a source of problems and user frustration, which was one reason that in the original Dataflow SDK we made sure these iterables could be reiterated. Another reason why it's advantageous for a runner to support this is allowing for better fusion. If two ParDos A and B both read from the same GroupByKey, it is nice to be able to fuse them into one logical operator. For this, you'll probably need a shuffle implementation that allows two independent readers from the same shuffle session.

How easy it is to implement reiterables that don't have to fit in memory will depend on the runner. For Dataflow it's possible because the shuffle session is logically persistent, so the runner can simply reread the shuffle session. For other runners with different shuffle implementations, it might be harder to support both properties. Maybe we should introduce a new @RequiresReiteration annotation on ParDo? That way the Spark runner can see it and switch to the in-memory version just for groupings consumed by those ParDos. Runners that already support reiteration can ignore the annotation, so it should be backwards compatible.

Reuven
On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <[email protected]> wrote:

I'd like to know the use case. Why would you *need* to actually iterate the grouped elements twice? By definition, the first iteration would have to extract some statistic (or a subset of elements that must fit into memory). That statistic can then be used as another input for the second iteration. Why not then calculate the statistic in a separate branch of the pipeline and feed it into the ParDo as a side input? That would definitely be more efficient, because the calculation of the statistic would probably be combinable (not sure if it is absolutely required to be combinable, but it seems probable). Even if the calculation were not combinable, it would be no less efficient than reiterating twice. Why then support multiple iterations (besides the fact that the output of GBK is Iterable)? Am I missing something?

Jan
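A sketch of the side-input alternative Jan describes, assuming the second pass only needs a per-key statistic (here a simple count; the element types are illustrative):

  PCollection<KV<String, Integer>> input = ...;

  // Branch 1: compute the per-key statistic with a combinable aggregation.
  PCollectionView<Map<String, Long>> countsView =
      input
          .apply(Count.<String, Integer>perKey())
          .apply(View.<String, Long>asMap());

  // Branch 2: group once and iterate once, using the precomputed statistic
  // instead of a first "counting" pass over the grouped values.
  input
      .apply(GroupByKey.<String, Integer>create())
      .apply(ParDo.of(
          new DoFn<KV<String, Iterable<Integer>>, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              long count = c.sideInput(countsView).get(c.element().getKey());
              for (Integer value : c.element().getValue()) {
                // single iteration, parameterized by the statistic
              }
            }
          }).withSideInputs(countsView));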
On 9/27/19 6:36 PM, Reuven Lax wrote:

This should be an element in the compatibility matrix as well.

On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <[email protected]> wrote:

I am pretty surprised that we do not have a @Category(ValidatesRunner) test in GroupByKeyTest that iterates multiple times. That is a major oversight. We should have this test, and it can be disabled by the SparkRunner's configuration.

Kenn

On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <[email protected]> wrote:

The Dataflow version does not spill to disk. However, Spark's design might require spilling to disk if you want this to be implemented properly.

On Fri, Sep 27, 2019 at 9:08 AM David Morávek <[email protected]> wrote:

Hi,

Spark's GBK is currently implemented using `sortBy(key and value).mapPartition(...)` for non-merging windowing, in order to support large keys and large-scale shuffles. Merging windowing is implemented using a standard GBK (the underlying Spark implementation uses ListCombiner + hash grouping), which is by design unable to support large keys.

As Jan noted, the problem with mapPartition is that its UDF receives an Iterator. The only option here is to wrap this iterator in one that spills to disk once an internal buffer is exceeded (the approach suggested by Reuven). This unfortunately comes with a cost in some cases. The best approach would be to somehow determine that the user wants multiple iterations and only then wrap it in a "re-iterator" if necessary. Does anyone have any ideas how to approach this?

D.
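A simplified sketch of the wrapper David describes: an Iterable backed by a one-shot Iterator that caches elements as they are consumed, so later iterations replay from the cache. This sketch keeps everything in memory; a real implementation would spill the cache to disk once a size limit is exceeded:

  import java.util.ArrayList;
  import java.util.Iterator;
  import java.util.List;

  // Wraps a one-shot Iterator into a replayable Iterable.
  // In-memory only; not thread-safe (assumes single-threaded consumption).
  class ReplayingIterable<T> implements Iterable<T> {
    private final Iterator<T> source;   // the one-shot source
    private final List<T> cache = new ArrayList<>();

    ReplayingIterable(Iterator<T> source) {
      this.source = source;
    }

    @Override
    public Iterator<T> iterator() {
      return new Iterator<T>() {
        private int pos = 0;            // position within the shared cache

        @Override
        public boolean hasNext() {
          return pos < cache.size() || source.hasNext();
        }

        @Override
        public T next() {
          if (pos < cache.size()) {
            return cache.get(pos++);    // replay a cached element
          }
          T element = source.next();    // pull from the source...
          cache.add(element);           // ...and remember it for replays
          pos++;
          return element;
        }
      };
    }
  }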
On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <[email protected]> wrote:

The Beam API was written to support multiple iterations, and there are definitely transforms that do so. I believe that CoGroupByKey may do this as well with the resulting iterator.

I know that the Dataflow runner is able to handle iterators larger than available memory by paging them in from shuffle, which still allows for reiterating. It sounds like Spark is less flexible here?

Reuven

On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <[email protected]> wrote:

+dev <[email protected]>

Lukasz, why do you think that users expect to be able to iterate grouped elements multiple times - besides the fact that the 'Iterable' obviously suggests it? The way Spark behaves is pretty much analogous to how MapReduce used to work - in certain cases it calls repartitionAndSortWithinPartitions and then does mapPartition, which accepts an Iterator; that is because internally it merge-sorts pre-sorted segments. This approach enables GroupByKey on data sets that are too big to fit into memory (per key).

If multiple iterations should be expected by users, we probably should:

a) include that in @ValidatesRunner tests

b) store values in memory on Spark, which will break certain pipelines

Because of (b) I think it would be much better to remove this "expectation" and clearly document that the Iterable is not supposed to be iterated multiple times.

Jan

On 9/27/19 9:27 AM, Jan Lukavský wrote:

I pretty much think so, because that is how Spark works. The Iterable inside is really an Iterator, which cannot be iterated multiple times.

Jan

On 9/27/19 2:00 AM, Lukasz Cwik wrote:

Jan, in Beam users expect to be able to iterate the GBK output multiple times, even from within the same ParDo. Is this something that Beam on the Spark runner never supported?

On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <[email protected]> wrote:

Hi Gershi,

could you please outline the pipeline you are trying to execute? Basically, you cannot iterate the Iterable multiple times in a single ParDo. It should be possible, though, to apply multiple ParDos to the output from GroupByKey.

Jan

On 9/26/19 3:32 PM, Gershi, Noam wrote:

Hi,

I want to iterate multiple times on the Iterable<V> (the output of the GroupByKey transformation).

When my runner is SparkRunner, I get an exception:

  Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
          at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
          at java.lang.Iterable.spliterator(Iterable.java:101)

I understood I can branch the pipeline after GroupByKey into multiple transformations and iterate once on the Iterable<V> in each of them.

Is there a better way for that?

Noam Gershi
Software Developer
T: +972 (3) 7405718
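A sketch of the workaround Jan suggests above: branch after the GroupByKey and iterate once in each branch, instead of iterating twice inside a single ParDo. Types and an input of PCollection<KV<String, Integer>> are illustrative:

  PCollection<KV<String, Iterable<Integer>>> grouped =
      input.apply(GroupByKey.<String, Integer>create());

  // First pass over the grouped values in one branch...
  grouped.apply("FirstPass", ParDo.of(
      new DoFn<KV<String, Iterable<Integer>>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          for (Integer v : c.element().getValue()) { /* iterate once */ }
        }
      }));

  // ...and an independent second pass in a sibling branch. Each branch
  // iterates its element exactly once, so the Spark runner's one-shot
  // ValueIterator is never re-iterated.
  grouped.apply("SecondPass", ParDo.of(
      new DoFn<KV<String, Iterable<Integer>>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          for (Integer v : c.element().getValue()) { /* iterate once */ }
        }
      }));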
