I cannot find anything in the docs about the expected behavior of a DoFn
emitting an arbitrarily large number of elements from one processElement() call.

I wonder whether the Spark Runner behavior is a bug, or just an execution
difference (and, in this case, a disadvantage) of the kind captured by the
runner capability matrix.

Also, in such cases, what is the opinion on BoundedSource vs. DoFn as a
source? What is the recommendation for an IO developer who wants to achieve
equivalent execution scalability across runners?
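To make the question concrete, here is a small self-contained sketch (plain
Java, no Beam classes; all names are illustrative, not Beam APIs) of the
difference between a runner that consumes a step's outputs lazily and one that
buffers everything a single call emits before passing it downstream:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative stand-in for a DoFn whose single "processElement" call
// expands one input into an arbitrarily large number of outputs
// (e.g. one element per JDBC result-set row).
public class LazyVsBufferedOutput {

    // Lazy style: outputs are produced on demand as the consumer pulls them;
    // peak memory is O(1) no matter how many elements one input expands into.
    static Iterator<Long> lazyOutputs(long count) {
        return new Iterator<Long>() {
            long next = 0;
            public boolean hasNext() { return next < count; }
            public Long next() { return next++; }
        };
    }

    // Buffered style: all outputs of one call are materialized first
    // (roughly what a linked-list-backed output manager does);
    // peak memory is O(count) and can OOM for large expansions.
    static List<Long> bufferedOutputs(long count) {
        List<Long> out = new ArrayList<>();
        for (long i = 0; i < count; i++) out.add(i);
        return out;
    }

    public static void main(String[] args) {
        long n = 1_000_000;
        long sumLazy = 0;
        for (Iterator<Long> it = lazyOutputs(n); it.hasNext(); ) {
            sumLazy += it.next();
        }
        long sumBuffered = 0;
        for (long v : bufferedOutputs(n)) {
            sumBuffered += v;
        }
        // Same elements either way; only the peak memory profile differs.
        System.out.println(sumLazy == sumBuffered); // prints "true"
    }
}
```

Whether a given runner behaves like the first or the second case for a
high-fan-out DoFn is exactly what seems to differ between Flink and Spark here.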


On Sun, Oct 27, 2019 at 6:02 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> typo in my previous message. I meant to say => JDBC is `not` the main data
> set, just metadata
>
> On Sun, Oct 27, 2019 at 6:00 PM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> Result of my query can fit in memory if I use a 12GB heap per Spark
>> executor. This makes the job quite inefficient, as a past JDBC load job runs
>> fine with a 4GB heap to do the main heavy lifting - JDBC is the main data
>> set, just metadata.
>>
>> I just ran the same JdbcIO read code on the Spark and Flink runners. Flink
>> did not blow up on memory, so this seems to be a limitation of
>> SparkRunner.
>>
>> On Fri, Oct 25, 2019 at 5:28 PM Ryan Skraba <r...@skraba.com> wrote:
>>
>>> One more thing to try -- depending on your pipeline, you can disable
>>> the "auto-reshuffle" of JdbcIO.Read by setting
>>> withOutputParallelization(false)
>>>
>>> This is particularly useful if (1) you do aggressive and cheap
>>> filtering immediately after the read or (2) you do your own
>>> repartitioning action like GroupByKey after the read.
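For reference, a sketch of what that looks like on a JdbcIO.Read (the driver,
URL, query, and fetch size below are placeholders, not values from this
thread):

```java
// Sketch: JdbcIO read with the internal output reshuffle disabled.
// Downstream parallelism then comes from your own GroupByKey/Reshuffle,
// or is simply not needed if you filter aggressively right after the read.
PCollection<String> rows =
    pipeline.apply(
        JdbcIO.<String>read()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                    "org.postgresql.Driver",               // placeholder driver
                    "jdbc:postgresql://host:5432/mydb"))   // placeholder URL
            .withQuery("SELECT name FROM my_table")        // placeholder query
            .withRowMapper((ResultSet rs) -> rs.getString(1))
            .withFetchSize(1_000)                          // rows per DB round trip
            .withOutputParallelization(false));            // skip the auto-reshuffle
```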
>>>
>>> Given your investigation into the heap, I doubt this will help!  I'll
>>> take a closer look at the DoFnOutputManager.  In the meantime, is
>>> there anything particular about your job that might help
>>> investigate?
>>>
>>> All my best, Ryan
>>>
>>> On Fri, Oct 25, 2019 at 2:47 PM Jozef Vilcek <jozo.vil...@gmail.com>
>>> wrote:
>>> >
>>> > I agree I might have been too quick to claim that DoFn output needs to fit
>>> in memory. Actually, I am not sure what the Beam model says on this matter, or
>>> what the output managers of particular runners do about it.
>>> >
>>> > But SparkRunner definitely has an issue here. I did try setting a small
>>> `fetchSize` for JdbcIO, as well as changing `storageLevel` to MEMORY_AND_DISK.
>>> Both fail with OOM.
>>> > When looking at the heap, most of it is used by the linked-list multimap
>>> of the DoFnOutputManager here:
>>> >
>>> https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234
>>> >
>>> >
>>>
>>
