I am pretty surprised that we do not have a @Category(ValidatesRunner) test
in GroupByKeyTest that iterates multiple times. That is a major oversight.
We should have this test, and it can be disabled by the SparkRunner's
configuration.
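
To make the shape of such a check concrete, here is a minimal sketch in
plain Java with no Beam dependencies. It is not the actual GroupByKeyTest
code. The names ReiterationSketch, SingleUseIterable, drain and
supportsReiteration are all hypothetical, and SingleUseIterable only
imitates the one-shot behaviour described further down in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ReiterationSketch {

  /** Hypothetical stand-in for grouped values backed by a one-shot Iterator. */
  public static final class SingleUseIterable<T> implements Iterable<T> {
    private final Iterator<T> source;
    private boolean consumed = false;

    public SingleUseIterable(Iterator<T> source) {
      this.source = source;
    }

    @Override
    public Iterator<T> iterator() {
      // Refuse a second pass, since the underlying Iterator is already drained.
      if (consumed) {
        throw new IllegalStateException("can't be iterated more than once");
      }
      consumed = true;
      return source;
    }
  }

  /** Copies every element seen in one full pass over the values. */
  public static <T> List<T> drain(Iterable<T> values) {
    List<T> out = new ArrayList<>();
    for (T v : values) {
      out.add(v);
    }
    return out;
  }

  /**
   * Returns true if two consecutive passes both succeed and see the same
   * elements. Comparing in order is a simplification made for this sketch.
   */
  public static <T> boolean supportsReiteration(Iterable<T> values) {
    try {
      List<T> first = drain(values);
      List<T> second = drain(values);
      return first.equals(second);
    } catch (IllegalStateException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    List<Integer> grouped = Arrays.asList(1, 2, 3);
    // A List can be iterated any number of times.
    System.out.println(supportsReiteration(grouped)); // true
    // The one-shot wrapper fails on the second pass.
    System.out.println(
        supportsReiteration(new SingleUseIterable<>(grouped.iterator()))); // false
  }
}
```

A real ValidatesRunner test would presumably do the two passes inside a
DoFn applied to the GroupByKey output and assert on the result with
PAssert. The sketch only shows the property being checked.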

Kenn

On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:

> The Dataflow version does not spill to disk. However, Spark's design might
> require spilling to disk if reiteration is to be implemented properly.
>
> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:
>
>> Hi,
>>
>> Spark's GBK is currently implemented using `sortBy(key and
>> value).mapPartition(...)` for non-merging windowing in order to support
>> large keys and large scale shuffles. Merging windowing is implemented using
>> standard GBK (underlying spark impl. uses ListCombiner + Hash Grouping),
>> which is by design unable to support large keys.
>>
>> As Jan noted, the problem with mapPartition is that its UDF receives an
>> Iterator. The only option here is to wrap this iterator in one that spills
>> to disk once an internal buffer is exceeded (the approach suggested by
>> Reuven). This unfortunately comes with a cost in some cases. The best
>> approach would be to somehow determine that the user wants multiple
>> iterations and then wrap it in a "re-iterator" if necessary. Does anyone
>> have any ideas how to approach this?
>>
>> D.
>>
>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
>>
>>> The Beam API was written to support multiple iterations, and there are
>>> definitely transforms that do so. I believe that CoGroupByKey may do this
>>> as well with the resulting iterator.
>>>
>>> I know that the Dataflow runner is able to handle iterators larger than
>>> available memory by paging them in from shuffle, which still allows for
>>> reiterating. It sounds like Spark is less flexible here?
>>>
>>> Reuven
>>>
>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> +dev <dev@beam.apache.org>
>>>>
>>>> Lukasz, why do you think that users expect to be able to iterate over
>>>> grouped elements multiple times? Besides the fact that 'Iterable'
>>>> obviously suggests it? The way that Spark behaves is pretty much analogous
>>>> to how MapReduce used to work - in certain cases it calls
>>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>>> accepts an Iterator - that is because internally it merge-sorts pre-sorted
>>>> segments. This approach makes it possible to GroupByKey data sets that are
>>>> too big to fit into memory (per key).
>>>>
>>>> If multiple iterations should be expected by users, we probably should:
>>>>
>>>>  a) include that in @ValidatesRunner tests
>>>>
>>>>  b) store values in memory on spark, which will break for certain
>>>> pipelines
>>>>
>>>> Because of (b) I think that it would be much better to remove this
>>>> "expectation" and clearly document that the Iterable is not supposed to be
>>>> iterated multiple times.
>>>>
>>>> Jan
>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>
>>>> I pretty much think so, because that is how Spark works. The Iterable
>>>> inside is really an Iterator, which cannot be iterated multiple times.
>>>>
>>>> Jan
>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>
>>>> Jan, in Beam users expect to be able to iterate the GBK output multiple
>>>> times even from within the same ParDo.
>>>> Is this something that Beam on Spark Runner never supported?
>>>>
>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Gershi,
>>>>>
>>>>> could you please outline the pipeline you are trying to execute?
>>>>> Basically, you cannot iterate the Iterable multiple times in a single
>>>>> ParDo. It should be possible, though, to apply multiple ParDos to the
>>>>> output from GroupByKey.
>>>>>
>>>>> Jan
>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>>
>>>>> I want to iterate multiple times over the Iterable<V> (the output of
>>>>> the GroupByKey transformation).
>>>>>
>>>>> When my Runner is SparkRunner, I get an exception:
>>>>>
>>>>>
>>>>>
>>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>>>>> iterated more than once,otherwise there could be data lost
>>>>>
>>>>>                 at
>>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>
>>>>>                 at java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> I understand I can branch the pipeline after GroupByKey into multiple
>>>>> transformations and iterate once over the Iterable<V> in each of them.
>>>>>
>>>>>
>>>>>
>>>>> Is there a better way for that?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *Noam Gershi*
>>>>>
>>>>> Software Developer
>>>>>
>>>>> *T*: +972 (3) 7405718
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
