Re: [Current spark runner] Combine globally translation is risky and not very performant

2019-07-01 Thread Jan Lukavský

Hi Etienne,

the data is collected to the driver using RDD#aggregate [1], which collects a
single value for the whole RDD. This value is then expanded into a single value
per window, because the aggregated value holds the values for all windows.
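To make the RDD#aggregate path concrete, here is a minimal plain-Java sketch (no Spark dependency) that mimics the aggregate contract: each partition is folded from a fresh zero value with seqOp, and the per-partition results are merged with combOp, a merge that Spark performs on the driver. Purely for illustration, it assumes that windows are identified by strings and that the combine is a per-window sum; the class and method names (AggregateSketch, sumPerWindow, expandPerWindow) are hypothetical and not taken from the runner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

public class AggregateSketch {

    // Mimics the RDD#aggregate contract without Spark: every partition is folded with
    // seqOp starting from a fresh zero value, then the partial results are merged with
    // combOp. In Spark this last merge runs on the driver.
    static <T, U> U aggregate(List<List<T>> partitions, Supplier<U> zero,
                              BiFunction<U, T, U> seqOp, BinaryOperator<U> combOp) {
        U result = zero.get();
        for (List<T> partition : partitions) {
            U partial = zero.get();
            for (T element : partition) {
                partial = seqOp.apply(partial, element);
            }
            result = combOp.apply(result, partial);
        }
        return result;
    }

    // Hypothetical Combine.globally: a sum, with windows represented as strings.
    // The single aggregated value is a map holding one accumulator per window.
    public static Map<String, Long> sumPerWindow(List<List<Map.Entry<String, Long>>> partitions) {
        Supplier<Map<String, Long>> zero = TreeMap::new;
        BiFunction<Map<String, Long>, Map.Entry<String, Long>, Map<String, Long>> seqOp =
            (acc, e) -> {
                acc.merge(e.getKey(), e.getValue(), Long::sum);
                return acc;
            };
        BinaryOperator<Map<String, Long>> combOp =
            (left, right) -> {
                right.forEach((window, sum) -> left.merge(window, sum, Long::sum));
                return left;
            };
        return aggregate(partitions, zero, seqOp, combOp);
    }

    // Expands the single aggregated value into one output element per window.
    public static List<String> expandPerWindow(Map<String, Long> aggregated) {
        List<String> outputs = new ArrayList<>();
        aggregated.forEach((window, sum) -> outputs.add(window + "=" + sum));
        return outputs;
    }

    public static void main(String[] args) {
        // Two partitions, each holding elements from windows w1 and w2.
        List<List<Map.Entry<String, Long>>> partitions = List.of(
            List.of(Map.entry("w1", 1L), Map.entry("w2", 10L)),
            List.of(Map.entry("w1", 2L), Map.entry("w2", 20L)));
        System.out.println(expandPerWindow(sumPerWindow(partitions)));
    }
}
```

Running main prints [w1=3, w2=30]: one aggregated map reaches the driver and is then expanded into one output per window. The size of that map grows with the number of windows, which is the memory concern discussed in this thread.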


Jan

[1] 
https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/rdd/RDD.html#aggregate-U-scala.Function2-scala.Function2-scala.reflect.ClassTag-


On 7/1/19 5:02 PM, Etienne Chauchot wrote:

Hi Jan,
The collect call happens before the extraction, so it collects one value per
accumulator to the Spark driver; see the implementation of the
sparkCombineFn.extractOutput(maybeAccumulated.get());
call. So there is potentially more than one value per window.

For the new Spark runner, I'm using a native combine that happens entirely
on the Dataset side (the equivalent of an RDD, to simplify), so it all runs
in parallel.


Etienne


On Thursday, June 27, 2019 at 15:13 +0200, Jan Lukavský wrote:

Hi Etienne,
I saw that too while working on solving [1]. It seems a little weird and
I was a little tempted to change it to something roughly equivalent to
Combine.perKey with a single key. But, actually, the output of Combine.globally
should be rather small, right? There will be a single value for each
window. And even if we change it to Combine.perKey with a single key, I
think the problem of a potential OOM will just be moved to some worker. Or
do you see some other option?
Jan
[1] https://issues.apache.org/jira/browse/BEAM-7574
On 6/27/19 11:43 AM, Etienne Chauchot wrote:
Hi guys,
FYI, while I'm working on the combine translation for the new Spark
runner PoC, I saw something that does not seem right in the current
runner: https://issues.apache.org/jira/browse/BEAM-7647
Best,
Etienne


Re: [Current spark runner] Combine globally translation is risky and not very performant

2019-07-01 Thread Etienne Chauchot
Hi Jan,
The collect call happens before the extraction, so it collects one value per
accumulator to the Spark driver; see the implementation of the
sparkCombineFn.extractOutput(maybeAccumulated.get()); call. So there is
potentially more than one value per window.
For the new Spark runner, I'm using a native combine that happens entirely on
the Dataset side (the equivalent of an RDD, to simplify), so it all runs in
parallel.
Etienne

On Thursday, June 27, 2019 at 15:13 +0200, Jan Lukavský wrote:
> Hi Etienne,
> I saw that too while working on solving [1]. It seems a little weird and I
> was a little tempted to change it to something roughly equivalent to
> Combine.perKey with a single key. But, actually, the output of
> Combine.globally should be rather small, right? There will be a single value
> for each window. And even if we change it to Combine.perKey with a single
> key, I think the problem of a potential OOM will just be moved to some
> worker. Or do you see some other option?
> Jan
> [1] https://issues.apache.org/jira/browse/BEAM-7574
> On 6/27/19 11:43 AM, Etienne Chauchot wrote:
> Hi guys,
> FYI, while I'm working on the combine translation for the new Spark runner
> PoC, I saw something that does not seem right
> in the current runner: https://issues.apache.org/jira/browse/BEAM-7647
> Best,
> Etienne


Re: [Current spark runner] Combine globally translation is risky and not very performant

2019-06-27 Thread Jan Lukavský

Hi Etienne,

I saw that too while working on solving [1]. It seems a little weird and
I was a little tempted to change it to something roughly equivalent to
Combine.perKey with a single key. But, actually, the output of Combine.globally
should be rather small, right? There will be a single value for each
window. And even if we change it to Combine.perKey with a single key, I
think the problem of a potential OOM will just be moved to some worker. Or
do you see some other option?


Jan

[1] https://issues.apache.org/jira/browse/BEAM-7574
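The single-key concern can be illustrated with a small plain-Java sketch. It assumes a hash partitioner that routes a record to partition floorMod(key.hashCode(), numPartitions), which is a common way to do hash partitioning; the class and method names (SingleKeySketch, partitionSizes) and the constant key "singleKey" are hypothetical. With one constant key, every record (a raw element, or a partial accumulator if the combine is lifted) lands in the same partition, so the final merge runs on a single worker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class SingleKeySketch {

    // Assumption: records are routed by hash partitioning, i.e. to partition
    // floorMod(key.hashCode(), numPartitions), so the index is never negative.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts how many records end up in each partition once they are keyed by keyFn.
    public static <T> int[] partitionSizes(List<T> items, Function<T, Object> keyFn,
                                           int numPartitions) {
        int[] sizes = new int[numPartitions];
        for (T item : items) {
            sizes[partitionFor(keyFn.apply(item), numPartitions)]++;
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            items.add(i);
        }
        // Combine.globally expressed as Combine.perKey with one constant (hypothetical) key:
        // every record maps to the same partition.
        System.out.println(Arrays.toString(partitionSizes(items, x -> "singleKey", 4)));
        // For contrast, a key that varies per record spreads the load evenly.
        System.out.println(Arrays.toString(partitionSizes(items, x -> x, 4)));
    }
}
```

With 1000 integer records and 4 partitions, the constant key sends all 1000 records to one partition, while keying by the record itself gives 250 per partition. This matches the point that a single key only moves the memory pressure from the driver to one worker.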

On 6/27/19 11:43 AM, Etienne Chauchot wrote:

Hi guys,

FYI, while I'm working on the combine translation for the new Spark
runner PoC, I saw something that does not seem right in the current
runner: https://issues.apache.org/jira/browse/BEAM-7647


Best,
Etienne