I didn't get a chance to try this out -- it sounds like a bug with the
SparkRunner, if you've tested it with FlinkRunner and it succeeded.

From your description, it should be reproducible by reading any large
database table with the SparkRunner where the entire dataset is larger
than the memory available to a single executor -- is that right? Do you
have any other tips to reproduce it?

Especially worrisome is "as past JDBC load job runs fine with 4GB
heap" -- did this happen with the same volume of data and a different
version of Beam? Or the same version and a pipeline with different
characteristics? This does sound like a regression, so details would
help to confirm and track it down!

All my best, Ryan




On Tue, Oct 29, 2019 at 9:48 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>
> I cannot find anything in the docs about the expected behavior of a DoFn
> emitting an arbitrarily large number of elements in one processElement().
>
> I wonder if the SparkRunner behavior is a bug or just a difference in
> execution (and a disadvantage in this case), more along the lines of the
> runner capability matrix differences.
>
> Also, in such cases, what is the opinion on BoundedSource vs. DoFn as a
> source? What is the recommendation for an IO developer who wants to achieve
> equivalent execution scalability across runners?
>
>
> On Sun, Oct 27, 2019 at 6:02 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>
>> typo in my previous message. I meant to say => JDBC is `not` the main data 
>> set, just metadata
>>
>> On Sun, Oct 27, 2019 at 6:00 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>
>>> The result of my query can fit in memory if I use a 12GB heap per Spark
>>> executor. This makes the job quite inefficient, as the past JDBC load job
>>> runs fine with a 4GB heap to do the main heavy lifting - JDBC is the main
>>> data set, just metadata.
>>>
>>> I just ran the same JdbcIO read code on the Spark and Flink runners. Flink
>>> did not blow up on memory, so it seems like this is a limitation of the
>>> SparkRunner.
>>>
>>> On Fri, Oct 25, 2019 at 5:28 PM Ryan Skraba <r...@skraba.com> wrote:
>>>>
>>>> One more thing to try -- depending on your pipeline, you can disable
>>>> the "auto-reshuffle" of JdbcIO.Read by setting
>>>> withOutputParallelization(false)
>>>>
>>>> This is particularly useful if (1) you do aggressive and cheap
>>>> filtering immediately after the read or (2) you do your own
>>>> repartitioning action like GroupByKey after the read.
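>>>>
>>>> A sketch of what that looks like (the pipeline, data source
>>>> configuration, query, and row mapper here are placeholders):
>>>>
>>>>   pipeline.apply("Read", JdbcIO.<String>read()
>>>>       .withDataSourceConfiguration(dataSourceConfiguration)
>>>>       .withQuery("SELECT payload FROM big_table")
>>>>       .withRowMapper(rs -> rs.getString(1))
>>>>       .withCoder(StringUtf8Coder.of())
>>>>       // Disable the reshuffle that JdbcIO.Read adds after the query:
>>>>       .withOutputParallelization(false));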
>>>>
>>>> Given your investigation into the heap, I doubt this will help!  I'll
>>>> take a closer look at the DoFnOutputManager.  In the meantime, is
>>>> there anything particular about your job that might help the
>>>> investigation?
>>>>
>>>> All my best, Ryan
>>>>
>>>> On Fri, Oct 25, 2019 at 2:47 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>> >
>>>> > I agree I might have been too quick to claim that DoFn output needs to
>>>> > fit in memory. Actually, I am not sure what the Beam model says on this
>>>> > matter and what the output managers of particular runners do about it.
>>>> >
>>>> > But the SparkRunner definitely has an issue here. I did try setting a
>>>> > small `fetchSize` for JdbcIO as well as changing `storageLevel` to
>>>> > MEMORY_AND_DISK. All fail with OOM.
>>>> > When looking at the heap, most of it is used by the linked-list multimap
>>>> > of the DoFnOutputManager here:
>>>> > https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234
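>>>> >
>>>> > Concretely, the two settings I'm referring to look roughly like this
>>>> > (a sketch with placeholder names, not the exact job code):
>>>> >
>>>> >   // JDBC cursor fetch size, so the driver streams rows in batches:
>>>> >   JdbcIO.Read<String> read = JdbcIO.<String>read()
>>>> >       .withDataSourceConfiguration(dataSourceConfiguration)
>>>> >       .withQuery(query)
>>>> >       .withRowMapper(rs -> rs.getString(1))
>>>> >       .withCoder(StringUtf8Coder.of())
>>>> >       .withFetchSize(1000);
>>>> >
>>>> >   // Spark storage level used when the runner persists RDDs:
>>>> >   options.as(SparkPipelineOptions.class).setStorageLevel("MEMORY_AND_DISK");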
>>>> >
>>>> >
