This should be an element in the compatibility matrix as well.

On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org> wrote:

> I am pretty surprised that we do not have a @Category(ValidatesRunner)
> test in GroupByKeyTest that iterates multiple times. That is a major
> oversight. We should have this test, and it can be disabled by the
> SparkRunner's configuration.
>
> Kenn
>
> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:
>
>> The Dataflow version does not spill to disk. However Spark's design might
>> require spilling to disk if you want that to be implemented properly.
>>
>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> Spark's GBK is currently implemented using `sortBy(key and
>>> value).mapPartition(...)` for non-merging windowing in order to support
>>> large keys and large scale shuffles. Merging windowing is implemented using
>>> standard GBK (underlying spark impl. uses ListCombiner + Hash Grouping),
>>> which is by design unable to support large keys.
>>>
>>> As Jan noted, problem with mapPartition is, that its UDF receives an
>>> Iterator. Only option here is to wrap this iterator to one that spills to
>>> disk once an internal buffer is exceeded (the approach suggested by
>>> Reuven). This unfortunately comes with a cost in some cases. The best
>>> approach would be to somehow determine, that user wants multiple iterations
>>> and than wrap it in "re-iterator" if necessary. Does anyone have any ideas
>>> how to approach this?
>>>
>>> D.
>>>
>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> The Beam API was written to support multiple iterations, and there are
>>>> definitely transforms that do so. I believe that CoGroupByKey may do this
>>>> as well with the resulting iterator.
>>>>
>>>> I know that the Dataflow runner is able to handles iterators larger
>>>> than available memory by paging them in from shuffle, which still allows
>>>> for reiterating. It sounds like Spark is less flexible here?
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> +dev <dev@beam.apache.org> <dev@beam.apache.org>
>>>>>
>>>>> Lukasz, why do you think that users expect to be able to iterate
>>>>> multiple times grouped elements? Besides that it obviously suggests the
>>>>> 'Iterable'? The way that spark behaves is pretty much analogous to how
>>>>> MapReduce used to work - in certain cases it calles
>>>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>>>> accepts Iterator - that is because internally it merge sorts pre sorted
>>>>> segments. This approach enables to GroupByKey data sets that are too big 
>>>>> to
>>>>> fit into memory (per key).
>>>>>
>>>>> If multiple iterations should be expected by users, we probably should:
>>>>>
>>>>>  a) include that in @ValidatesRunner tests
>>>>>
>>>>>  b) store values in memory on spark, which will break for certain
>>>>> pipelines
>>>>>
>>>>> Because of (b) I think that it would be much better to remove this
>>>>> "expectation" and clearly document that the Iterable is not supposed to be
>>>>> iterated multiple times.
>>>>>
>>>>> Jan
>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>
>>>>> I pretty much think so, because that is how Spark works. The Iterable
>>>>> inside is really an Iterator, which cannot be iterated multiple times.
>>>>>
>>>>> Jan
>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>
>>>>> Jan, in Beam users expect to be able to iterate the GBK output
>>>>> multiple times even from within the same ParDo.
>>>>> Is this something that Beam on Spark Runner never supported?
>>>>>
>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> Hi Gershi,
>>>>>>
>>>>>> could you please outline the pipeline you are trying to execute?
>>>>>> Basically, you cannot iterate the Iterable multiple times in single 
>>>>>> ParDo.
>>>>>> It should be possible, though, to apply multiple ParDos to output from
>>>>>> GroupByKey.
>>>>>>
>>>>>> Jan
>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>
>>>>>>
>>>>>> I want to iterate multiple times on the Iterable<V> (the output of
>>>>>> GroupByKey transformation)
>>>>>>
>>>>>> When my Runner is SparkRunner, I get an exception:
>>>>>>
>>>>>>
>>>>>>
>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>>>>>> iterated more than once,otherwise there could be data lost
>>>>>>
>>>>>>                 at
>>>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>
>>>>>>                 at java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> I understood I can branch the pipeline after GroupByKey into multiple
>>>>>> transformation and iterate in each of them once on the Iterable<V>.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Is there a better way for that?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> [image: citi_logo_mail][image: citi_logo_mail]*Noam Gershi*
>>>>>>
>>>>>> Software Developer
>>>>>>
>>>>>> *T*: +972 (3) 7405718 <+972%203-740-5718>
>>>>>>
>>>>>> [image: Mail_signature_blue]
>>>>>>
>>>>>>
>>>>>>
>>>>>>

Reply via email to