On Tue, Oct 1, 2019 at 5:35 PM Robert Bradshaw <rober...@google.com> wrote:

> For this specific use case, I would suggest this be done via
> PTransform URNs. E.g. one could have a GroupByKeyOneShot whose
> implementation is
>
> input
>     .apply(GroupByKey.of())
>     .apply(kv -> KV.of(kv.key(), kv.iterator()))
>

This is dual to what I clumsily was trying to say in my last paragraph. But
I agree that ReduceByKey is better, if we were to add any new primitive
transform. I very much dislike PCollection<Iterator> for just the reasons
you also mention.

I think the annotation route where @ProcessElement can accept a different
type of element seems less intrusive and more flexible.


> On Tue, Oct 1, 2019 at 2:16 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> The car analogy was meant to say, that in real world you have to make
>> decision before you take any action. There is no retroactivity possible.
>>
>> Reuven pointed out, that it is possible (although it seems a little weird
>> to me, but that is the only thing I can tell against it :-)), that the way
>> a grouped PCollection is produced might be out of control of a consuming
>> operator. One example of this might be, that the grouping is produced in a
>> submodule (some library), but still, the consumer wants to be able to
>> specify if he wants or doesn't want reiterations.
>>
Exactly. The person choosing to GroupByKey and the person writing the
one-shot ParDo must be assumed to be different people, in general.

FWIW I always think of the pipeline as a program and the runner as a
planner/optimizer. So the runner is always responsible for reordering and
making physical planning decisions, like whether to create an iterable
materialization or just some streamed iterator.

>> If we move on, our next option might be to specify the annotation on the
>> consumer (as suggested), but that has all the "not really nice" properties
>> of being counter-intuitive, ignoring strong types, etc., etc., for which
>> reason I think that this should be ruled out as well.
>>
In Beam we have taken the position that type checking the graph after it
is constructed is an early enough place to catch type errors (speaking for
Java). The validation that ParDo does on the DoFn is basically lightweight,
local, type checking. This is how we detect and type check stateful ParDo
transforms as well as splittable ParDo transforms. We also catch errors
that are not expressible in Java's type system.
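
As a rough illustration of that kind of lightweight, local check, here is a
minimal sketch in plain Java (no Beam dependency). The annotations
`ProcessElementSketch` and `OneShotIteratorSketch` and the helper `isOneShot`
are hypothetical names made up for this example; they are not Beam's
`@ProcessElement` or any existing Beam API. The sketch reflects over a user
function, finds the annotated method, and rejects a one-shot annotation on a
parameter that is not an `Iterator`, a rule that Java's type system cannot
express by itself.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Iterator;

class SignatureCheckSketch {

  // Hypothetical stand-ins for illustration only; NOT Beam annotations.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface ProcessElementSketch {}

  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.PARAMETER)
  @interface OneShotIteratorSketch {}

  /**
   * Returns true if the user function declares that it consumes its grouped
   * input in a single pass. Throws if the declaration is inconsistent.
   */
  static boolean isOneShot(Class<?> fnClass) {
    for (Method m : fnClass.getDeclaredMethods()) {
      if (!m.isAnnotationPresent(ProcessElementSketch.class)) {
        continue; // not the processing method
      }
      for (Parameter p : m.getParameters()) {
        if (p.isAnnotationPresent(OneShotIteratorSketch.class)) {
          // The annotation only makes sense on an Iterator parameter.
          if (!p.getType().equals(Iterator.class)) {
            throw new IllegalArgumentException(
                "one-shot parameter must be an Iterator, got " + p.getType());
          }
          return true;
        }
      }
      return false; // processing method found, no one-shot parameter
    }
    throw new IllegalArgumentException("no processing method on " + fnClass);
  }

  // Example user functions.
  static class OneShotFn {
    @ProcessElementSketch
    void process(@OneShotIteratorSketch Iterator<String> values) {}
  }

  static class ReiterableFn {
    @ProcessElementSketch
    void process(Iterable<String> values) {}
  }

  static class BadFn {
    @ProcessElementSketch
    void process(@OneShotIteratorSketch Iterable<String> values) {}
  }
}
```

A runner could run a check like this at graph construction time and use the
result when planning the upstream grouping.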

If we were discussing just this Spark limitation/optimization this is a
very natural fit with what we already have: Give the runner all the
information about the nature of the transforms and user functions, and let
it make the best plan it can.

So to me the interesting part is that there is a DSL that wants to support
primitives that are strictly weaker than Beam's, in order to *only* allow
the oneshot path. Annotations are quite annoying for DSLs, as you may have
noticed for state & timers, so that is not a good fit. But the concepts
still work. I would suggest pivoting this thread to how to allow a DSL
builder to directly provide a DoFnInvoker with a DoFnSignature in order to
programmatically provide the same information that annotations provide.
Essentially exposing an IR to DSL authors rather than forcing them to work
with the source language meant for end users. Do you already have a
solution for this today?

Kenn

>> This leaves us with a single option (at least I have not figured out any
>> other) - which is we can bundle GBK and associated ParDo into atomic
>> PTransform, which can then be overridden by runners that need special
>> handling of this situation - these are all runners that need to buffer data
>> in memory in order to support reiterations (Spark and Flink; note that this
>> problem arises only for batch case, because in streaming case, one can
>> reasonably assume that the data resides in a state that supports
>> reiterations). But - we already have this PTransform in Euphoria, it is
>> called ReduceByKey, and has all the required properties (technically, it is
>> not a PTransform now, but that is a minor detail and can be changed
>> trivially).
>>
>> So, the direction I was trying to take this discussion was - what could
>> be the best way for a runner to natively support a PTransform from a DSL? I
>> can imagine several options:
>>
>>  a) support it directly and let runners depend on the DSL (compileOnly
>> dependency might suffice, because users will include the DSL into their
>> code to be able to use it)
>>
>>  b) create an interface in runners for user-code to be able to provide
>> translation for user-specified operators (this could be absolutely generic,
>> DSLs might just use this feature the same way any user could), after all
>> runners already use a concept of Translator, but that is pretty much
>> copy-pasted, not abstracted into a general purpose one
>>
>>  c) move the operators that need to be translated into core
>>
>> The option (c) then leaves open questions related to - if we would want
>> to move other operators to core, would this be the right time to ask
>> questions if our current set of "core" operators is the ideal one? Or could
>> this be optimized?
>>
>> Jan
>> On 10/1/19 12:32 AM, Kenneth Knowles wrote:
>>
>> In the car analogy, you have something like this:
>>
>>     Iterable: car
>>     Iterator: taxi ride
>>
>> They are related, but not as variations of a common concept.
>>
>> In the discussion of Combine vs RSBK, if the reducer is required to be an
>> associative and commutative operator, then it is the same thing under a
>> different name. If the reducer can be non-associative or non-commutative,
>> then it admits fewer transformations/optimizations.
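
To make the difference concrete, here is a small plain-Java sketch (class and
method names are made up for this example). An associative and commutative
reducer such as addition gives the same answer whether the values are reduced
in one pass or reduced per partition and then merged. Subtraction does not, so
a runner could not safely pre-combine it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

class ReducerPropertiesSketch {

  /** Reduces all values in one sequential pass, left to right. */
  static int reduceSequentially(List<Integer> values, BinaryOperator<Integer> op) {
    int acc = values.get(0);
    for (int i = 1; i < values.size(); i++) {
      acc = op.apply(acc, values.get(i));
    }
    return acc;
  }

  /** Reduces each partition on its own, then reduces the partial results. */
  static int reducePartitioned(List<List<Integer>> partitions, BinaryOperator<Integer> op) {
    List<Integer> partials = new ArrayList<>();
    for (List<Integer> partition : partitions) {
      partials.add(reduceSequentially(partition, op));
    }
    return reduceSequentially(partials, op);
  }
}
```

With the values 1, 2, 3, 4 split into the partitions [1, 2] and [3, 4],
addition yields 10 both ways, while subtraction yields -8 sequentially and 0
when partitioned.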
>>
>> If you introduce a GroupIteratorsByKey and implement GroupByKey as a
>> transform that combines the iterator by concatenation, I think you do get
>> an internally consistent system. To execute efficiently, you need to always
>> identify and replace the GroupByKey operation with a primitive one. It does
>> make some sense to expose the weakest primitives for the sake of DSLs. But
>> they are very poorly suited for end-users, and for GBK on most runners you
>> get the more powerful one for free.
>>
>> Kenn
>>
>> On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> > The fact that the annotation on the ParDo "changes" the GroupByKey
>>> implementation is very specific to the Spark runner implementation.
>>>
>>> I don't quite agree. It is not very specific to Spark, it is specific to
>>> generally all runners, that produce grouped elements in a way that is not
>>> reiterable. That is the key property. The example you gave with HDFS does
>>> not satisfy this condition (files on HDFS are certainly reiterable), and
>>> that's why no change to the GBK is needed (it actually already has the
>>> required property). A quick look at what FlinkRunner does (at least the
>>> non-portable one) shows that it implements GBK by reducing elements into a
>>> List. That is going to crash on a big PCollection, which is even nicely
>>> documented:
>>>
>>>    * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this is
>>>    * expected to crash!
>>>
>>>
>>> If this is fixed, then it is likely to start to behave the same as Spark.
>>> So actually I think the opposite is true - Dataflow is a special case,
>>> because of how its internal shuffle service works.
>>>
>>> > In general I sympathize with the worry about non-local effects. Beam
>>> is already full of them (e.g. a Window.into statement affects downstream
>>> GroupByKeys). In each case where they were added there was extensive debate
>>> and discussion (Windowing semantics were debated for many months), exactly
>>> because there was concern over adding these non-local effects. In every
>>> case, no other good solution could be found. For the case of windowing for
>>> example, it was often easy to propose simple local APIs (e.g. just pass the
>>> window fn as a parameter to GroupByKey), however all of these local
>>> solutions ended up not working for important use cases when we analyzed
>>> them more deeply.
>>>
>>> That is very interesting. Could you elaborate on some examples of the
>>> use cases which didn't work? I'd like to try to match them against how
>>> Euphoria is structured. It should be more resistant to these non-local
>>> effects, because it very often bundles multiple Beam primitives together
>>> into a single transform - ReduceByKey is one example of this, it is
>>> actually a mix of Window.into() + GBK + ParDo. It might look like this
>>> transform is not primitive, because it can be broken down into something
>>> else (Euphoria has no native equivalent of GBK itself), but it has several
>>> other nice implications - one is that Combine now becomes a special case
>>> of RBK. It now becomes only a question of where and how you can "run" the
>>> reduce function. The logic is absolutely equal. This can be worked out in
>>> more detail to show that even Combine and RBK can be described by a more
>>> general stateful operation (ReduceStateByKey), and so finally Euphoria
>>> actually has only two really "primitive" operations - these are FlatMap
>>> (basically stateless ParDo) and RSBK. As I already mentioned on some other
>>> thread, if stateful ParDo supported merging windows, it can be shown
>>> that both Combine and GBK become special cases of this.
>>>
>>> > As you mentioned below, I do think it's perfectly reasonable for a
>>> DSL to impose its own semantics. Scio already does this - the raw Beam API
>>> is used by a DSL as a substrate, but the DSL does not need to blindly
>>> mirror the semantics of the raw Beam API - at least in my opinion!
>>>
>>> Sure, but currently, there is no way for a DSL to "hook" into a runner, so
>>> it has to use the raw Beam SDK, and so this will fail in cases like this -
>>> where Beam actually has stronger guarantees than are required by the DSL.
>>> It would be cool if we could find a way to do that - this pretty much
>>> aligns with another question raised on ML, about the possibility to
>>> override a default implementation of a PTransform for specific pipeline.
>>>
>>> Jan
>>>
>>>
>>> On 9/29/19 7:46 PM, Reuven Lax wrote:
>>>
>>> Jan,
>>>
>>> The fact that the annotation on the ParDo "changes" the GroupByKey
>>> implementation is very specific to the Spark runner implementation. You can
>>> imagine another runner that simply writes out files in HDFS to implement a
>>> GroupByKey - this GroupByKey implementation is agnostic whether the result
>>> will be reiterated or not; in this case it is very much the ParDo
>>> implementation that changes to implement a reiterable. I think you don't
>>> like the fact that an annotation on the ParDo will have a non-local effect
>>> on the implementation of the GroupByKey upstream. However arguably the
>>> non-local effect is just a quirk of how the Spark runner is implemented -
>>> other runners might have a local effect.
>>>
>>> In general I sympathize with the worry about non-local effects. Beam is
>>> already full of them (e.g. a Window.into statement affects downstream
>>> GroupByKeys). In each case where they were added there was extensive debate
>>> and discussion (Windowing semantics were debated for many months), exactly
>>> because there was concern over adding these non-local effects. In every
>>> case, no other good solution could be found. For the case of windowing for
>>> example, it was often easy to propose simple local APIs (e.g. just pass the
>>> window fn as a parameter to GroupByKey), however all of these local
>>> solutions ended up not working for important use cases when we analyzed
>>> them more deeply.
>>>
>>> As you mentioned below, I do think it's perfectly reasonable for a DSL
>>> to impose its own semantics. Scio already does this - the raw Beam API is
>>> used by a DSL as a substrate, but the DSL does not need to blindly mirror
>>> the semantics of the raw Beam API - at least in my opinion!
>>>
>>> Reuven
>>>
>>> On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> I understand the concerns. Still, it looks a little like we want to be
>>>> able to modify behavior of an object from inside a submodule - quite like
>>>> if my subprogram would accept a Map interface, but internally I would say
>>>> "hey, this is supposed to be a HashMap, please change it so". Because of
>>>> how pipeline is constructed, we can do that, the question is if there
>>>> really isn't a better solution.
>>>>
>>>> What I do not like about the proposed solution:
>>>>
>>>>  1) to specify that the grouped elements are supposed to be iterated
>>>> only once can be done only on ParDo, although there are other (higher
>>>> level) PTransforms, that can consume output of GBK
>>>>
>>>>  2) the annotation on ParDo is by definition generic - i.e. can be
>>>> used on input which is not output of GBK, which makes no sense
>>>>
>>>>  3) we modify the behavior to unlock some optimizations (or change of
>>>> behavior of the GBK itself), users will not understand that
>>>>
>>>>  4) the annotation somewhat arbitrarily modifies data types passed,
>>>> that is counter-intuitive and will be source of confusion
>>>>
>>>> I think that a solution that solves the above problems (but brings
>>>> some new ones, as always :-)), could be to change the output of GBK from
>>>> PCollection<KV<K, Iterable<V>>> to GroupedPCollection<K, V>. That way we can
>>>> control which operators (and how) consume the grouping, and we can enable
>>>> these transforms to specify additional parameters (like how they want to
>>>> consume the grouping). It is obviously a breaking change (although it can
>>>> probably be made backwards compatible) and it would very likely involve
>>>> substantial work. But maybe there are some other not yet discussed options.
>>>>
>>>> Jan
>>>> On 9/28/19 6:46 AM, Reuven Lax wrote:
>>>>
>>>> In many cases, the writer of the ParDo has no access to the GBK (e.g.
>>>> the GBK is hidden inside an upstream PTransform that they cannot modify).
>>>> This is the same reason why RequiresStableInput was made a property of the
>>>> ParDo, because the GroupByKey is quite often inaccessible.
>>>>
>>>> The car analogy doesn't quite apply here, because the runner does have
>>>> a full view of the pipeline so can satisfy all constraints. The car
>>>> dealership generally cannot read your mind (thankfully!), so you have to
>>>> specify what you want. Or to put it another way, the various transforms in
>>>> a Beam pipeline do not live in isolation. The full pipeline graph is what
>>>> is executed, and the runner already has to analyze the full graph to run
>>>> the pipeline (as well as to optimize the pipeline).
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> I'd suggest Stream instead of Iterator, it has the same semantics and
>>>>> much better API.
>>>>>
>>>>> Still not sure what is wrong with letting the GBK decide this. I
>>>>> have an analogy - if I decide to buy a car, I have to decide *what* car
>>>>> I'm going to buy (by thinking about how I'm going to use it) *before* I
>>>>> buy it. I cannot just buy "a car" and then change it from a minivan to a
>>>>> sports car based on my actual need. Same with the GBK - if I want to be
>>>>> able to reiterate the result, then I should tell it in advance.
>>>>>
>>>>> Jan
>>>>> On 9/27/19 10:50 PM, Kenneth Knowles wrote:
>>>>>
>>>>> Good point about sibling fusion requiring this.
>>>>>
>>>>> The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does imply
>>>>> that the output iterable can be iterated arbitrarily many times.
>>>>>
>>>>> I think this should remain the default for all the reasons mentioned.
>>>>>
>>>>> We could have opt-in to the weaker KV<K, Iterator<V>> version. Agree
>>>>> that this is a property of the ParDo. A particular use of a GBK has no
>>>>> idea what is downstream. If you owned the whole pipeline, a special
>>>>> ParDo<Iterator<V>, Foo> would work. But to make the types line up, this
>>>>> would require changes upstream, which is not good.
>>>>>
>>>>> Maybe something like this:
>>>>>
>>>>> ParDo<Iterable<V>, Foo> {
>>>>>   @ProcessElement
>>>>>   void process(@OneShotIterator Iterator<V> iter) {
>>>>>     ...
>>>>>   }
>>>>> }
>>>>>
>>>>> I've described all of this in terms of Java SDK. So we would need a
>>>>> portable representation for all this metadata.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> I think the behavior to make explicit is the need to reiterate, not
>>>>>> the need to handle large results. How large of a result can be handled
>>>>>> will always be dependent on the runner, and each runner will probably have a
>>>>>> different definition of large keys. Reiteration however is a logical
>>>>>> difference in the programming API. Therefore I think it makes sense to
>>>>>> specify the latter. The need to reiterate is a property of the downstream
>>>>>> ParDo, so it should be specified there - not on the GBK.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <je...@seznam.cz>
>>>>>> wrote:
>>>>>>
>>>>>>> Ok, I think I understand there might be some benefits of this. Then
>>>>>>> I'd propose we make this clear on the GBK. If we would support something
>>>>>>> like this:
>>>>>>>
>>>>>>>  PCollection<?> input = ....;
>>>>>>>
>>>>>>>  input.apply(GroupByKey.withLargeKeys());
>>>>>>>
>>>>>>> then SparkRunner could expand this to
>>>>>>> repartitionAndSortWithinPartitions only on this PTransform, and fall
>>>>>>> back to the default (in memory) in other situations. The default expansion
>>>>>>> of LargeGroupByKey (let's say) would be classic GBK, so that only runners
>>>>>>> that need to make sure that they don't break reiterations can expand this.
>>>>>>>
>>>>>>> WDYT?
>>>>>>>
>>>>>>> Jan
>>>>>>> On 9/27/19 8:56 PM, Reuven Lax wrote:
>>>>>>>
>>>>>>> As I mentioned above, CoGroupByKey already takes advantage of this.
>>>>>>> Reiterating is not the most common use case, but it's definitely one that
>>>>>>> comes up. Also keep in mind that this API has supported reiterating for the
>>>>>>> past five years (since even before the SDK was donated to Apache).
>>>>>>> Therefore you should assume that users are relying on it, in ways we might
>>>>>>> not expect.
>>>>>>>
>>>>>>> Historically, Google's Flume system had collections that did not
>>>>>>> support reiterating (they were even called OneShotCollections to make it
>>>>>>> clear). This was the source of problems and user frustration, which was one
>>>>>>> reason that in the original Dataflow SDK we made sure that these iterables
>>>>>>> could be reiterated. Another reason why it's advantageous for a runner to
>>>>>>> support this is allowing for better fusion. If two ParDos A and B both read
>>>>>>> from the same GroupByKey, it is nice to be able to fuse them into one
>>>>>>> logical operator. For this, you'll probably need a shuffle implementation
>>>>>>> that allows two independent readers from the same shuffle session.
>>>>>>>
>>>>>>> How easy it is to implement reiterables that don't have to fit in
>>>>>>> memory will depend on the runner. For Dataflow it's possible because the
>>>>>>> shuffle session is logically persistent, so the runner can simply reread
>>>>>>> the shuffle session. For other runners with different shuffle
>>>>>>> implementations, it might be harder to support both properties. Maybe we
>>>>>>> should introduce a new @RequiresReiteration annotation on ParDo? That way
>>>>>>> the Spark runner can see this and switch to the in-memory version just for
>>>>>>> groupings consumed by those ParDos. Runners that already support
>>>>>>> reiteration can ignore this annotation, so it should be backwards
>>>>>>> compatible.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'd like to know the use-case. Why would you *need* to actually
>>>>>>>> iterate the grouped elements twice? By definition the first iteration would
>>>>>>>> have to extract some statistic (or subset of elements that must fit into
>>>>>>>> memory). This statistic can then be used as another input for the second
>>>>>>>> iteration. Why not then calculate the statistic in a separate branch in the
>>>>>>>> pipeline and feed it then into the ParDo as side input? That would be
>>>>>>>> definitely more efficient, because the calculation of the statistic would
>>>>>>>> be probably combinable (not sure if it is absolutely required to be
>>>>>>>> combinable, but it seems probable). Even if the calculation would not be
>>>>>>>> combinable, it is not less efficient than reiterating twice. Why then
>>>>>>>> support multiple iterations (besides the fact that output of GBK is
>>>>>>>> Iterable)? Am I missing something?
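
A minimal plain-Java sketch of the two shapes being compared here, assuming
the statistic is a mean and the second pass counts the values above it (both
choices, and all names, are made up for illustration; no Beam API is used).
The first method needs a reiterable input. The second needs only a one-shot
Iterator plus the precomputed statistic, which in a pipeline would arrive as
a side input.

```java
import java.util.Iterator;

class TwoPassSketch {

  /** Needs a reiterable input: pass 1 computes the mean, pass 2 counts values above it. */
  static long countAboveMeanTwoPass(Iterable<Double> values) {
    double sum = 0;
    long n = 0;
    for (double v : values) { // first iteration
      sum += v;
      n++;
    }
    double mean = sum / n;
    long above = 0;
    for (double v : values) { // second iteration over the same grouping
      if (v > mean) {
        above++;
      }
    }
    return above;
  }

  /** Single pass: the mean is supplied from outside, as a side input would be. */
  static long countAboveMeanOnePass(Iterator<Double> values, double mean) {
    long above = 0;
    while (values.hasNext()) {
      if (values.next() > mean) {
        above++;
      }
    }
    return above;
  }
}
```

For the values 1, 2, 3, 10 the mean is 4, and both variants count one value
above it.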
>>>>>>>>
>>>>>>>> Jan
>>>>>>>> On 9/27/19 6:36 PM, Reuven Lax wrote:
>>>>>>>>
>>>>>>>> This should be an element in the compatibility matrix as well.
>>>>>>>>
>>>>>>>> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I am pretty surprised that we do not have
>>>>>>>>> a @Category(ValidatesRunner) test in GroupByKeyTest that iterates multiple
>>>>>>>>> times. That is a major oversight. We should have this test, and it can be
>>>>>>>>> disabled by the SparkRunner's configuration.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> The Dataflow version does not spill to disk. However Spark's
>>>>>>>>>> design might require spilling to disk if you want that to be implemented
>>>>>>>>>> properly.
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Spark's GBK is currently implemented using `sortBy(key and
>>>>>>>>>>> value).mapPartition(...)` for non-merging windowing in order to support
>>>>>>>>>>> large keys and large scale shuffles. Merging windowing is implemented using
>>>>>>>>>>> standard GBK (underlying spark impl. uses ListCombiner + Hash Grouping),
>>>>>>>>>>> which is by design unable to support large keys.
>>>>>>>>>>>
>>>>>>>>>>> As Jan noted, the problem with mapPartition is that its UDF
>>>>>>>>>>> receives an Iterator. The only option here is to wrap this iterator in one
>>>>>>>>>>> that spills to disk once an internal buffer is exceeded (the approach
>>>>>>>>>>> suggested by Reuven). This unfortunately comes with a cost in some cases.
>>>>>>>>>>> The best approach would be to somehow determine that the user wants
>>>>>>>>>>> multiple iterations and then wrap it in a "re-iterator" if necessary. Does
>>>>>>>>>>> anyone have any ideas how to approach this?
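
One possible shape for such a "re-iterator", as a minimal sketch only: a
wrapper that buffers elements in memory as the first pass pulls them from the
one-shot Iterator, and replays the buffer on later passes. The class name is
hypothetical, this is not the Spark runner's code, and it deliberately leaves
out the spill-to-disk part, so it shares the memory limits discussed in this
thread. It is also not thread-safe.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class CachingReiterableSketch<T> implements Iterable<T> {
  private final Iterator<T> source;                 // single-use upstream iterator
  private final List<T> buffer = new ArrayList<>(); // elements pulled so far

  CachingReiterableSketch(Iterator<T> source) {
    this.source = source;
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      private int pos = 0; // position of this pass within the shared buffer

      @Override
      public boolean hasNext() {
        return pos < buffer.size() || source.hasNext();
      }

      @Override
      public T next() {
        if (pos == buffer.size()) {
          // This pass has caught up with the buffer: pull one more element.
          if (!source.hasNext()) {
            throw new NoSuchElementException();
          }
          buffer.add(source.next());
        }
        return buffer.get(pos++);
      }
    };
  }
}
```

A consumer that iterates only once pays for the buffer without ever using
it, which is the cost mentioned above and the reason to apply the wrapper
only where reiteration is actually requested.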
>>>>>>>>>>>
>>>>>>>>>>> D.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> The Beam API was written to support multiple iterations, and
>>>>>>>>>>>> there are definitely transforms that do so. I believe that CoGroupByKey may
>>>>>>>>>>>> do this as well with the resulting iterator.
>>>>>>>>>>>>
>>>>>>>>>>>> I know that the Dataflow runner is able to handle iterators
>>>>>>>>>>>> larger than available memory by paging them in from shuffle, which still
>>>>>>>>>>>> allows for reiterating. It sounds like Spark is less flexible here?
>>>>>>>>>>>>
>>>>>>>>>>>> Reuven
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> +dev <dev@beam.apache.org>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Lukasz, why do you think that users expect to be able to
>>>>>>>>>>>>> iterate grouped elements multiple times? Besides the fact that the
>>>>>>>>>>>>> 'Iterable' type obviously suggests it? The way that Spark behaves is pretty
>>>>>>>>>>>>> much analogous to how MapReduce used to work - in certain cases it calls
>>>>>>>>>>>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>>>>>>>>>>>> accepts an Iterator - that is because internally it merge sorts pre-sorted
>>>>>>>>>>>>> segments. This approach makes it possible to GroupByKey data sets that are
>>>>>>>>>>>>> too big to fit into memory (per key).
>>>>>>>>>>>>>
>>>>>>>>>>>>> If multiple iterations should be expected by users, we
>>>>>>>>>>>>> probably should:
>>>>>>>>>>>>>
>>>>>>>>>>>>>  a) include that in @ValidatesRunner tests
>>>>>>>>>>>>>
>>>>>>>>>>>>>  b) store values in memory on spark, which will break for
>>>>>>>>>>>>> certain pipelines
>>>>>>>>>>>>>
>>>>>>>>>>>>> Because of (b) I think that it would be much better to remove
>>>>>>>>>>>>> this "expectation" and clearly document that the Iterable is not supposed
>>>>>>>>>>>>> to be iterated multiple times.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jan
>>>>>>>>>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> I pretty much think so, because that is how Spark works. The
>>>>>>>>>>>>> Iterable inside is really an Iterator, which cannot be iterated multiple
>>>>>>>>>>>>> times.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jan
>>>>>>>>>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jan, in Beam users expect to be able to iterate the GBK output
>>>>>>>>>>>>> multiple times even from within the same ParDo.
>>>>>>>>>>>>> Is this something that Beam on Spark Runner never supported?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Gershi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> could you please outline the pipeline you are trying to
>>>>>>>>>>>>>> execute? Basically, you cannot iterate the Iterable multiple times in
>>>>>>>>>>>>>> a single ParDo. It should be possible, though, to apply multiple ParDos to
>>>>>>>>>>>>>> the output from GroupByKey.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jan
>>>>>>>>>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I want to iterate multiple times on the Iterable<V> (the
>>>>>>>>>>>>>> output of GroupByKey transformation)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When my Runner is SparkRunner, I get an exception:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
>>>>>>>>>>>>>>                 at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>>>>>>>>>                 at java.lang.Iterable.spliterator(Iterable.java:101)
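
For reference, the failure mode can be reproduced in plain Java without Beam
or Spark. The class below is a hypothetical stand-in, not the runner's
ValueIterator: an Iterable backed by a single Iterator that refuses a second
call to iterator(). The helper copyOnce shows the simplest workaround inside
one ParDo, which is to copy the values into a List on the first pass; this
assumes the values for one key fit in memory.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class OneShotIterableSketch<T> implements Iterable<T> {
  private final Iterator<T> source; // the underlying single-use iterator
  private boolean consumed = false;

  OneShotIterableSketch(Iterator<T> source) {
    this.source = source;
  }

  @Override
  public Iterator<T> iterator() {
    // Any second request for an iterator fails, including the one made
    // internally by the default Iterable.spliterator().
    if (consumed) {
      throw new IllegalStateException("can't be iterated more than once");
    }
    consumed = true;
    return source;
  }

  /** Materializes the values in a single pass so they can be reused freely. */
  static <E> List<E> copyOnce(Iterable<E> values) {
    List<E> copy = new ArrayList<>();
    for (E v : values) {
      copy.add(v);
    }
    return copy;
  }
}
```

After copyOnce, the returned List can be iterated or streamed any number of
times, while the original one-shot Iterable must not be touched again.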
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I understood I can branch the pipeline after GroupByKey into
>>>>>>>>>>>>>> multiple transformations and iterate in each of them once on the
>>>>>>>>>>>>>> Iterable<V>.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is there a better way for that?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *Noam Gershi*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Software Developer
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *T*: +972 (3) 7405718 <+972%203-740-5718>
>>>>>>>>>>>>>>