Putting a stateful DoFn after a GBK is not completely redundant - the element type changes, so it is different from just having a stateful DoFn. However, it is a weird thing to do, and usually not optimal (especially because many runners might insert two shuffles in this case).
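To make the type change concrete, a minimal sketch (the String/Integer element types and class names are assumptions for illustration):

  // Before the GBK, a stateful DoFn sees the raw KV<K, V> elements:
  class BeforeGbkFn extends DoFn<KV<String, Integer>, String> {
    @StateId("seen")
    private final StateSpec<ValueState<Integer>> seen =
        StateSpecs.value(VarIntCoder.of());

    @ProcessElement
    public void process(ProcessContext c,
                        @StateId("seen") ValueState<Integer> state) {
      Integer prev = state.read();
      state.write(prev == null ? 1 : prev + 1);
      c.output(c.element().getKey());
    }
  }

  // After the GBK, the element type has changed to KV<K, Iterable<V>>:
  class AfterGbkFn extends DoFn<KV<String, Iterable<Integer>>, String> {
    @ProcessElement
    public void process(ProcessContext c) {
      c.output(c.element().getKey());
    }
  }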
On Wed, Oct 2, 2019 at 2:24 AM Jan Lukavský <je...@seznam.cz> wrote:

+1

The difference between GroupByKeyOneShot and Reduce(By|Per)Key is probably only that in the first case one can pass the result to a stateful ParDo. The latter has stricter semantics, so the user is somewhat limited in what they can do with the result of the grouping. It seems to me, though, that applying a stateful operation to the result of a grouping makes little sense, because a stateful operation performs this grouping (keying) automatically, so the preceding GroupByKeyOneShot would be somewhat redundant. But maybe someone can provide a different insight.

On 10/2/19 2:34 AM, Robert Bradshaw wrote:

For this specific use case, I would suggest this be done via PTransform URNs. E.g. one could have a GroupByKeyOneShot whose implementation is

  input
      .apply(GroupByKey.create())
      .apply(kv -> KV.of(kv.getKey(), kv.getValue().iterator()))

A runner would be free to recognize and optimize this in the graph (based on its URN) and swap in a more efficient implementation. Of course a Coder<Iterator> would have to be introduced, and the semantics of PCollection<Iterator> are a bit odd due to the inherently mutable nature of Iterators. (Possibly a ReducePerKey transform would be a better abstraction.)
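A minimal sketch of what such a composite could look like (hypothetical - Beam has no GroupByKeyOneShot, and this sketch sidesteps the iterator-coder problem by keeping Iterable in the signature):

  // A runner could match this transform (by its URN) and substitute a
  // streaming, non-reiterable grouping.
  class GroupByKeyOneShot<K, V>
      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
    @Override
    public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
      // Default expansion is a plain GroupByKey; the one-shot contract is
      // only a documented restriction on how consumers may iterate the result.
      return input.apply(GroupByKey.create());
    }
  }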
On Tue, Oct 1, 2019 at 2:16 AM Jan Lukavský <je...@seznam.cz> wrote:

The car analogy was meant to say that in the real world you have to make a decision before you take any action. There is no retroactivity possible.

Reuven pointed out that it is possible (although it seems a little weird to me, but that is the only thing I can tell against it :-)) that the way a grouped PCollection is produced might be out of the control of the consuming operator. One example of this might be that the grouping is produced in a submodule (some library), but the consumer still wants to be able to specify whether or not it wants reiterations. There is still a "classical" solution to this - the library might expose an interface for specifying a factory for the grouped PCollection, so that the user of the library is able to specify what they want. But we can say that we don't want to force users (or authors of libraries) to do that. That's okay with me.

If we move on, our next option might be to specify the annotation on the consumer (as suggested), but that has all the "not really nice" properties of being counter-intuitive, ignoring strong types, etc., for which reason I think it should be ruled out as well.

This leaves us with a single option (at least I have not figured out any other) - we can bundle the GBK and the associated ParDo into an atomic PTransform, which can then be overridden by runners that need special handling of this situation - these are all runners that need to buffer data in memory in order to support reiterations (Spark and Flink; note that this problem arises only in the batch case, because in the streaming case one can reasonably assume that the data resides in a state that supports reiterations). But we already have this PTransform in Euphoria - it is called ReduceByKey and has all the required properties (technically it is not a PTransform now, but that is a minor detail and can be changed trivially).

So, the direction I was trying to take this discussion was - what could be the best way for a runner to natively support a PTransform from a DSL? I can imagine several options:

a) support it directly and let runners depend on the DSL (a compileOnly dependency might suffice, because users will include the DSL in their code to be able to use it)

b) create an interface in runners for user code to be able to provide translations for user-specified operators (this could be absolutely generic; DSLs might just use this feature the same way any user could) - after all, runners already use a concept of Translator, but that is pretty much copy-pasted, not abstracted into a general-purpose one

c) move the operators that need to be translated into core

Option (c) then leaves open questions - if we want to move other operators into core, would this be the right time to ask whether our current set of "core" operators is the ideal one? Or could this be optimized?

Jan

On 10/1/19 12:32 AM, Kenneth Knowles wrote:

In the car analogy, you have something like this:

  Iterable: car
  Iterator: taxi ride

They are related, but not as variations of a common concept.

In the discussion of Combine vs RSBK, if the reducer is required to be an associative and commutative operator, then it is the same thing under a different name. If the reducer can be non-associative or non-commutative, then it admits fewer transformations/optimizations.

If you introduce a GroupIteratorsByKey and implement GroupByKey as a transform that combines the iterator by concatenation, I think you do get an internally consistent system. To execute efficiently, you need to always identify and replace the GroupByKey operation with a primitive one. It does make some sense to expose the weakest primitives for the sake of DSLs. But they are very poorly suited for end users, and for GBK on most runners you get the more powerful one for free.

Kenn

On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský <je...@seznam.cz> wrote:

> The fact that the annotation on the ParDo "changes" the GroupByKey implementation is very specific to the Spark runner implementation.

I don't quite agree. It is not very specific to Spark; it is specific to generally all runners that produce grouped elements in a way that is not reiterable. That is the key property. The example you gave with HDFS does not satisfy this condition (files on HDFS are certainly reiterable), and that's why no change to the GBK is needed (it actually already has the required property). A quick look at the FlinkRunner (at least the non-portable one) shows that it implements GBK by reducing elements into a List. That is going to crash on a big PCollection, which is even nicely documented:

  * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this is
  * expected to crash!

If this is fixed, then it is likely to start behaving the same as Spark. So actually I think the opposite is true - Dataflow is a special case, because of how its internal shuffle service works.

> In general I sympathize with the worry about non-local effects. Beam is already full of them (e.g. a Window.into statement affects downstream GroupByKeys). In each case where they were added there was extensive debate and discussion (windowing semantics were debated for many months), exactly because there was concern over adding these non-local effects. In every case, no other good solution could be found.
> For the case of windowing for example, it was often easy to propose simple local APIs (e.g. just pass the window fn as a parameter to GroupByKey), however all of these local solutions ended up not working for important use cases when we analyzed them more deeply.

That is very interesting. Could you elaborate more about some examples of the use cases which didn't work? I'd like to try to match them against how Euphoria is structured; it should be more resistant to these non-local effects, because it very often bundles together multiple Beam primitives into a single transform - ReduceByKey is one example of this, as it is actually a mix of Window.into() + GBK + ParDo. Although it might look like this transform is not primitive because it can be broken down into something else (Euphoria has no native equivalent of GBK itself), it has several other nice implications - namely that Combine now becomes a special case of RBK. It then becomes only a question of where and how you can "run" the reduce function; the logic is absolutely equal. This can be worked out in more detail to actually show that even Combine and RBK can be described by a more general stateful operation (ReduceStateByKey), and so Euphoria actually has only two really "primitive" operations - FlatMap (basically a stateless ParDo) and RSBK. As I already mentioned on another thread, if a stateful ParDo supported merging windows, it could be shown that both Combine and GBK become special cases of it.

> As you mentioned below, I do think it's perfectly reasonable for a DSL to impose its own semantics. Scio already does this - the raw Beam API is used by a DSL as a substrate, but the DSL does not need to blindly mirror the semantics of the raw Beam API - at least in my opinion!

Sure, but currently there is no way for a DSL to "hook" into a runner, so it has to use the raw Beam SDK, and so it will fail in cases like this - where Beam actually has stronger guarantees than required by the DSL. It would be cool if we could find a way to do that - this pretty much aligns with another question raised on the ML, about the possibility to override the default implementation of a PTransform for a specific pipeline.

Jan
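The bundling described above can be sketched as a single composite - roughly Window.into() + GBK + ParDo in one overridable unit (a hypothetical helper, not Euphoria's actual API; coder inference for the generic output is elided):

  // The reducer consumes the grouped values exactly once, so a runner
  // overriding this composite never needs to support reiteration.
  static <K, V, O> PCollection<KV<K, O>> reduceByKey(
      PCollection<KV<K, V>> input,
      WindowFn<Object, ?> windowFn,
      SerializableFunction<Iterable<V>, O> reducer) {
    return input
        .apply(Window.<KV<K, V>>into(windowFn))
        .apply(GroupByKey.create())
        .apply(ParDo.of(new DoFn<KV<K, Iterable<V>>, KV<K, O>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            c.output(KV.of(c.element().getKey(),
                           reducer.apply(c.element().getValue())));
          }
        }));
  }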
On 9/29/19 7:46 PM, Reuven Lax wrote:

Jan,

The fact that the annotation on the ParDo "changes" the GroupByKey implementation is very specific to the Spark runner implementation. You can imagine another runner that simply writes out files in HDFS to implement a GroupByKey - this GroupByKey implementation is agnostic to whether the result will be reiterated or not; in this case it is very much the ParDo implementation that changes to implement a reiterable. I think you don't like the fact that an annotation on the ParDo will have a non-local effect on the implementation of the GroupByKey upstream. However, arguably the non-local effect is just a quirk of how the Spark runner is implemented - other runners might have a local effect.

In general I sympathize with the worry about non-local effects. Beam is already full of them (e.g. a Window.into statement affects downstream GroupByKeys). In each case where they were added there was extensive debate and discussion (windowing semantics were debated for many months), exactly because there was concern over adding these non-local effects. In every case, no other good solution could be found. For the case of windowing for example, it was often easy to propose simple local APIs (e.g. just pass the window fn as a parameter to GroupByKey), however all of these local solutions ended up not working for important use cases when we analyzed them more deeply.

As you mentioned below, I do think it's perfectly reasonable for a DSL to impose its own semantics. Scio already does this - the raw Beam API is used by a DSL as a substrate, but the DSL does not need to blindly mirror the semantics of the raw Beam API - at least in my opinion!

Reuven

On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský <je...@seznam.cz> wrote:

I understand the concerns. Still, it looks a little like we want to be able to modify the behavior of an object from inside a submodule - quite like if my subprogram accepted a Map interface, but internally said "hey, this is supposed to be a HashMap, please change it". Because of how the pipeline is constructed we can do that; the question is whether there really isn't a better solution.

What I do not like about the proposed solution:

1) that the grouped elements are supposed to be iterated only once can be specified only on a ParDo, although there are other (higher-level) PTransforms that can consume the output of a GBK

2) the annotation on the ParDo is by definition generic - i.e. it can be used on input which is not the output of a GBK, which makes no sense

3) we modify the behavior to unlock some optimizations (or to change the behavior of the GBK itself), which users will not understand

4) the annotation somewhat arbitrarily modifies the data types passed, which is counter-intuitive and will be a source of confusion

I think a solution that solves the above problems (but brings some new ones, as always :-)) could be to change the output of GBK from PCollection<KV<K, Iterable<V>>> to GroupedPCollection<K, V>. That way we can control which operators consume the grouping (and how), and we can enable these transforms to specify additional parameters (like how they want to consume the grouping). It is obviously a breaking change (although it can probably be made backwards compatible) and it would very likely involve substantial work. But maybe there are some other not-yet-discussed options.

Jan

On 9/28/19 6:46 AM, Reuven Lax wrote:

In many cases, the writer of the ParDo has no access to the GBK (e.g. the GBK is hidden inside an upstream PTransform that they cannot modify). This is the same reason why RequiresStableInput was made a property of the ParDo: because the GroupByKey is quite often inaccessible.

The car analogy doesn't quite apply here, because the runner does have a full view of the pipeline, so it can satisfy all constraints. The car dealership generally cannot read your mind (thankfully!), so you have to specify what you want. Or to put it another way, the various transforms in a Beam pipeline do not live in isolation. The full pipeline graph is what is executed, and the runner already has to analyze the full graph to run the pipeline (as well as to optimize it).

Reuven
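A rough sketch of the shape of the GroupedPCollection idea above (hypothetical API, not part of Beam):

  // The grouped type itself forces consumers to declare how they will
  // consume the grouping, instead of annotating the downstream ParDo.
  interface GroupedPCollection<K, V> {
    // One-shot consumption: values may be backed by a non-reiterable stream.
    <O> PCollection<O> mapOnce(
        SerializableFunction<KV<K, Iterator<V>>, O> fn);

    // Reiterable consumption: the runner must materialize the values or
    // re-read them from its shuffle.
    <O> PCollection<O> mapReiterable(
        SerializableFunction<KV<K, Iterable<V>>, O> fn);
  }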
On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <je...@seznam.cz> wrote:

I'd suggest Stream instead of Iterator; it has the same semantics and a much better API.

Still not sure what is wrong with letting the GBK decide this. I have an analogy - if I decide to buy a car, I have to decide *what* car I'm going to buy (by thinking about how I'm going to use it) *before* I buy it. I cannot just buy "a car" and then change it from a minivan to a sports car based on my actual need. Same with the GBK - if I want to be able to reiterate the result, then I should say so in advance.

Jan

On 9/27/19 10:50 PM, Kenneth Knowles wrote:

Good point about sibling fusion requiring this.

The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does imply that the output iterable can be iterated arbitrarily many times.

I think this should remain the default for all the reasons mentioned.

We could have opt-in to the weaker KV<K, Iterator<V>> version. Agreed that this is a property of the ParDo. A particular use of a GBK has no idea what is downstream. If you owned the whole pipeline, a special ParDo<Iterator<V>, Foo> would work. But to make the types line up, this would require changes upstream, which is not good.

Maybe something like this:

  ParDo<Iterable<V>, Foo> {
    @ProcessElement
    void process(@OneShotIterator Iterator<V> iter) {
      ...
    }
  }

I've described all of this in terms of the Java SDK, so we would need a portable representation for all this metadata.

Kenn

On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <re...@google.com> wrote:

I think the behavior to make explicit is the need to reiterate, not the need to handle large results. How large a result can be handled will always be dependent on the runner, and each runner will probably have a different definition of large keys. Reiteration, however, is a logical difference in the programming API, therefore it makes sense to specify it. The need to reiterate is a property of the downstream ParDo, so it should be specified there - not on the GBK.

Reuven

On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <je...@seznam.cz> wrote:

Ok, I think I understand there might be some benefits of this. Then I'd propose we make this clear on the GBK. If we supported something like this:

  PCollection<?> input = ...;

  input.apply(GroupByKey.withLargeKeys());

then the SparkRunner could expand this to repartitionAndSortWithinPartitions only for this PTransform, and fall back to the default (in-memory) in other situations. The default expansion of LargeGroupByKey (let's say) would be the classic GBK, so that only runners that need to make sure they don't break reiterations would expand it differently.

WDYT?

Jan

On 9/27/19 8:56 PM, Reuven Lax wrote:

As I mentioned above, CoGroupByKey already takes advantage of this. Reiterating is not the most common use case, but it's definitely one that comes up. Also keep in mind that this API has supported reiterating for the past five years (since even before the SDK was donated to Apache), therefore you should assume that users are relying on it, in ways we might not expect.

Historically, Google's Flume system had collections that did not support reiterating (they were even called OneShotCollections to make this clear). This was a source of problems and user frustration, which was one reason that in the original Dataflow SDK we made sure these iterables could be reiterated. Another reason why it's advantageous for a runner to support this is that it allows for better fusion. If two ParDos A and B both read from the same GroupByKey, it is nice to be able to fuse them into one logical operator. For this, you'll probably need a shuffle implementation that allows two independent readers from the same shuffle session.

How easy it is to implement reiterables that don't have to fit in memory will depend on the runner. For Dataflow it's possible because the shuffle session is logically persistent, so the runner can simply reread the shuffle session. For other runners with different shuffle implementations, it might be harder to support both properties. Maybe we should introduce a new @RequiresReiteration annotation on ParDo? That way the Spark runner can see this and switch to the in-memory version just for groupings consumed by those ParDos. Runners that already support reiteration can ignore the annotation, so it should be backwards compatible.

Reuven
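What usage of the proposed annotation might look like (hypothetical - no @RequiresReiteration annotation exists in Beam today):

  // A DoFn that genuinely needs two passes over the grouped values; the
  // annotation would let the Spark runner pick its reiterable grouping
  // just for this consumer.
  @RequiresReiteration
  class MaxAndCountFn extends DoFn<KV<String, Iterable<Integer>>, String> {
    @ProcessElement
    public void process(ProcessContext c) {
      Iterable<Integer> values = c.element().getValue();
      int max = Integer.MIN_VALUE;
      for (int v : values) { max = Math.max(max, v); }     // first pass
      long atMax = 0;
      for (int v : values) { if (v == max) { atMax++; } }  // second pass
      c.output(c.element().getKey() + " max=" + max + " count=" + atMax);
    }
  }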
On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz> wrote:

I'd like to know the use case. Why would you *need* to actually iterate the grouped elements twice? By definition the first iteration would have to extract some statistic (or a subset of elements that must fit into memory). This statistic can then be used as another input for the second iteration. Why not then calculate the statistic in a separate branch of the pipeline and feed it into the ParDo as a side input? That would definitely be more efficient, because the calculation of the statistic would probably be combinable (not sure if it is absolutely required to be combinable, but it seems probable). Even if the calculation were not combinable, it would be no less efficient than reiterating twice. Why then support multiple iterations (besides the fact that the output of GBK is an Iterable)? Am I missing something?

Jan

On 9/27/19 6:36 PM, Reuven Lax wrote:

This should be an element in the compatibility matrix as well.

On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org> wrote:

I am pretty surprised that we do not have a @Category(ValidatesRunner) test in GroupByKeyTest that iterates multiple times. That is a major oversight. We should have this test, and it can be disabled by the SparkRunner's configuration.

Kenn
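A sketch of what that test might look like (names and values are illustrative; assumes the usual TestPipeline rule):

  @Test
  @Category(ValidatesRunner.class)
  public void testGroupByKeyOutputCanBeReiterated() {
    PCollection<Integer> sums =
        pipeline
            .apply(Create.of(KV.of("k", 1), KV.of("k", 2), KV.of("k", 3)))
            .apply(GroupByKey.create())
            .apply(ParDo.of(
                new DoFn<KV<String, Iterable<Integer>>, Integer>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    int first = 0;
                    int second = 0;
                    // Iterate the grouped values twice; a runner whose
                    // Iterable is really a one-shot Iterator fails here.
                    for (int v : c.element().getValue()) { first += v; }
                    for (int v : c.element().getValue()) { second += v; }
                    c.output(first + second);
                  }
                }));

    PAssert.that(sums).containsInAnyOrder(12);
    pipeline.run();
  }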
On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:

The Dataflow version does not spill to disk. However, Spark's design might require spilling to disk if you want this to be implemented properly.

On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:

Hi,

Spark's GBK is currently implemented using `sortBy(key and value).mapPartition(...)` for non-merging windowing, in order to support large keys and large-scale shuffles. Merging windowing is implemented using the standard GBK (the underlying Spark implementation uses ListCombiner + hash grouping), which is by design unable to support large keys.

As Jan noted, the problem with mapPartition is that its UDF receives an Iterator. The only option here is to wrap this iterator in one that spills to disk once an internal buffer is exceeded (the approach suggested by Reuven). This unfortunately comes with a cost in some cases. The best approach would be to somehow determine that the user wants multiple iterations and then wrap it in a "re-iterator" if necessary. Does anyone have any ideas how to approach this?

D.

On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:

The Beam API was written to support multiple iterations, and there are definitely transforms that do so. I believe that CoGroupByKey may do this as well with the resulting iterator.

I know that the Dataflow runner is able to handle iterators larger than available memory by paging them in from shuffle, which still allows for reiterating. It sounds like Spark is less flexible here?

Reuven
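The wrapping approach above, reduced to its simplest form (assumes everything fits in memory; a production version would spill to disk once an internal buffer is exceeded):

  // Drain the one-shot Iterator into a buffer that supports reiteration.
  static <T> Iterable<T> asReiterable(Iterator<T> oneShot) {
    List<T> buffer = new ArrayList<>();
    oneShot.forEachRemaining(buffer::add);
    return buffer;  // a List can be iterated any number of times
  }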
On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:

+dev <dev@beam.apache.org>

Lukasz, why do you think that users expect to be able to iterate grouped elements multiple times? Besides the fact that the 'Iterable' type obviously suggests it? The way Spark behaves is pretty much analogous to how MapReduce used to work - in certain cases it calls repartitionAndSortWithinPartitions and then does mapPartition, which accepts an Iterator - that is because internally it merge-sorts pre-sorted segments. This approach makes it possible to GroupByKey data sets that are too big (per key) to fit into memory.

If multiple iterations should be expected by users, we probably should:

a) include that in the @ValidatesRunner tests

b) store values in memory on Spark, which will break certain pipelines

Because of (b) I think it would be much better to remove this "expectation" and clearly document that the Iterable is not supposed to be iterated multiple times.

Jan

On 9/27/19 9:27 AM, Jan Lukavský wrote:

I pretty much think so, because that is how Spark works. The Iterable inside is really an Iterator, which cannot be iterated multiple times.

Jan

On 9/27/19 2:00 AM, Lukasz Cwik wrote:

Jan, in Beam users expect to be able to iterate the GBK output multiple times, even from within the same ParDo. Is this something that Beam on the Spark runner never supported?

On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:

Hi Gershi,

could you please outline the pipeline you are trying to execute? Basically, you cannot iterate the Iterable multiple times in a single ParDo. It should be possible, though, to apply multiple ParDos to the output of a GroupByKey.

Jan

On 9/26/19 3:32 PM, Gershi, Noam wrote:

Hi,

I want to iterate multiple times over the Iterable<V> (the output of the GroupByKey transformation).

When my runner is SparkRunner, I get an exception:

  Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
      at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
      at java.lang.Iterable.spliterator(Iterable.java:101)

I understood I can branch the pipeline after GroupByKey into multiple transformations and iterate once on the Iterable<V> in each of them.

Is there a better way for that?

Noam Gershi
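A sketch of the branching workaround (String/Integer element types and names are assumptions): each branch receives the same grouping and iterates it exactly once.

  static void branchAfterGbk(PCollection<KV<String, Integer>> input) {
    PCollection<KV<String, Iterable<Integer>>> grouped =
        input.apply(GroupByKey.create());

    // Branch 1: a single pass computing per-key sums.
    PCollection<Integer> sums = grouped.apply("Sums", ParDo.of(
        new DoFn<KV<String, Iterable<Integer>>, Integer>() {
          @ProcessElement
          public void process(ProcessContext c) {
            int sum = 0;
            for (int v : c.element().getValue()) { sum += v; }
            c.output(sum);
          }
        }));

    // Branch 2: an independent single pass over the same grouping.
    PCollection<Long> counts = grouped.apply("Counts", ParDo.of(
        new DoFn<KV<String, Iterable<Integer>>, Long>() {
          @ProcessElement
          public void process(ProcessContext c) {
            long n = 0;
            for (Integer ignored : c.element().getValue()) { n++; }
            c.output(n);
          }
        }));
  }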