Putting a stateful dofn after a GBK is not completely redundant - the
element type changes, so it is different than just having .a stateful dofn.
However it is a weird thing to do, and usually not optimal (especially
because many runners might insert two shuffles in this case).

On Wed, Oct 2, 2019 at 2:24 AM Jan Lukavský <je...@seznam.cz> wrote:

> +1
>
> The difference between GroupByKeyOneShot and Reduce(By|Per)Key is probably
> only in that in the first case one can pass the result to a stateful ParDo.
> The latter has a more strict semantics, so that user is a little limited
> about what he can do with the result of the grouping. It seems to me,
> though, that applying a stateful operation on result of grouping makes
> little sense, because stateful operation performs this grouping (keying)
> automatically, so the preceding GroupBeyKeyOneShot would be somewhat
> redundant. But maybe someone can provide a different insight.
> On 10/2/19 2:34 AM, Robert Bradshaw wrote:
>
> For this specific usecase, I would suggest this be done via
> PTranform URNs. E.g. one could have a GroupByKeyOneShot whose
> implementation is
>
> input
>     .apply(GroupByKey.of()
>     .apply(kv -> KV.of(kv.key(), kv.iterator())
>
> A runner would be free to recognize and optimize this in the graph (based
> on its urn) and swap out a more efficient implementation. Of course a
> Coder<Iterator> would have to be introduced, and the semantics of
> PCollection<Iterator> are a bit odd due to the inherently mutable nature of
> Iterators. (Possibly a ReducePerKey transform would be a better
> abstraction.)
>
>
> On Tue, Oct 1, 2019 at 2:16 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> The car analogy was meant to say, that in real world you have to make
>> decision before you take any action. There is no retroactivity possible.
>>
>> Reuven pointed out, that it is possible (although it seems a little weird
>> to me, but that is the only thing I can tell against it :-)), that the way
>> a grouped PCollection is produced might be out of control of a consuming
>> operator. One example of this might be, that the grouping is produced in a
>> submodule (some library), but still, the consumer wants to be able to
>> specify if he wants or doesn't want reiterations. There still is a
>> "classical" solution to this - the library might expose an interface to
>> specify a factory for the grouped PCollection, so that the user of the
>> library will be able to specify what he wants. But we can say, that we
>> don't want to force users (or authors of libraries) to do that. That's okay
>> for me.
>>
>> If we move on, our next option might be to specify the annotation on the
>> consumer (as suggested), but that has all the "not really nice" properties
>> of being counter-intuitive, ignoring strong types, etc., etc., for which
>> reason I think that this should be ruled out as well.
>>
>> This leaves us with a single option (at least I have not figured out any
>> other) - which is we can bundle GBK and associated ParDo into atomic
>> PTransform, which can then be overridden by runners that need special
>> handling of this situation - these are all runners that need buffer data to
>> memory in order to support reiterations (spark and flink, note that this
>> problem arises only for batch case, because in streaming case, one can
>> reasonably assume that the data resides in a state that supports
>> reiterations). But - we already have this PTransform in Euphoria, it is
>> called ReduceByKey, and has all the required properties (technically, it is
>> not a PTransform now, but that is a minor detail and can be changed
>> trivially).
>>
>> So, the direction I was trying to take this discussion was - what could
>> be the best way for a runner to natively support a PTransform from a DSL? I
>> can imagine several options:
>>
>>  a) support it directly and let runners depend on the DSL (compileOnly
>> dependency might suffice, because users will include the DSL into their
>> code to be able to use it)
>>
>>  b) create an interface in runners for user-code to be able to provide
>> translation for user-specified operators (this could be absolutely generic,
>> DSLs might just use this feature the same way any user could), after all
>> runners already use a concept of Translator, but that is pretty much
>> copy-pasted, not abstracted into a general purpose one
>>
>>  c) move the operators that need to be translated into core
>>
>> The option (c) then leaves open questions related to - if we would want
>> to move other operators to core, would this be the right time to ask
>> questions if our current set of "core" operators is the ideal one? Or could
>> this be optimized?
>>
>> Jan
>> On 10/1/19 12:32 AM, Kenneth Knowles wrote:
>>
>> In the car analogy, you have something this:
>>
>>     Iterable: car
>>     Iterator: taxi ride
>>
>> They are related, but not as variations of a common concept.
>>
>> In the discussion of Combine vs RSBK, if the reducer is required to be an
>> associative and commutative operator, then it is the same thing under a
>> different name. If the reducer can be non-associative or non-commutative,
>> then it admits fewer transformations/optimizations.
>>
>> If you introduce a GroupIteratorsByKey and implement GroupByKey as a
>> transform that combines the iterator by concatenation, I think you do get
>> an internally consistent system. To execute efficiently, you need to always
>> identify and replace the GroupByKey operation with a primitive one. It does
>> make some sense to expose the weakest primitives for the sake of DSLs. But
>> they are very poorly suited for end-users, and for GBK on most runners you
>> get the more powerful one for free.
>>
>> Kenn
>>
>> On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> > The fact that the annotation on the ParDo "changes" the GroupByKey
>>> implementation is very specific to the Spark runner implementation.
>>>
>>> I don't quite agree. It is not very specific to Spark, it is specific to
>>> generally all runners, that produce grouped elements in a way that is not
>>> reiterable. That is the key property. The example you gave with HDFS does
>>> not satisfy this condition (files on HDFS are certainly reiterable), and
>>> that's why no change to the GBK is needed (it actually already has the
>>> required property). A quick look at what FlinkRunner (at least non portable
>>> does) is that it implements GBK using reducing elements into List. That is
>>> going to crash on big PCollection, which is even nicely documented:
>>>
>>>    * <p>For internal use to translate {@link GroupByKey}. For a large 
>>> {@link PCollection} this is
>>>    * expected to crash!
>>>
>>>
>>> If this is fixed, then it is likely to start behave the same as Spark.
>>> So actually I think the opposite is true - Dataflow is a special case,
>>> because of how its internal shuffle service works.
>>>
>>> > In general I sympathize with the worry about non-local effects. Beam
>>> is already full of them (e.g. a Window.into statement effects downstream
>>> GroupByKeys). In each case where they were added there was extensive debate
>>> and discussion (Windowing semantics were debated for many months), exactly
>>> because there was concern over adding these non-local effects. In every
>>> case, no other good solution could be found. For the case of windowing for
>>> example, it was often easy to propose simple local APIs (e.g. just pass the
>>> window fn as a parameter to GroupByKey), however all of these local
>>> solutions ended up not working for important use cases when we analyzed
>>> them more deeply.
>>>
>>> That is very interesting. Could you elaborate more about some examples
>>> of the use cases which didn't work? I'd like to try to match it against how
>>> Euphoria is structured, it should be more resistant to this non-local
>>> effects, because it very often bundles together multiple Beam's primitives
>>> to single transform - ReduceByKey is one example of this, if is actually
>>> mix of Window.into() + GBK + ParDo, Although it might look like if this
>>> transform can be broken down to something else, then it is not primitive
>>> (euphoria has no native equivalent of GBK itself), but it has several other
>>> nice implications - that is that Combine now becomes a special case of RBK.
>>> It now becomes only a question of where and how you can "run" the reduce
>>> function. The logic is absolutely equal. This can be worked in more detail
>>> and actually show, that even Combine and RBK can be decribed by a more
>>> general stateful operation (ReduceStateByKey), and so finally Euphoria
>>> actually has only two really "primitive" operations - these are FlatMap
>>> (basically stateless ParDo) and RSBK. As I already mentioned on some other
>>> thread, when stateful ParDo would support merging windows, it can be shown
>>> that both Combine and GBK become special cases of this.
>>>
>>> > As you mentioned below, I do think it's perfectly reasonable for a
>>> DSL to impose its own semantics. Scio already does this - the raw Beam API
>>> is used by a DSL as a substrate, but the DSL does not need to blindly
>>> mirror the semantics of the raw Beam API - at least in my opinion!
>>>
>>> Sure, but currently, there is no way for DSL to "hook" into runner, so
>>> it has to use raw Beam SDK, and so this will fail in cases like this -
>>> where Beam actually has stronger guarantees than it is required by the DSL.
>>> It would be cool if we could find a way to do that - this pretty much
>>> aligns with another question raised on ML, about the possibility to
>>> override a default implementation of a PTransform for specific pipeline.
>>>
>>> Jan
>>>
>>>
>>> On 9/29/19 7:46 PM, Reuven Lax wrote:
>>>
>>> Jan,
>>>
>>> The fact that the annotation on the ParDo "changes" the GroupByKey
>>> implementation is very specific to the Spark runner implementation. You can
>>> imagine another runner that simply writes out files in HDFS to implement a
>>> GroupByKey - this GroupByKey implementation is agnostic whether the result
>>> will be reiterated or not; in this case it is very much the ParDo
>>> implementation that changes to implement a reiterable. vI think you don't
>>> like the fact that an annotation on the ParDo will have a non-local effect
>>> on the implementation of the GroupByKey upstream. However arguably the
>>> non-local effect is just a quirk of how the Spark runner is implemented -
>>> other runners might have a local effect.
>>>
>>> In general I sympathize with the worry about non-local effects. Beam is
>>> already full of them (e.g. a Window.into statement effects downstream
>>> GroupByKeys). In each case where they were added there was extensive debate
>>> and discussion (Windowing semantics were debated for many months), exactly
>>> because there was concern over adding these non-local effects. In every
>>> case, no other good solution could be found. For the case of windowing for
>>> example, it was often easy to propose simple local APIs (e.g. just pass the
>>> window fn as a parameter to GroupByKey), however all of these local
>>> solutions ended up not working for important use cases when we analyzed
>>> them more deeply.
>>>
>>> As you mentioned below, I do think it's perfectly reasonable for a DSL
>>> to impose its own semantics. Scio already does this - the raw Beam API is
>>> used by a DSL as a substrate, but the DSL does not need to blindly mirror
>>> the semantics of the raw Beam API - at least in my opinion!
>>>
>>> Reuven
>>>
>>> On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> I understand the concerns. Still, it looks a little like we want to be
>>>> able to modify behavior of an object from inside a submodule - quite like
>>>> if my subprogram would accept a Map interface, but internally I would say
>>>> "hey, this is supposed to be a HashMap, please change it so". Because of
>>>> how pipeline is constructed, we can do that, the question is if there
>>>> really isn't a better solution.
>>>>
>>>> What I do not like about the proposed solution:
>>>>
>>>>  1) to specify that the grouped elements are supposed to be iterated
>>>> only once can be done only on ParDo, although there are other (higher
>>>> level) PTransforms, that can consume output of GBK
>>>>
>>>>  2) the annontation on ParDo is by definition generic - i.e. can be
>>>> used on input which is not output of GBK, which makes no sense
>>>>
>>>>  3) we modify the behavior to unlock some optimizations (or change of
>>>> behavior of the GBK itself), users will not understand that
>>>>
>>>>  4) the annotation somewhat arbitrarily modifies data types passed,
>>>> that is counter-intuitive and will be source of confusion
>>>>
>>>> I think that a solution that solves the above problems (but brings
>>>> somoe new, as always :-)), could be to change the output of GBK from
>>>> PCollection<K, Iterable<V>> to GroupedPCollection<K, V>. That way we can
>>>> control which operators (and how) consume the grouping, and we can enable
>>>> these transforms to specify additional parameters (like how they want to
>>>> consume the grouping). It is obviously a breaking change (although can be
>>>> probably made backwards compatible) and it would very much likely involve a
>>>> substantial work. But maybe there are some other not yet discussed options.
>>>>
>>>> Jan
>>>> On 9/28/19 6:46 AM, Reuven Lax wrote:
>>>>
>>>> In many cases, the writer of the ParDo has no access to the GBK (e.g.
>>>> the GBK is hidden inside an upstream PTransform that they cannot modify).
>>>> This is the same reason why RequiresStableInput was made a property of the
>>>> ParDo, because the GroupByKey is quite often inaccessible.
>>>>
>>>> The car analogy doesn't quite apply here, because the runner does have
>>>> a full view of the pipeline so can satisfy all constraints. The car
>>>> dealership generally cannot read your mind (thankfully!), so you have to
>>>> specify what you want. Or to put it another way, the various transforms in
>>>> a Beam pipeline do not live in isolation. The full pipeline graph is what
>>>> is executed, and the runner already has to analyze the full graph to run
>>>> the pipeline (as well as to optimize the pipeline).
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> I'd suggest Stream instead of Iterator, it has the same semantics and
>>>>> much better API.
>>>>>
>>>>> Still not sure, what is wrong on letting the GBK to decide this. I
>>>>> have an analogy - if I decide to buy a car, I have to decide *what* car 
>>>>> I'm
>>>>> going to buy (by think about how I'm going to use it) *before* I buy it. I
>>>>> cannot just buy "a car" and then change it from minivan to sport car based
>>>>> on my actual need. Same with the GBK - if I want to be able to reiterate
>>>>> the result, then I should tell it in advance.
>>>>>
>>>>> Jan
>>>>> On 9/27/19 10:50 PM, Kenneth Knowles wrote:
>>>>>
>>>>> Good point about sibling fusion requiring this.
>>>>>
>>>>> The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does imply
>>>>> that the output iterable can be iterated arbitrarily many times.
>>>>>
>>>>> I think this should remain the default for all the reasons mentioned.
>>>>>
>>>>> We could have opt-in to the weaker KV<K, Iterator<V>> version. Agree
>>>>> that this is a property of the ParDo. A particular use of a GBK has no 
>>>>> idea
>>>>> what is downstream. If you owned the whole pipeline, a special
>>>>> ParDo<Iterator<V>, Foo> would work. But to make the types line up, this
>>>>> would require changes upstream, which is not good.
>>>>>
>>>>> Maybe something like this:
>>>>>
>>>>> ParDo<Iterable<V>, Foo> {
>>>>>   @ProcessElement
>>>>>   void process(@OneShotIterator Iterator<V> iter) {
>>>>>     ...
>>>>>   }
>>>>> }
>>>>>
>>>>> I've described all of this in terms of Java SDK. So we would need a
>>>>> portable representation for all this metadata.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> I think the behavior to make explicit is the need to reiterate, not
>>>>>> the need to handle large results. How large of a result can be handled 
>>>>>> will
>>>>>> always be dependent on the runner, and each runner will probably have a
>>>>>> different definition of large keys. Reiteration however is a logical
>>>>>> difference in the programming API. Therefore I think it makes sense to
>>>>>> specify the latter. The need to reiterate is a property of the downstream
>>>>>> ParDo, so it should be specified there - not on the GBK.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <je...@seznam.cz>
>>>>>> wrote:
>>>>>>
>>>>>>> Ok, I think I understand there might be some benefits of this. Then
>>>>>>> I'd propose we make this clear on the GBK. If we would support somehing
>>>>>>> like this:
>>>>>>>
>>>>>>>  PCollection<?> input = ....;
>>>>>>>
>>>>>>>  input.apply(GroupByKey.withLargeKeys());
>>>>>>>
>>>>>>> then SparkRunner could expand this to
>>>>>>> repartitionAndSortWithinPartitions only on this PTransform, and 
>>>>>>> fallback to
>>>>>>> the default (in memory) in other situations. The default expansion of
>>>>>>> LargeGroupByKey (let's say) would be classic GBK, so that only runners 
>>>>>>> that
>>>>>>> need to make sure that they don't break reiterations can expand this.
>>>>>>>
>>>>>>> WDYT?
>>>>>>>
>>>>>>> Jan
>>>>>>> On 9/27/19 8:56 PM, Reuven Lax wrote:
>>>>>>>
>>>>>>> As I mentioned above, CoGroupByKey already takes advantage of this.
>>>>>>> Reiterating is not the most common use case, but it's definitely one 
>>>>>>> that
>>>>>>> comes up. Also keep in mind that this API has supported reiterating for 
>>>>>>> the
>>>>>>> past five years (since even before the SDK was donated to Apache).
>>>>>>> Therefore you should assume that users are relying on it, in ways we 
>>>>>>> might
>>>>>>> not expect.
>>>>>>>
>>>>>>> Historically, Google's Flume system had collections that did not
>>>>>>> support reiterating (they were even called OneShotCollections to make it
>>>>>>> clear). This was the source of problems and user frustration, which was 
>>>>>>> one
>>>>>>> reason that in the original Dataflow SDK we made sure that these 
>>>>>>> iterables
>>>>>>> could be reiterated. Another reason why it's advantageous for a runner 
>>>>>>> to
>>>>>>> support this is allowing for better fusion. If two ParDos A and B both 
>>>>>>> read
>>>>>>> from the same GroupByKey, it is nice to be able to fuse them into one
>>>>>>> logical operator. For this, you'll probably need a shuffle 
>>>>>>> implementation
>>>>>>> that allows two independent readers from the same shuffle session.
>>>>>>>
>>>>>>> How easy it is to implement reiterables that don't have to fit in
>>>>>>> memory will depend on the runner.  For Dataflow it's possible because 
>>>>>>> the
>>>>>>> shuffle session is logically persistent, so the runner can simply reread
>>>>>>> the shuffle session. For other runners with different shuffle
>>>>>>> implementations, it might be harder to support both properties. Maybe we
>>>>>>> should introduce a new @RequiresReiteration annotation on ParDo? That 
>>>>>>> way
>>>>>>> the Spark runner can see this and switch to the in-memory version just 
>>>>>>> for
>>>>>>> groupings consumed by those ParDos. Runners that already support
>>>>>>> reiteration can ignore this annotation, so it should be backwards
>>>>>>> compatible.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'd like to know the use-case. Why would you *need* to actually
>>>>>>>> iterate the grouped elements twice? By definition the first iteration 
>>>>>>>> would
>>>>>>>> have to extract some statistic (or subset of elements that must fit 
>>>>>>>> into
>>>>>>>> memory). This statistic can then be used as another input for the 
>>>>>>>> second
>>>>>>>> iteration. Why not then calculate the statistic in a separate branch 
>>>>>>>> in the
>>>>>>>> pipeline and feed it then into the ParDo as side input? That would be
>>>>>>>> definitely more efficient, because the calculation of the statistic 
>>>>>>>> would
>>>>>>>> be probably combinable (not sure if it is absolutely required to be
>>>>>>>> combinable, but it seems probable). Even if the calculation would not 
>>>>>>>> be
>>>>>>>> combinable, it is not less efficient than reiterating twice. Why then
>>>>>>>> support multiple iterations (besides the fact that output of GBK is
>>>>>>>> Iterable). Am I missing something?
>>>>>>>>
>>>>>>>> Jan
>>>>>>>> On 9/27/19 6:36 PM, Reuven Lax wrote:
>>>>>>>>
>>>>>>>> This should be an element in the compatibility matrix as well.
>>>>>>>>
>>>>>>>> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I am pretty surprised that we do not have
>>>>>>>>> a @Category(ValidatesRunner) test in GroupByKeyTest that iterates 
>>>>>>>>> multiple
>>>>>>>>> times. That is a major oversight. We should have this test, and it 
>>>>>>>>> can be
>>>>>>>>> disabled by the SparkRunner's configuration.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> The Dataflow version does not spill to disk. However Spark's
>>>>>>>>>> design might require spilling to disk if you want that to be 
>>>>>>>>>> implemented
>>>>>>>>>> properly.
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Spark's GBK is currently implemented using `sortBy(key and
>>>>>>>>>>> value).mapPartition(...)` for non-merging windowing in order to 
>>>>>>>>>>> support
>>>>>>>>>>> large keys and large scale shuffles. Merging windowing is 
>>>>>>>>>>> implemented using
>>>>>>>>>>> standard GBK (underlying spark impl. uses ListCombiner + Hash 
>>>>>>>>>>> Grouping),
>>>>>>>>>>> which is by design unable to support large keys.
>>>>>>>>>>>
>>>>>>>>>>> As Jan noted, problem with mapPartition is, that its UDF
>>>>>>>>>>> receives an Iterator. Only option here is to wrap this iterator to 
>>>>>>>>>>> one that
>>>>>>>>>>> spills to disk once an internal buffer is exceeded (the approach 
>>>>>>>>>>> suggested
>>>>>>>>>>> by Reuven). This unfortunately comes with a cost in some cases. The 
>>>>>>>>>>> best
>>>>>>>>>>> approach would be to somehow determine, that user wants multiple 
>>>>>>>>>>> iterations
>>>>>>>>>>> and than wrap it in "re-iterator" if necessary. Does anyone have 
>>>>>>>>>>> any ideas
>>>>>>>>>>> how to approach this?
>>>>>>>>>>>
>>>>>>>>>>> D.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> The Beam API was written to support multiple iterations, and
>>>>>>>>>>>> there are definitely transforms that do so. I believe that 
>>>>>>>>>>>> CoGroupByKey may
>>>>>>>>>>>> do this as well with the resulting iterator.
>>>>>>>>>>>>
>>>>>>>>>>>> I know that the Dataflow runner is able to handles iterators
>>>>>>>>>>>> larger than available memory by paging them in from shuffle, which 
>>>>>>>>>>>> still
>>>>>>>>>>>> allows for reiterating. It sounds like Spark is less flexible here?
>>>>>>>>>>>>
>>>>>>>>>>>> Reuven
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> +dev <dev@beam.apache.org> <dev@beam.apache.org>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Lukasz, why do you think that users expect to be able to
>>>>>>>>>>>>> iterate multiple times grouped elements? Besides that it 
>>>>>>>>>>>>> obviously suggests
>>>>>>>>>>>>> the 'Iterable'? The way that spark behaves is pretty much 
>>>>>>>>>>>>> analogous to how
>>>>>>>>>>>>> MapReduce used to work - in certain cases it calles
>>>>>>>>>>>>> repartitionAndSortWithinPartitions and then does mapPartition, 
>>>>>>>>>>>>> which
>>>>>>>>>>>>> accepts Iterator - that is because internally it merge sorts pre 
>>>>>>>>>>>>> sorted
>>>>>>>>>>>>> segments. This approach enables to GroupByKey data sets that are 
>>>>>>>>>>>>> too big to
>>>>>>>>>>>>> fit into memory (per key).
>>>>>>>>>>>>>
>>>>>>>>>>>>> If multiple iterations should be expected by users, we
>>>>>>>>>>>>> probably should:
>>>>>>>>>>>>>
>>>>>>>>>>>>>  a) include that in @ValidatesRunner tests
>>>>>>>>>>>>>
>>>>>>>>>>>>>  b) store values in memory on spark, which will break for
>>>>>>>>>>>>> certain pipelines
>>>>>>>>>>>>>
>>>>>>>>>>>>> Because of (b) I think that it would be much better to remove
>>>>>>>>>>>>> this "expectation" and clearly document that the Iterable is not 
>>>>>>>>>>>>> supposed
>>>>>>>>>>>>> to be iterated multiple times.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jan
>>>>>>>>>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> I pretty much think so, because that is how Spark works. The
>>>>>>>>>>>>> Iterable inside is really an Iterator, which cannot be iterated 
>>>>>>>>>>>>> multiple
>>>>>>>>>>>>> times.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jan
>>>>>>>>>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jan, in Beam users expect to be able to iterate the GBK output
>>>>>>>>>>>>> multiple times even from within the same ParDo.
>>>>>>>>>>>>> Is this something that Beam on Spark Runner never supported?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Gershi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> could you please outline the pipeline you are trying to
>>>>>>>>>>>>>> execute? Basically, you cannot iterate the Iterable multiple 
>>>>>>>>>>>>>> times in
>>>>>>>>>>>>>> single ParDo. It should be possible, though, to apply multiple 
>>>>>>>>>>>>>> ParDos to
>>>>>>>>>>>>>> output from GroupByKey.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jan
>>>>>>>>>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I want to iterate multiple times on the Iterable<V> (the
>>>>>>>>>>>>>> output of GroupByKey transformation)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When my Runner is SparkRunner, I get an exception:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator
>>>>>>>>>>>>>> can't be iterated more than once,otherwise there could be data 
>>>>>>>>>>>>>> lost
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>                 at
>>>>>>>>>>>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>                 at
>>>>>>>>>>>>>> java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I understood I can branch the pipeline after GroupByKey into
>>>>>>>>>>>>>> multiple transformation and iterate in each of them once on the 
>>>>>>>>>>>>>> Iterable<V>.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is there a better way for that?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [image: citi_logo_mail][image: citi_logo_mail]*Noam Gershi*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Software Developer
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *T*: +972 (3) 7405718 <+972%203-740-5718>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [image: Mail_signature_blue]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>

Reply via email to