[jira] [Commented] (BEAM-6740) Combine.globally translation is never called

2019-04-01 Thread Etienne Chauchot (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-6740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16806472#comment-16806472
 ] 

Etienne Chauchot commented on BEAM-6740:


Totally agree with what [~kenn] said.

> Combine.globally translation is never called
> 
>
> Key: BEAM-6740
> URL: https://issues.apache.org/jira/browse/BEAM-6740
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Etienne Chauchot
>Assignee: Ismaël Mejía
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Beam translates Combine.Globally as a composite transform composed of:
>  * Map that assigns Void keys
>  * Combine.PerKey
> On Spark: as Combine.PerKey uses a Spark GBK internally, the runner adds its
> own translation of Combine.Globally to avoid the less performant GBK. This
> translation should be called instead of entering the composite transform
> translation. A pipeline like this:
> {code:java}
> PCollection<Integer> input =
>     pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
> input.apply(Combine.globally(new IntegerCombineFn()));
> {code}
> {code:java}
>   private static class IntegerCombineFn extends Combine.CombineFn<Integer, Integer, Integer> {
>     @Override
>     public Integer createAccumulator() {
>       return 0;
>     }
>     @Override
>     public Integer addInput(Integer accumulator, Integer input) {
>       return accumulator + input;
>     }
>     @Override
>     public Integer mergeAccumulators(Iterable<Integer> accumulators) {
>       Integer result = 0;
>       for (Integer value : accumulators) {
>         result += value;
>       }
>       return result;
>     }
>     @Override
>     public Integer extractOutput(Integer accumulator) {
>       return accumulator;
>     }
>   }
> {code}
> is translated as the above composite.
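
The composite expansion described in the issue can be illustrated without Beam. The sketch below is plain Java, not Beam's implementation: it gives every element the same Void (null) key, groups and combines per key, then drops the key. VoidKeyCombineSketch, IntSumFn, combineGlobally and combinePerKey are hypothetical names, and splitting each key's values into two halves is only an assumption made to exercise mergeAccumulators the way separate bundles would.

{code:java}
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class VoidKeyCombineSketch {

  // Plain-Java mirror of the IntegerCombineFn lifecycle (no Beam dependency).
  static final class IntSumFn {
    int createAccumulator() { return 0; }
    int addInput(int accumulator, int input) { return accumulator + input; }
    int mergeAccumulators(Iterable<Integer> accumulators) {
      int result = 0;
      for (int value : accumulators) { result += value; }
      return result;
    }
    int extractOutput(int accumulator) { return accumulator; }
  }

  // Combine.globally expressed as the composite from the issue.
  static Integer combineGlobally(List<Integer> input, IntSumFn fn) {
    // Step 1: "Map that assigns Void keys": every element gets the null key.
    List<Map.Entry<Void, Integer>> keyed = new ArrayList<>();
    for (Integer x : input) {
      keyed.add(new AbstractMap.SimpleEntry<Void, Integer>(null, x));
    }
    // Step 2: Combine.PerKey. With a single key, all values meet in one group.
    Map<Void, Integer> perKey = combinePerKey(keyed, fn);
    // Step 3: drop the key; at most one output value remains.
    return perKey.get(null);
  }

  static <K> Map<K, Integer> combinePerKey(List<Map.Entry<K, Integer>> keyed, IntSumFn fn) {
    // This grouping stands in for the GBK (shuffle) that a runner-specific
    // Combine.globally translation aims to avoid.
    Map<K, List<Integer>> grouped = new LinkedHashMap<>();
    for (Map.Entry<K, Integer> e : keyed) {
      grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    Map<K, Integer> out = new LinkedHashMap<>();
    for (Map.Entry<K, List<Integer>> group : grouped.entrySet()) {
      List<Integer> values = group.getValue();
      int mid = values.size() / 2;
      // Two partial accumulators, as if the values had arrived in two bundles.
      int left = fn.createAccumulator();
      for (int v : values.subList(0, mid)) { left = fn.addInput(left, v); }
      int right = fn.createAccumulator();
      for (int v : values.subList(mid, values.size())) { right = fn.addInput(right, v); }
      int merged = fn.mergeAccumulators(List.of(left, right));
      out.put(group.getKey(), fn.extractOutput(merged));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    System.out.println(combineGlobally(input, new IntSumFn())); // prints 55
  }
}
{code}

For an empty input this sketch returns null, whereas Beam's Combine.globally by default emits the CombineFn's default output (suppressible with withoutDefaults()); that behavior is deliberately left out here.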



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-6740) Combine.globally translation is never called

2019-03-29 Thread Kenneth Knowles (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-6740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805587#comment-16805587
 ] 

Kenneth Knowles commented on BEAM-6740:
---

I think you should just add a URN for Combine.globally. That seems useful. Any 
transform that has well-defined semantics can and should have a URN. It is 
valuable in many ways:

 - if a runner has a special implementation (like yours), it can choose that and
run zero user code
 - if there are multiple languages in a pipeline, the runner can choose the 
language with the most advantageous fusion to implement the URN

A URN does not just apply to primitives, and there is no requirement that any
SDK implement anything except the SDK harness URNs for primitives. It is just a
hint.
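
Kenn's proposal can be sketched as two lookups: the construction side maps a transform to a URN if it knows one, and the runner maps URNs to its own translators, falling back to expanding the composite when either lookup misses. This is a self-contained, hypothetical model rather than Beam's API: Registry, the stand-in transform classes and the translator descriptions are invented for illustration, and the URN strings merely follow Beam's beam:transform:<name>:v1 naming pattern.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class UrnRegistrySketch {
  // Hypothetical stand-ins for the SDK's transform classes.
  static final class CombinePerKey {}
  static final class CombineGlobally {}

  // Illustrative URN strings (assumed, following the beam:transform:...:v1 pattern).
  static final String COMBINE_PER_KEY_URN = "beam:transform:combine_per_key:v1";
  static final String COMBINE_GLOBALLY_URN = "beam:transform:combine_globally:v1";

  static final class Registry {
    // Construction side: which transform classes have a known URN.
    private final Map<Class<?>, String> urnByClass = new HashMap<>();
    // Runner side: special translations keyed by URN (here, just a description).
    private final Map<String, String> translatorByUrn = new HashMap<>();

    void registerUrn(Class<?> transformClass, String urn) {
      urnByClass.put(transformClass, urn);
    }

    void registerTranslator(String urn, String description) {
      translatorByUrn.put(urn, description);
    }

    String translate(Object transform) {
      String urn = urnByClass.get(transform.getClass());
      String direct = (urn == null) ? null : translatorByUrn.get(urn);
      // No URN, or no runner translator for it: expand the composite instead.
      return (direct != null) ? direct : "expand composite";
    }
  }

  // The runner knows both special translations, but only Combine.perKey has a URN.
  static Registry newDefaultRegistry() {
    Registry registry = new Registry();
    registry.registerUrn(CombinePerKey.class, COMBINE_PER_KEY_URN);
    registry.registerTranslator(COMBINE_PER_KEY_URN, "GBK-based combine");
    registry.registerTranslator(COMBINE_GLOBALLY_URN, "direct global aggregation");
    return registry;
  }

  public static void main(String[] args) {
    Registry registry = newDefaultRegistry();
    System.out.println(registry.translate(new CombineGlobally())); // expand composite
    // The fix suggested in the comment: give Combine.globally a URN.
    registry.registerUrn(CombineGlobally.class, COMBINE_GLOBALLY_URN);
    System.out.println(registry.translate(new CombineGlobally())); // direct global aggregation
  }
}
{code}

Because the URN is only a hint, a runner without a translator for it still gets a correct result by expanding the composite.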



[jira] [Commented] (BEAM-6740) Combine.globally translation is never called

2019-02-26 Thread Etienne Chauchot (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-6740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16777945#comment-16777945
 ] 

Etienne Chauchot commented on BEAM-6740:


No, [~iemejia] changed it in this PR
(https://github.com/apache/beam/pull/7005/files): the Spark translation is no
longer based on the class but on the URN. Previously, the Spark runner added the
class directly to its translators map, and the translation worked fine.

The problem seems more general than only the Spark runner, because in
runner-construction there is only a PayloadTranslator for Combine.perKey (see
https://github.com/apache/beam/blob/2fbc8c0c2db3a0798e7cca0d30c3a7eec855b375/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java#L57).
I am wary of the impact of adding a new CombineTranslation for Combine.globally on
the runner-construction side. That being said, you're right: only Spark has a
direct translation of Combine.Globally; all the others use only Combine.perKey.

I took a look at the performance of Nexmark query 7 (which uses Combine.globally),
and it seems the same in Spark before and after deactivating the special
Combine.globally translation (with Ismaël's PR). IMHO, I would recommend leaving
the Combine.Globally dead code in place in case we need it in the future (if more
in-depth performance tests show a need for it). What is sure, though, is that for
now I will deactivate URN lookup in the new POC Spark runner, for which not having
a Combine.globally translation would harm performance.
WDYT?



> Combine.globally translation is never called
> 
>
> Key: BEAM-6740
> URL: https://issues.apache.org/jira/browse/BEAM-6740
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Etienne Chauchot
>Priority: Major
>
> SDK translates Combine.Globally as a composite transform composed of:
>  * Map that assigns Void keys
>  * Combine.PerKey
> On Spark: as Combine.PerKey uses a Spark GBK internally, the runner adds its
> own translation of Combine.Globally to avoid the less performant GBK. This
> translation should be called instead of entering the composite transform
> translation. A pipeline like this:
> {code}
> PCollection<Integer> input =
>     pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
> input.apply(Combine.globally(new IntegerCombineFn()));
> {code}
> {code}
>   private static class IntegerCombineFn extends Combine.CombineFn<Integer, Integer, Integer> {
>     @Override
>     public Integer createAccumulator() {
>       return 0;
>     }
>     @Override
>     public Integer addInput(Integer accumulator, Integer input) {
>       return accumulator + input;
>     }
>     @Override
>     public Integer mergeAccumulators(Iterable<Integer> accumulators) {
>       Integer result = 0;
>       for (Integer value : accumulators) {
>         result += value;
>       }
>       return result;
>     }
>     @Override
>     public Integer extractOutput(Integer accumulator) {
>       return accumulator;
>     }
>   }
> {code}
> is translated as the above composite.





[jira] [Commented] (BEAM-6740) Combine.globally translation is never called

2019-02-25 Thread Kenneth Knowles (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-6740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16777077#comment-16777077
 ] 

Kenneth Knowles commented on BEAM-6740:
---

This is part of the SparkRunner, yes? Doesn't the SparkRunner still translate
based on the Java class? There wasn't really a change: some runners started to
translate based on URN, but they only added URNs for primitives and the special
composites that they cared about. I think other runners just don't care about
Combine.globally as a special composite. It seems just fine to add a URN now. Or
am I misunderstanding the issue? Can you point it out in the SparkRunner?




[jira] [Commented] (BEAM-6740) Combine.globally translation is never called

2019-02-25 Thread Etienne Chauchot (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-6740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16777066#comment-16777066
 ] 

Etienne Chauchot commented on BEAM-6740:


[~kenn] OK, so it is indeed a bug that needs to be fixed. It probably worked in
the past, otherwise there would not be any special Combine.globally translation.
Nevertheless, I cannot spot the point in history where it changed.
IMHO, a given composite transform should be translated into its inner primitives
only if no direct translation for it is registered in the runner. Do you agree?
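
The rule proposed here (expand a composite only when the runner has no direct translation registered for it) can be sketched as a depth-first walk. Node, combineGloballyTree and the set of directly translatable names are hypothetical simplifications, as is modelling Combine.PerKey as a GroupByKey plus a Combine.GroupedValues step. In Beam itself, a runner makes the same choice by returning CompositeBehavior.ENTER_TRANSFORM or CompositeBehavior.DO_NOT_ENTER_TRANSFORM from Pipeline.PipelineVisitor.enterCompositeTransform.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class CompositeTraversalSketch {
  // Hypothetical, simplified node of a pipeline's transform hierarchy.
  static final class Node {
    final String name;
    final List<Node> children;

    Node(String name, List<Node> children) {
      this.name = name;
      this.children = children;
    }

    static Node leaf(String name) {
      return new Node(name, List.of());
    }

    boolean isComposite() {
      return !children.isEmpty();
    }
  }

  // Combine.globally as described in the issue: Void keys, then Combine.PerKey.
  static Node combineGloballyTree() {
    Node perKey = new Node("Combine.PerKey",
        List.of(Node.leaf("GroupByKey"), Node.leaf("Combine.GroupedValues")));
    return new Node("Combine.Globally",
        List.of(Node.leaf("Map (assign Void keys)"), perKey));
  }

  // Returns the names of the transforms that were translated directly, in order.
  static List<String> translate(Node root, Set<String> directTranslations) {
    List<String> applied = new ArrayList<>();
    visit(root, directTranslations, applied);
    return applied;
  }

  private static void visit(Node node, Set<String> direct, List<String> applied) {
    if (direct.contains(node.name)) {
      // Direct translation registered: use it and do not enter the composite.
      applied.add(node.name);
    } else if (node.isComposite()) {
      // No direct translation: expand into the inner transforms.
      for (Node child : node.children) {
        visit(child, direct, applied);
      }
    } else {
      throw new IllegalStateException("No translation for primitive " + node.name);
    }
  }

  public static void main(String[] args) {
    Node tree = combineGloballyTree();
    Set<String> withoutGlobally = Set.of("Map (assign Void keys)", "Combine.PerKey");
    Set<String> withGlobally =
        Set.of("Combine.Globally", "Map (assign Void keys)", "Combine.PerKey");
    System.out.println(translate(tree, withoutGlobally)); // [Map (assign Void keys), Combine.PerKey]
    System.out.println(translate(tree, withGlobally)); // [Combine.Globally]
  }
}
{code}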



[jira] [Commented] (BEAM-6740) Combine.globally translation is never called

2019-02-25 Thread Etienne Chauchot (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-6740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16776755#comment-16776755
 ] 

Etienne Chauchot commented on BEAM-6740:


Since this happens in runner construction, it makes me wonder: is it intended
that Combine.Globally never gets translated directly (without decomposing it into
its primitives)?
[~kenn] WDYT?


> Combine.globally translation is never called
> 
>
> Key: BEAM-6740
> URL: https://issues.apache.org/jira/browse/BEAM-6740
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Etienne Chauchot
>Priority: Major
>
> SDK translates Combine.Globally as a composite transform composed of:
>  * Map that assigns Void keys
>  * Combine.PerKey
> As Combine.PerKey uses a Spark GBK internally, the runner adds its own
> translation of Combine.Globally to avoid the less performant GBK. This
> translation should be called instead of entering the composite transform
> translation. A pipeline like this:
> {code}
> PCollection<Integer> input =
>     pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
> input.apply(Combine.globally(new IntegerCombineFn()));
> {code}
> {code}
>   private static class IntegerCombineFn extends Combine.CombineFn<Integer, Integer, Integer> {
>     @Override
>     public Integer createAccumulator() {
>       return 0;
>     }
>     @Override
>     public Integer addInput(Integer accumulator, Integer input) {
>       return accumulator + input;
>     }
>     @Override
>     public Integer mergeAccumulators(Iterable<Integer> accumulators) {
>       Integer result = 0;
>       for (Integer value : accumulators) {
>         result += value;
>       }
>       return result;
>     }
>     @Override
>     public Integer extractOutput(Integer accumulator) {
>       return accumulator;
>     }
>   }
> {code}
> is translated as the above composite.





[jira] [Commented] (BEAM-6740) Combine.globally translation is never called

2019-02-25 Thread Etienne Chauchot (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-6740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16776742#comment-16776742
 ] 

Etienne Chauchot commented on BEAM-6740:


That is because none of the known translators
{code}
RawPTransformTranslator
KnownTransformPayloadTranslator
ParDoTranslator
{code}
can translate Combine.Globally.
