Etienne Chauchot created BEAM-12153:
---------------------------------------

             Summary: OOM on GBK in SparkRunner and 
SpartStructuredStreamingRunner
                 Key: BEAM-12153
                 URL: https://issues.apache.org/jira/browse/BEAM-12153
             Project: Beam
          Issue Type: Bug
          Components: runner-spark
            Reporter: Etienne Chauchot


We have being experiencing OOM errors on GroupByKey in batch mode in both Spark 
runners even if behind the woods spark spills data to disk in such cases: 
taking a look at the translation in the two runners, it might be due to using 
ReduceFnRunner for merging windows in GBK translation. 
ReduceFnRunner.processElements expects to have all elements to merge the 
windows between each other.:

RDD spark runner:

https://github.com/apache/beam/blob/752798e3b0e6911fa84f8d138dacccdb6fcc85ef/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java#L99

structured streaming spark: runner: 
[https://github.com/apache/beam/blob/752798e3b0e6911fa84f8d138dacccdb6fcc85ef/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java#L74|https://github.com/apache/beam/blob/752798e3b0e6911fa84f8d138dacccdb6fcc85ef/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java#L106]

Even replacing the Iterable with an Iterator in ReduceFnRunner to avoid 
materialization does not work because deep in ReduceFnRunner.processElements(), 
the collection is iterated twice.

It could be better to do what flink runner does and translate GBK as 
CombinePerKey with a Concatenate combine fn and thus avoid elements 
materialization.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to