The Dataflow version does not spill to disk. However, Spark's design would
likely require spilling to disk if this is to be implemented properly.
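
For illustration, here is a rough sketch of what such a spilling wrapper could
look like: buffer values in memory up to a threshold, spill the rest to a temp
file, and replay both on later passes. This is illustrative code only (the
class name, the threshold, and the use of plain Java serialization are
assumptions), not the Dataflow or Spark runner implementation:

import java.io.*;
import java.util.*;

/** Wraps a one-shot Iterator in an Iterable that can be iterated repeatedly. */
public class SpillingReiterable<T extends Serializable> implements Iterable<T> {

  private final Iterator<T> source;
  private final int maxInMemory;
  private final List<T> buffer = new ArrayList<>();
  private File spillFile;        // created only if the in-memory buffer overflows
  private long spilledCount;
  private boolean materialized;

  public SpillingReiterable(Iterator<T> source, int maxInMemory) {
    this.source = source;
    this.maxInMemory = maxInMemory;
  }

  /** Drains the source once: first into memory, then onto disk. */
  private void materialize() throws IOException {
    if (materialized) {
      return;
    }
    ObjectOutputStream out = null;
    while (source.hasNext()) {
      T value = source.next();
      if (buffer.size() < maxInMemory) {
        buffer.add(value);
      } else {
        if (out == null) {
          spillFile = File.createTempFile("reiterable-spill", ".bin");
          spillFile.deleteOnExit();
          out = new ObjectOutputStream(
              new BufferedOutputStream(new FileOutputStream(spillFile)));
        }
        out.writeObject(value);
        spilledCount++;
      }
    }
    if (out != null) {
      out.close();
    }
    materialized = true;
  }

  @Override
  public Iterator<T> iterator() {
    try {
      materialize();
    } catch (IOException e) {
      throw new RuntimeException("Spilling to disk failed", e);
    }
    final Iterator<T> inMemory = buffer.iterator();
    return new Iterator<T>() {
      private ObjectInputStream in;  // left open for brevity in this sketch
      private long readFromDisk;

      @Override
      public boolean hasNext() {
        return inMemory.hasNext() || readFromDisk < spilledCount;
      }

      @SuppressWarnings("unchecked")
      @Override
      public T next() {
        if (inMemory.hasNext()) {
          return inMemory.next();
        }
        try {
          if (in == null) {
            in = new ObjectInputStream(
                new BufferedInputStream(new FileInputStream(spillFile)));
          }
          readFromDisk++;
          return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
          throw new RuntimeException("Reading spilled values failed", e);
        }
      }
    };
  }
}

Note that this sketch drains the source eagerly on the first iterator() call;
as David points out below, the hard part is deciding when the wrapping (and
its cost) is actually needed.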

On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:

> Hi,
>
> Spark's GBK is currently implemented using `sortBy(key and
> value).mapPartition(...)` for non-merging windowing, in order to support
> large keys and large-scale shuffles. Merging windowing is implemented using
> the standard Spark GBK (the underlying implementation uses ListCombiner +
> Hash Grouping), which is by design unable to support large keys.
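>
> To make the consequence concrete, here is a minimal sketch (illustration
> only, not the actual GroupNonMergingWindowsFunctions code; the class and
> method names are made up) of grouping an already-sorted partition lazily.
> The per-key values are just a forward view over the shared partition
> iterator, which is why they can only be consumed once:
>
> import java.util.*;
> import com.google.common.collect.Iterators;
> import com.google.common.collect.PeekingIterator;
>
> final class SortedGroupingSketch {
>   private SortedGroupingSketch() {}
>
>   /** Turns an iterator that is already sorted by key into per-key value iterators. */
>   static <K, V> Iterator<Map.Entry<K, Iterator<V>>> groupSorted(
>       Iterator<Map.Entry<K, V>> sorted) {
>     PeekingIterator<Map.Entry<K, V>> it = Iterators.peekingIterator(sorted);
>     return new Iterator<Map.Entry<K, Iterator<V>>>() {
>       @Override
>       public boolean hasNext() {
>         return it.hasNext();
>       }
>
>       @Override
>       public Map.Entry<K, Iterator<V>> next() {
>         K key = it.peek().getKey();
>         // The per-key values are only a forward view over the shared
>         // partition iterator: once the group has been advanced past,
>         // there is nothing to rewind to.
>         Iterator<V> values = new Iterator<V>() {
>           @Override
>           public boolean hasNext() {
>             return it.hasNext() && it.peek().getKey().equals(key);
>           }
>
>           @Override
>           public V next() {
>             return it.next().getValue();
>           }
>         };
>         return new AbstractMap.SimpleEntry<>(key, values);
>       }
>     };
>   }
> }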
>
> As Jan noted, the problem with mapPartition is that its UDF receives an
> Iterator. The only option here is to wrap this iterator in one that spills to
> disk once an internal buffer is exceeded (the approach suggested by
> Reuven). This unfortunately comes with a cost in some cases. The best
> approach would be to somehow determine that the user wants multiple
> iterations and only then wrap it in a "re-iterator" if necessary. Does anyone
> have ideas on how to approach this?
>
> D.
>
> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
>
>> The Beam API was written to support multiple iterations, and there are
>> definitely transforms that do so. I believe that CoGroupByKey may do this
>> as well with the resulting iterator.
>>
>> I know that the Dataflow runner is able to handle iterators larger than
>> available memory by paging them in from shuffle, which still allows for
>> re-iterating. It sounds like Spark is less flexible here?
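>>
>> For reference, the usage pattern in question looks roughly like this (a
>> hypothetical, self-contained fragment, not code from this thread). It runs
>> on Dataflow but trips the ValueIterator check on the Spark runner:
>>
>> import org.apache.beam.sdk.Pipeline;
>> import org.apache.beam.sdk.transforms.*;
>> import org.apache.beam.sdk.values.*;
>>
>> public class ReiterateGbkExample {
>>   public static void main(String[] args) {
>>     Pipeline p = Pipeline.create();
>>     p.apply(Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)))
>>         .apply(GroupByKey.<String, Integer>create())
>>         .apply(ParDo.of(new DoFn<KV<String, Iterable<Integer>>, String>() {
>>           @ProcessElement
>>           public void process(ProcessContext c) {
>>             Iterable<Integer> values = c.element().getValue();
>>             long count = 0;
>>             for (Integer v : values) {
>>               count++;             // first pass over the grouped values
>>             }
>>             long sum = 0;
>>             for (Integer v : values) {
>>               sum += v;            // second pass - the re-iteration in question
>>             }
>>             c.output(c.element().getKey() + " mean=" + (sum / (double) count));
>>           }
>>         }));
>>     p.run().waitUntilFinish();
>>   }
>> }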
>>
>> Reuven
>>
>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> +dev <dev@beam.apache.org>
>>>
>>> Lukasz, why do you think that users expect to be able to iterate grouped
>>> elements multiple times - besides the fact that the 'Iterable' type
>>> obviously suggests it? The way that Spark behaves is pretty much analogous
>>> to how MapReduce used to work - in certain cases it calls
>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>> accepts an Iterator - that is because internally it merge-sorts pre-sorted
>>> segments. This approach makes it possible to GroupByKey data sets that are
>>> too big to fit into memory (per key).
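>>>
>>> As a rough sketch of that shape (a standalone toy job, not the runner's
>>> actual translation - the partitioner, data, and per-key sum are made up
>>> for illustration):
>>>
>>> import java.util.*;
>>> import org.apache.spark.HashPartitioner;
>>> import org.apache.spark.SparkConf;
>>> import org.apache.spark.api.java.*;
>>> import scala.Tuple2;
>>>
>>> public class SortedShuffleSketch {
>>>   public static void main(String[] args) {
>>>     SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("gbk-sketch");
>>>     try (JavaSparkContext sc = new JavaSparkContext(conf)) {
>>>       JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(
>>>           Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)), 2);
>>>       JavaRDD<String> perKeySums = pairs
>>>           // shuffle + sort by key within each partition, MapReduce style
>>>           .repartitionAndSortWithinPartitions(new HashPartitioner(2))
>>>           // a single forward pass over the sorted partition; no per-key buffering
>>>           .mapPartitions(it -> {
>>>             List<String> out = new ArrayList<>();
>>>             String currentKey = null;
>>>             long sum = 0;
>>>             while (it.hasNext()) {
>>>               Tuple2<String, Integer> kv = it.next();
>>>               if (currentKey != null && !currentKey.equals(kv._1())) {
>>>                 out.add(currentKey + "=" + sum);
>>>                 sum = 0;
>>>               }
>>>               currentKey = kv._1();
>>>               sum += kv._2();
>>>             }
>>>             if (currentKey != null) {
>>>               out.add(currentKey + "=" + sum);
>>>             }
>>>             return out.iterator();
>>>           });
>>>       System.out.println(perKeySums.collect());
>>>     }
>>>   }
>>> }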
>>>
>>> If multiple iterations should be expected by users, we probably should:
>>>
>>>  a) include that in @ValidatesRunner tests
>>>
>>>  b) store values in memory on Spark, which will break for certain
>>> pipelines
>>>
>>> Because of (b) I think that it would be much better to remove this
>>> "expectation" and clearly document that the Iterable is not supposed to be
>>> iterated multiple times.
>>>
>>> Jan
>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>
>>> I pretty much think so, because that is how Spark works. The Iterable
>>> inside is really an Iterator, which cannot be iterated multiple times.
>>>
>>> Jan
>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>
>>> Jan, in Beam users expect to be able to iterate the GBK output multiple
>>> times even from within the same ParDo.
>>> Is this something that Beam on the Spark runner never supported?
>>>
>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Gershi,
>>>>
>>>> could you please outline the pipeline you are trying to execute?
>>>> Basically, you cannot iterate the Iterable multiple times in a single ParDo.
>>>> It should be possible, though, to apply multiple ParDos to the output from
>>>> GroupByKey.
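>>>>
>>>> Roughly like this (a hypothetical fragment; "input" stands in for your
>>>> keyed PCollection<KV<String, Integer>>, the two DoFns are just examples,
>>>> and imports are the same as in the earlier Beam fragment). Each branch
>>>> then makes only a single pass over the grouped values:
>>>>
>>>> PCollection<KV<String, Iterable<Integer>>> grouped =
>>>>     input.apply(GroupByKey.<String, Integer>create());
>>>>
>>>> // Branch 1: one pass to count the values per key.
>>>> PCollection<KV<String, Long>> counts =
>>>>     grouped.apply("CountValues",
>>>>         ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, Long>>() {
>>>>           @ProcessElement
>>>>           public void process(ProcessContext c) {
>>>>             long n = 0;
>>>>             for (Integer v : c.element().getValue()) {
>>>>               n++;
>>>>             }
>>>>             c.output(KV.of(c.element().getKey(), n));
>>>>           }
>>>>         }));
>>>>
>>>> // Branch 2: an independent single pass to sum the values per key.
>>>> PCollection<KV<String, Long>> sums =
>>>>     grouped.apply("SumValues",
>>>>         ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, Long>>() {
>>>>           @ProcessElement
>>>>           public void process(ProcessContext c) {
>>>>             long sum = 0;
>>>>             for (Integer v : c.element().getValue()) {
>>>>               sum += v;
>>>>             }
>>>>             c.output(KV.of(c.element().getKey(), sum));
>>>>           }
>>>>         }));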
>>>>
>>>> Jan
>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>
>>>> Hi,
>>>>
>>>>
>>>>
>>>> I want to iterate multiple times over the Iterable<V> (the output of
>>>> the GroupByKey transformation).
>>>>
>>>> When my Runner is SparkRunner, I get an exception:
>>>>
>>>>
>>>>
>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>>>> iterated more than once,otherwise there could be data lost
>>>>
>>>>                 at
>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>
>>>>                 at java.lang.Iterable.spliterator(Iterable.java:101)
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> I understood that I can branch the pipeline after GroupByKey into multiple
>>>> transformations and iterate over the Iterable<V> once in each of them.
>>>>
>>>>
>>>>
>>>> Is there a better way for that?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Noam Gershi
>>>>
>>>> Software Developer
>>>>
>>>>
>>>>
>>>>
