Good point about sibling fusion requiring this.

The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does imply that
the output iterable can be iterated arbitrarily many times.

I think this should remain the default for all the reasons mentioned.

We could offer an opt-in to the weaker KV<K, Iterator<V>> version. Agreed that
this is a property of the ParDo: a particular use of a GBK has no idea what
is downstream. If you owned the whole pipeline, a special
ParDo<Iterator<V>, Foo> would work. But making the types line up would
require changes upstream, which is not good.

Maybe something like this:

ParDo<Iterable<V>, Foo> {
  @ProcessElement
  void process(@OneShotIterator Iterator<V> iter) {
    ...
  }
}
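
To make that shape concrete, here is a minimal sketch of a DoFn using the
hypothetical @OneShotIterator parameter annotation (the annotation and its
wiring do not exist today; the rest is standard Java SDK):

static class SumPerKeyFn extends DoFn<KV<String, Iterable<Long>>, KV<String, Long>> {
  @ProcessElement
  public void process(
      @Element KV<String, Iterable<Long>> element,
      // Hypothetical: asks the runner for a single-pass view of the grouped
      // values, so it never has to materialize or re-read the group.
      @OneShotIterator Iterator<Long> values,
      OutputReceiver<KV<String, Long>> out) {
    long sum = 0;
    while (values.hasNext()) {
      sum += values.next();
    }
    out.output(KV.of(element.getKey(), sum));
  }
}

A runner that only has reiterable groupings could trivially satisfy this by
handing over iterator() of the full Iterable.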

I've described all of this in terms of the Java SDK, so we would need a
portable representation for all of this metadata.

Kenn

On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <[email protected]> wrote:

> I think the behavior to make explicit is the need to reiterate, not the
> need to handle large results. How large of a result can be handled will
> always be dependent on the runner, and each runner will probably have a
> different definition of large keys. Reiteration however is a logical
> difference in the programming API. Therefore I think it makes sense to
> specify the latter. The need to reiterate is a property of the downstream
> ParDo, so it should be specified there - not on the GBK.
>
> Reuven
>
> On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <[email protected]> wrote:
>
>> OK, I think I understand that there might be some benefits to this. Then I'd
>> propose we make this explicit on the GBK. If we supported something like
>> this:
>>
>>  PCollection<?> input = ....;
>>
>>  input.apply(GroupByKey.withLargeKeys());
>>
>> then SparkRunner could expand this to repartitionAndSortWithinPartitions
>> only for this PTransform, and fall back to the default (in-memory) expansion
>> in other situations. The default expansion of LargeGroupByKey (let's say)
>> would be a classic GBK, so that only runners that need to make sure they
>> don't break reiteration would override it.
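>>
>> One way this could be wired up, as a rough sketch (the names LargeGroupByKey
>> and withLargeKeys are placeholders, nothing like this exists in the SDK yet):
>>
>>  public static class LargeGroupByKey<K, V>
>>      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
>>    @Override
>>    public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
>>      // Default expansion is just a classic GBK; a runner like Spark would
>>      // recognize this composite and replace it with its
>>      // repartitionAndSortWithinPartitions-based translation.
>>      return input.apply(GroupByKey.<K, V>create());
>>    }
>>  }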
>>
>> WDYT?
>>
>> Jan
>> On 9/27/19 8:56 PM, Reuven Lax wrote:
>>
>> As I mentioned above, CoGroupByKey already takes advantage of this.
>> Reiterating is not the most common use case, but it's definitely one that
>> comes up. Also keep in mind that this API has supported reiterating for the
>> past five years (since even before the SDK was donated to Apache).
>> Therefore you should assume that users are relying on it, in ways we might
>> not expect.
>>
>> Historically, Google's Flume system had collections that did not support
>> reiterating (they were even called OneShotCollections to make it clear).
>> This was the source of problems and user frustration, which was one reason
>> that in the original Dataflow SDK we made sure that these iterables could
>> be reiterated. Another reason it's advantageous for a runner to support
>> this is that it allows for better fusion. If two ParDos A and B both read from
>> the same GroupByKey, it is nice to be able to fuse them into one logical
>> operator. For this, you'll probably need a shuffle implementation that
>> allows two independent readers from the same shuffle session.
>>
>> How easy it is to implement reiterables that don't have to fit in memory
>> will depend on the runner.  For Dataflow it's possible because the shuffle
>> session is logically persistent, so the runner can simply reread the
>> shuffle session. For other runners with different shuffle implementations,
>> it might be harder to support both properties. Maybe we should introduce a
>> new @RequiresReiteration annotation on ParDo? That way the Spark runner can
>> see this and switch to the in-memory version just for groupings consumed by
>> those ParDos. Runners that already support reiteration can ignore this
>> annotation, so it should be backwards compatible.
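>>
>> For illustration, a ParDo carrying such an annotation might look like this
>> (the @RequiresReiteration annotation is hypothetical; the rest is existing API):
>>
>>  static class SumAndCountFn extends DoFn<KV<String, Iterable<Long>>, String> {
>>    @ProcessElement
>>    @RequiresReiteration  // hypothetical: this fn iterates the values twice
>>    public void process(@Element KV<String, Iterable<Long>> e, OutputReceiver<String> out) {
>>      long count = 0;
>>      for (long v : e.getValue()) {
>>        count++;
>>      }
>>      long sum = 0;
>>      for (long v : e.getValue()) {  // second pass over the same Iterable
>>        sum += v;
>>      }
>>      out.output(e.getKey() + ": sum=" + sum + " count=" + count);
>>    }
>>  }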
>>
>> Reuven
>>
>> On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <[email protected]> wrote:
>>
>>> I'd like to know the use case. Why would you *need* to actually iterate
>>> the grouped elements twice? By definition, the first iteration would have to
>>> extract some statistic (or a subset of elements that must fit into memory).
>>> This statistic can then be used as another input for the second iteration.
>>> Why not calculate the statistic in a separate branch of the pipeline and
>>> feed it into the ParDo as a side input? That would definitely be more
>>> efficient, because the calculation of the statistic would probably be
>>> combinable (I'm not sure it is absolutely required to be combinable, but it
>>> seems probable). Even if the calculation were not combinable, it would be no
>>> less efficient than iterating twice. Why then support multiple iterations
>>> (besides the fact that the output of GBK is an Iterable)? Am I missing
>>> something?
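>>>
>>> For illustration, the pattern I mean, using only existing API (the names and
>>> the choice of a per-key sum as the statistic are just for the example):
>>>
>>>  PCollection<KV<String, Long>> input = ...;  // the keyed input
>>>
>>>  PCollectionView<Map<String, Long>> sumPerKey =
>>>      input.apply("SumPerKey", Sum.<String>longsPerKey()).apply(View.<String, Long>asMap());
>>>
>>>  input
>>>      .apply(GroupByKey.<String, Long>create())
>>>      .apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, String>() {
>>>        @ProcessElement
>>>        public void process(ProcessContext c) {
>>>          long sum = c.sideInput(sumPerKey).get(c.element().getKey());
>>>          // single pass over the grouped values, using the precomputed statistic
>>>          for (long v : c.element().getValue()) {
>>>            if (v * 2 > sum) {
>>>              c.output(c.element().getKey() + ": " + v + " is more than half the total");
>>>            }
>>>          }
>>>        }
>>>      }).withSideInputs(sumPerKey));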
>>>
>>> Jan
>>> On 9/27/19 6:36 PM, Reuven Lax wrote:
>>>
>>> This should be an element in the compatibility matrix as well.
>>>
>>> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> I am pretty surprised that we do not have a @Category(ValidatesRunner)
>>>> test in GroupByKeyTest that iterates multiple times. That is a major
>>>> oversight. We should have this test, and it can be disabled by the
>>>> SparkRunner's configuration.
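>>>>
>>>> Roughly the shape I would expect, as a sketch (assuming the usual
>>>> TestPipeline field p in GroupByKeyTest):
>>>>
>>>>  @Test
>>>>  @Category(ValidatesRunner.class)
>>>>  public void testGroupByKeyCanBeReiterated() {
>>>>    PCollection<Integer> sums =
>>>>        p.apply(Create.of(KV.of("k", 1), KV.of("k", 2), KV.of("k", 3)))
>>>>            .apply(GroupByKey.<String, Integer>create())
>>>>            .apply(ParDo.of(new DoFn<KV<String, Iterable<Integer>>, Integer>() {
>>>>              @ProcessElement
>>>>              public void process(ProcessContext c) {
>>>>                int first = 0;
>>>>                int second = 0;
>>>>                for (int v : c.element().getValue()) { first += v; }
>>>>                for (int v : c.element().getValue()) { second += v; }  // reiterate
>>>>                c.output(first + second);
>>>>              }
>>>>            }));
>>>>    PAssert.that(sums).containsInAnyOrder(12);
>>>>    p.run();
>>>>  }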
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> The Dataflow version does not spill to disk. However, Spark's design
>>>>> might require spilling to disk if you want this to be implemented
>>>>> properly.
>>>>>
>>>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Spark's GBK is currently implemented using `sortBy(key and
>>>>>> value).mapPartition(...)` for non-merging windowing, in order to support
>>>>>> large keys and large-scale shuffles. Merging windowing is implemented using
>>>>>> a standard GBK (the underlying Spark implementation uses ListCombiner + hash
>>>>>> grouping), which is by design unable to support large keys.
>>>>>>
>>>>>> As Jan noted, the problem with mapPartition is that its UDF receives an
>>>>>> Iterator. The only option here is to wrap this iterator in one that spills
>>>>>> to disk once an internal buffer is exceeded (the approach suggested by
>>>>>> Reuven). This unfortunately comes with a cost in some cases. The best
>>>>>> approach would be to somehow determine that the user wants multiple
>>>>>> iterations and then wrap it in a "re-iterator" if necessary. Does anyone
>>>>>> have ideas on how to approach this?
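>>>>>>
>>>>>> For illustration, the wrapper I have in mind would look roughly like this
>>>>>> (the spilling part is elided; here the buffer just grows in memory):
>>>>>>
>>>>>>  class ReiterableValues<V> implements Iterable<V> {
>>>>>>    private final List<V> buffer = new ArrayList<>();  // would spill past a threshold
>>>>>>    private final Iterator<V> source;
>>>>>>
>>>>>>    ReiterableValues(Iterator<V> source) { this.source = source; }
>>>>>>
>>>>>>    @Override
>>>>>>    public Iterator<V> iterator() {
>>>>>>      // Replays what was buffered so far, then keeps draining the source,
>>>>>>      // buffering each element as it goes by.
>>>>>>      return new Iterator<V>() {
>>>>>>        private int pos = 0;
>>>>>>
>>>>>>        @Override public boolean hasNext() { return pos < buffer.size() || source.hasNext(); }
>>>>>>
>>>>>>        @Override public V next() {
>>>>>>          if (pos == buffer.size()) { buffer.add(source.next()); }
>>>>>>          return buffer.get(pos++);
>>>>>>        }
>>>>>>      };
>>>>>>    }
>>>>>>  }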
>>>>>>
>>>>>> D.
>>>>>>
>>>>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> The Beam API was written to support multiple iterations, and there
>>>>>>> are definitely transforms that do so. I believe that CoGroupByKey may do
>>>>>>> this as well with the resulting iterator.
>>>>>>>
>>>>>>> I know that the Dataflow runner is able to handle iterators larger
>>>>>>> than available memory by paging them in from shuffle, which still allows
>>>>>>> for reiterating. It sounds like Spark is less flexible here?
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> +dev <[email protected]>
>>>>>>>>
>>>>>>>> Lukasz, why do you think that users expect to be able to iterate the
>>>>>>>> grouped elements multiple times? Besides the fact that the 'Iterable'
>>>>>>>> type obviously suggests it? The way that Spark behaves is pretty much
>>>>>>>> analogous to how MapReduce used to work - in certain cases it calls
>>>>>>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>>>>>>> accepts an Iterator - that is because internally it merge-sorts
>>>>>>>> pre-sorted segments. This approach makes it possible to GroupByKey data
>>>>>>>> sets that are too big (per key) to fit into memory.
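>>>>>>>>
>>>>>>>> Just to illustrate the pattern on the Spark side, a simplified sketch with
>>>>>>>> String keys and a per-key count (this is not the actual runner translation):
>>>>>>>>
>>>>>>>>  static JavaRDD<String> countPerKey(JavaPairRDD<String, Integer> keyed, int parallelism) {
>>>>>>>>    return keyed
>>>>>>>>        .repartitionAndSortWithinPartitions(new HashPartitioner(parallelism))
>>>>>>>>        .mapPartitions(records -> {
>>>>>>>>          // records arrive sorted by key, so each group is a run of
>>>>>>>>          // consecutive elements and can only be consumed once.
>>>>>>>>          List<String> out = new ArrayList<>();
>>>>>>>>          String currentKey = null;
>>>>>>>>          long count = 0;
>>>>>>>>          while (records.hasNext()) {
>>>>>>>>            Tuple2<String, Integer> rec = records.next();
>>>>>>>>            if (!rec._1().equals(currentKey)) {
>>>>>>>>              if (currentKey != null) { out.add(currentKey + "=" + count); }
>>>>>>>>              currentKey = rec._1();
>>>>>>>>              count = 0;
>>>>>>>>            }
>>>>>>>>            count++;
>>>>>>>>          }
>>>>>>>>          if (currentKey != null) { out.add(currentKey + "=" + count); }
>>>>>>>>          return out.iterator();
>>>>>>>>        });
>>>>>>>>  }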
>>>>>>>>
>>>>>>>> If multiple iterations should be expected by users, we probably
>>>>>>>> should:
>>>>>>>>
>>>>>>>>  a) include that in @ValidatesRunner tests
>>>>>>>>
>>>>>>>>  b) store values in memory on Spark, which will break for certain pipelines
>>>>>>>>
>>>>>>>> Because of (b) I think that it would be much better to remove this
>>>>>>>> "expectation" and clearly document that the Iterable is not supposed 
>>>>>>>> to be
>>>>>>>> iterated multiple times.
>>>>>>>>
>>>>>>>> Jan
>>>>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>>>>
>>>>>>>> I pretty much think so, because that is how Spark works. The
>>>>>>>> Iterable inside is really an Iterator, which cannot be iterated 
>>>>>>>> multiple
>>>>>>>> times.
>>>>>>>>
>>>>>>>> Jan
>>>>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>>>>
>>>>>>>> Jan, in Beam users expect to be able to iterate the GBK output
>>>>>>>> multiple times even from within the same ParDo.
>>>>>>>> Is this something that Beam on the Spark runner has never supported?
>>>>>>>>
>>>>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Gershi,
>>>>>>>>>
>>>>>>>>> could you please outline the pipeline you are trying to execute?
>>>>>>>>> Basically, you cannot iterate the Iterable multiple times in a single ParDo.
>>>>>>>>> It should be possible, though, to apply multiple ParDos to the output of
>>>>>>>>> GroupByKey.
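>>>>>>>>>
>>>>>>>>> For example (a sketch; input, FirstPassFn and SecondPassFn stand for your
>>>>>>>>> own keyed input and DoFns):
>>>>>>>>>
>>>>>>>>>  PCollection<KV<String, Iterable<Integer>>> grouped =
>>>>>>>>>      input.apply(GroupByKey.<String, Integer>create());
>>>>>>>>>  grouped.apply("FirstPass", ParDo.of(new FirstPassFn()));   // iterates the values once
>>>>>>>>>  grouped.apply("SecondPass", ParDo.of(new SecondPassFn())); // iterates once, independently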
>>>>>>>>>
>>>>>>>>> Jan
>>>>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I want to iterate multiple times over the Iterable<V> (the output of
>>>>>>>>> the GroupByKey transformation).
>>>>>>>>>
>>>>>>>>> When my Runner is SparkRunner, I get an exception:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>>>>>>>>> iterated more than once,otherwise there could be data lost
>>>>>>>>>
>>>>>>>>>                 at
>>>>>>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>>>>
>>>>>>>>>                 at
>>>>>>>>> java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I understood that I can branch the pipeline after GroupByKey into
>>>>>>>>> multiple transformations and iterate once over the Iterable<V> in each
>>>>>>>>> of them.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Is there a better way for that?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *Noam Gershi*
>>>>>>>>>
>>>>>>>>> Software Developer
>>>>>>>>>
>>>>>>>>> *T*: +972 (3) 7405718
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
