I cannot find anything in the docs about the expected behavior of a DoFn emitting an arbitrarily large number of elements from one processElement() call.
I wonder whether the Spark runner behavior is a bug, or just an execution difference (and in this case a disadvantage) that belongs in the runner capability matrix. Also, in such cases, what is the opinion on BoundedSource vs. DoFn as a source? And what is the recommendation to an IO developer who wants to achieve equivalent execution scalability across runners?

On Sun, Oct 27, 2019 at 6:02 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> typo in my previous message. I meant to say => JDBC is `not` the main
> data set, just metadata
>
> On Sun, Oct 27, 2019 at 6:00 PM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> The result of my query can fit in memory if I use a 12GB heap per
>> Spark executor. This makes the job quite inefficient, as a past JDBC
>> load job ran fine with a 4GB heap doing the main heavy lifting - JDBC
>> is the main data set, just metadata.
>>
>> I just ran the same JdbcIO read code on the Spark and Flink runners.
>> Flink did not blow up on memory, so this seems to be a limitation of
>> SparkRunner.
>>
>> On Fri, Oct 25, 2019 at 5:28 PM Ryan Skraba <r...@skraba.com> wrote:
>>
>>> One more thing to try -- depending on your pipeline, you can disable
>>> the "auto-reshuffle" of JdbcIO.Read by setting
>>> withOutputParallelization(false)
>>>
>>> This is particularly useful if (1) you do aggressive and cheap
>>> filtering immediately after the read, or (2) you do your own
>>> repartitioning action like GroupByKey after the read.
>>>
>>> Given your investigation into the heap, I doubt this will help! I'll
>>> take a closer look at the DoFnOutputManager. In the meantime, is
>>> there anything particular about your job that might help the
>>> investigation?
>>>
>>> All my best, Ryan
>>>
>>> On Fri, Oct 25, 2019 at 2:47 PM Jozef Vilcek <jozo.vil...@gmail.com>
>>> wrote:
>>> >
>>> > I agree I might have been too quick to claim that DoFn output needs
>>> > to fit in memory. Actually, I am not sure what the Beam model says
>>> > on this matter and what the output managers of particular runners
>>> > do about it.
>>> >
>>> > But SparkRunner definitely has an issue here. I tried setting a
>>> > small `fetchSize` for JdbcIO as well as changing `storageLevel` to
>>> > MEMORY_AND_DISK. All fail with OOM.
>>> > When looking at the heap, most of it is used by the linked-list
>>> > multimap of DoFnOutputManager here:
>>> >
>>> https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234
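For what it's worth, the memory behavior being discussed can be shown with a small standalone sketch (plain Java, not Beam code; the class and method names here are made up for illustration). If a runner's output manager retains every element a DoFn emits for a bundle before downstream consumption, as the linked-list multimap in SparkRunner's DoFnOutputManager appears to do, peak memory grows with the output size; if the consumer pulls elements lazily through an Iterator, only one element needs to be live at a time:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class OutputBufferingSketch {

    // Eager style: collect all n emitted elements before anything downstream
    // runs. Peak number of buffered elements equals n.
    static int eagerPeakBuffered(int n) {
        List<Integer> buffer = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            buffer.add(i); // every emitted element is retained until the bundle ends
        }
        return buffer.size();
    }

    // Lazy style: hand elements to the consumer one at a time via an Iterator.
    // Peak number of buffered elements is 1, regardless of n.
    static int lazyPeakBuffered(int n) {
        Iterator<Integer> outputs = new Iterator<Integer>() {
            int next = 0;
            public boolean hasNext() { return next < n; }
            public Integer next() { return next++; }
        };
        int peak = 0;
        while (outputs.hasNext()) {
            int element = outputs.next(); // only one element live at a time
            peak = Math.max(peak, 1);
        }
        return peak;
    }
}
```

This is only a model of the two strategies, of course; whether a given runner can stream DoFn outputs this way depends on its translation of ParDo, which is exactly the capability-matrix question above.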