I didn't get a chance to try this out -- if you've tested it with the FlinkRunner and it succeeded, it does sound like a bug in the SparkRunner.
From your description, it should be reproducible by reading any large
database table with the SparkRunner where the entire dataset is greater
than the memory available to a single executor? Do you have any other
tips to reproduce?

Especially worrisome is "as past JDBC load job runs fine with 4GB heap"
-- did this happen with the same volume of data and a different version
of Beam? Or the same version and a pipeline with different
characteristics? This does sound like a regression, so details would
help to confirm and track it down!

All my best, Ryan

On Tue, Oct 29, 2019 at 9:48 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>
> I cannot find anything in the docs about the expected behavior of a DoFn
> emitting an arbitrarily large number of elements in one processElement().
>
> I wonder if the Spark runner's behavior is a bug, or just a difference
> (and a disadvantage in this case) in execution, more along the lines of
> the runner capability matrix differences.
>
> Also, in such cases, what is the opinion on BoundedSource vs. DoFn as a
> source? What is the recommendation to an IO developer if one wants to
> achieve equivalent execution scalability across runners?
>
>
> On Sun, Oct 27, 2019 at 6:02 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>
>> typo in my previous message. I meant to say => JDBC is `not` the main
>> data set, just metadata
>>
>> On Sun, Oct 27, 2019 at 6:00 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>
>>> The result of my query can fit in memory if I use a 12GB heap per
>>> Spark executor. This makes the job quite inefficient, as a past JDBC
>>> load job runs fine with a 4GB heap to do the main heavy lifting - JDBC
>>> is the main data set, just metadata.
>>>
>>> I just ran the same JdbcIO read code on the Spark and Flink runners.
>>> Flink did not blow up on memory. So it seems like this is a limitation
>>> of the SparkRunner.
>>>
>>> On Fri, Oct 25, 2019 at 5:28 PM Ryan Skraba <r...@skraba.com> wrote:
>>>>
>>>> One more thing to try -- depending on your pipeline, you can disable
>>>> the "auto-reshuffle" of JdbcIO.Read by setting
>>>> withOutputParallelization(false)
>>>>
>>>> This is particularly useful if (1) you do aggressive and cheap
>>>> filtering immediately after the read, or (2) you do your own
>>>> repartitioning action like GroupByKey after the read.
>>>>
>>>> Given your investigation into the heap, I doubt this will help! I'll
>>>> take a closer look at the DoFnOutputManager. In the meantime, is
>>>> there anything particular about your job that might help
>>>> investigate?
>>>>
>>>> All my best, Ryan
>>>>
>>>> On Fri, Oct 25, 2019 at 2:47 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>> >
>>>> > I agree I may have been too quick to say that a DoFn's output needs
>>>> > to fit in memory. Actually, I am not sure what the Beam model says
>>>> > on this matter, or what the output managers of particular runners
>>>> > do about it.
>>>> >
>>>> > But the SparkRunner definitely has an issue here. I did try setting
>>>> > a small `fetchSize` for JdbcIO as well as changing `storageLevel`
>>>> > to MEMORY_AND_DISK. All fail with an OOM.
>>>> > When looking at the heap, most of it is used by the linked-list
>>>> > multimap of the DoFnOutputManager here:
>>>> > https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234
>>>> >
>>>> >
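For readers following along, below is a minimal sketch (not taken from the thread) of the JdbcIO.Read configuration being discussed: withOutputParallelization(false) disables the built-in reshuffle Ryan mentions, and a small fetchSize keeps the JDBC cursor from pulling the whole result set at once. The driver, URL, credentials, query, and row mapper are placeholders, not details from the thread.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class JdbcReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> rows =
        p.apply(
            "ReadLargeTable",
            JdbcIO.<String>read()
                .withDataSourceConfiguration(
                    JdbcIO.DataSourceConfiguration.create(
                            "org.postgresql.Driver",          // placeholder driver
                            "jdbc:postgresql://host/db")      // placeholder URL
                        .withUsername("user")                 // placeholder credentials
                        .withPassword("password"))
                .withQuery("SELECT id, payload FROM big_table")  // placeholder query
                // Stream rows from the database in small batches rather than
                // materializing the full result set in the JDBC driver.
                .withFetchSize(1000)
                // Skip the auto-reshuffle after the read; useful if you filter
                // aggressively or do your own GroupByKey right after the read,
                // as discussed above.
                .withOutputParallelization(false)
                .withRowMapper(rs -> rs.getString("payload"))
                .withCoder(StringUtf8Coder.of()));

    p.run().waitUntilFinish();
  }
}

Note that, as the thread concludes, this only tunes how the read is shaped; it does not change how the Spark runner's DoFnOutputManager buffers a DoFn's output, which is where the heap is reported to go.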