Re: JdbcIO read needs to fit in memory

2019-10-29 Thread Jozef Vilcek
On Tue, Oct 29, 2019 at 10:04 AM Ryan Skraba  wrote:

> I didn't get a chance to try this out -- it sounds like a bug with the
> SparkRunner, if you've tested it with FlinkRunner and it succeeded.
>
> From your description, it should be reproducible by reading any large
> database table with the SparkRunner where the entire dataset is
> greater than the memory available to a single executor?  Do you have
> any other tips to reproduce?
>

Yes, that is what I do.


> Especially worrisome is "as past JDBC load job runs fine with 4GB
> heap" -- did this happen with the same volumes of data and a different
> version of Beam?  Or the same version and a pipeline with different
> characteristics? This does sound like a regression, so details would
> help to confirm and track it down!
>

Eh, my English, sorry :) What I meant to say is that if I provide this
data e.g. via a file dump, then the whole job runs OK with a 4GB executor heap.
The run is about 400 cores for 1 hour, so tripling the heap size for all of them
just for one initial load on one executor is inefficient.
I am not aware of any regression.


>
> All my best, Ryan
>
>
>
>
> On Tue, Oct 29, 2019 at 9:48 AM Jozef Vilcek 
> wrote:
> >
> > I cannot find anything in the docs about the expected behavior of a DoFn
> emitting an arbitrarily large number of elements from one processElement() call.
> >
> > I wonder if the Spark Runner behavior is a bug, or just an execution
> difference (and a disadvantage in this case) that belongs more to the runner
> capability matrix.
> >
> > Also, in such cases, what is the opinion on BoundedSource vs DoFn as a
> source? What is the recommendation to an IO developer who wants to achieve
> equivalent execution scalability across runners?
> >
> >
> > On Sun, Oct 27, 2019 at 6:02 PM Jozef Vilcek 
> wrote:
> >>
> >> typo in my previous message. I meant to say => JDBC is `not` the main
> data set, just metadata
> >>
> >> On Sun, Oct 27, 2019 at 6:00 PM Jozef Vilcek 
> wrote:
> >>>
> >>> The result of my query can fit in memory if I use a 12GB heap per Spark
> executor. This makes the job quite inefficient, as past JDBC load job runs
> fine with 4GB heap to do the main heavy lifting - JDBC is the main data
> set, just metadata.
> >>>
> >>> I just ran the same JdbcIO read code on the Spark and Flink runners.
> Flink did not blow up on memory. So it seems like this is a limitation of
> the SparkRunner.
> >>>
> >>> On Fri, Oct 25, 2019 at 5:28 PM Ryan Skraba  wrote:
> 
>  One more thing to try -- depending on your pipeline, you can disable
>  the "auto-reshuffle" of JdbcIO.Read by setting
>  withOutputParallelization(false)
> 
>  This is particularly useful if (1) you do aggressive and cheap
>  filtering immediately after the read or (2) you do your own
>  repartitioning action like GroupByKey after the read.
> 
>  Given your investigation into the heap, I doubt this will help!  I'll
>  take a closer look at the DoFnOutputManager.  In the meantime, is
>  there anything particularly about your job that might help
>  investigate?
> 
>  All my best, Ryan
> 
>  On Fri, Oct 25, 2019 at 2:47 PM Jozef Vilcek 
> wrote:
>  >
> >  > I agree I might have been too quick to claim that DoFn output needs to fit
> in memory. Actually I am not sure what the Beam model says on this matter, or
> what the output managers of particular runners do about it.
> >  >
> >  > But the SparkRunner definitely has an issue here. I did try setting a small
> `fetchSize` for JdbcIO as well as changing `storageLevel` to MEMORY_AND_DISK.
> All attempts fail with OOM.
> >  > When looking at the heap, most of it is used by the linked-list
> multimap of the DoFnOutputManager here:
>  >
> https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234
>  >
>  >
>


Re: JdbcIO read needs to fit in memory

2019-10-29 Thread Ryan Skraba
I didn't get a chance to try this out -- it sounds like a bug with the
SparkRunner, if you've tested it with FlinkRunner and it succeeded.

From your description, it should be reproducible by reading any large
database table with the SparkRunner where the entire dataset is
greater than the memory available to a single executor?  Do you have
any other tips to reproduce?
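
For reference, a minimal sketch of such a repro could look like the following
(the driver, connection URL, credentials, table and row type are placeholders,
not details from this thread), run with --runner=SparkRunner and a deliberately
small executor heap:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class JdbcReadRepro {
      public static void main(String[] args) {
        // Pass --runner=SparkRunner (plus Spark master/executor settings) on the command line.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline p = Pipeline.create(options);

        // Read one large table whose result set exceeds a single executor's heap.
        p.apply("ReadBigTable", JdbcIO.<String>read()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver", "jdbc:postgresql://db-host:5432/mydb")
                    .withUsername("user")
                    .withPassword("secret"))
            .withQuery("SELECT payload FROM big_table")
            .withRowMapper(rs -> rs.getString("payload"))
            .withCoder(StringUtf8Coder.of()));

        p.run().waitUntilFinish();
      }
    }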

Especially worrisome is "as past JDBC load job runs fine with 4GB
heap" -- did this happen with the same volumes of data and a different
version of Beam?  Or the same version and a pipeline with different
characteristics? This does sound like a regression, so details would
help to confirm and track it down!

All my best, Ryan




On Tue, Oct 29, 2019 at 9:48 AM Jozef Vilcek  wrote:
>
> I cannot find anything in the docs about the expected behavior of a DoFn
> emitting an arbitrarily large number of elements from one processElement() call.
>
> I wonder if the Spark Runner behavior is a bug, or just an execution difference
> (and a disadvantage in this case) that belongs more to the runner capability
> matrix.
>
> Also, in such cases, what is the opinion on BoundedSource vs DoFn as a
> source? What is the recommendation to an IO developer who wants to achieve
> equivalent execution scalability across runners?
>
>
> On Sun, Oct 27, 2019 at 6:02 PM Jozef Vilcek  wrote:
>>
>> typo in my previous message. I meant to say => JDBC is `not` the main data 
>> set, just metadata
>>
>> On Sun, Oct 27, 2019 at 6:00 PM Jozef Vilcek  wrote:
>>>
>>> The result of my query can fit in memory if I use a 12GB heap per Spark
>>> executor. This makes the job quite inefficient, as past JDBC load job runs
>>> fine with 4GB heap to do the main heavy lifting - JDBC is the main data
>>> set, just metadata.
>>>
>>> I just ran the same JdbcIO read code on the Spark and Flink runners. Flink
>>> did not blow up on memory. So it seems like this is a limitation of
>>> the SparkRunner.
>>>
>>> On Fri, Oct 25, 2019 at 5:28 PM Ryan Skraba  wrote:

 One more thing to try -- depending on your pipeline, you can disable
 the "auto-reshuffle" of JdbcIO.Read by setting
 withOutputParallelization(false)

 This is particularly useful if (1) you do aggressive and cheap
 filtering immediately after the read or (2) you do your own
 repartitioning action like GroupByKey after the read.

 Given your investigation into the heap, I doubt this will help!  I'll
 take a closer look at the DoFnOutputManager.  In the meantime, is
 there anything particularly about your job that might help
 investigate?

 All my best, Ryan

 On Fri, Oct 25, 2019 at 2:47 PM Jozef Vilcek  wrote:
 >
 > I agree I might have been too quick to claim that DoFn output needs to fit in
 > memory. Actually I am not sure what the Beam model says on this matter, or
 > what the output managers of particular runners do about it.
 >
 > But the SparkRunner definitely has an issue here. I did try setting a small
 > `fetchSize` for JdbcIO as well as changing `storageLevel` to
 > MEMORY_AND_DISK. All attempts fail with OOM.
 > When looking at the heap, most of it is used by the linked-list multimap of
 > the DoFnOutputManager here:
 > https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234
 >
 >


Re: JdbcIO read needs to fit in memory

2019-10-29 Thread Jozef Vilcek
I cannot find anything in the docs about the expected behavior of a DoFn
emitting an arbitrarily large number of elements from one processElement() call.

I wonder if the Spark Runner behavior is a bug, or just an execution difference
(and a disadvantage in this case) that belongs more to the runner capability
matrix.

Also, in such cases, what is the opinion on BoundedSource vs DoFn as a
source? What is the recommendation to an IO developer who wants to achieve
equivalent execution scalability across runners?


On Sun, Oct 27, 2019 at 6:02 PM Jozef Vilcek  wrote:

> typo in my previous message. I meant to say => JDBC is `not` the main data
> set, just metadata
>
> On Sun, Oct 27, 2019 at 6:00 PM Jozef Vilcek 
> wrote:
>
>> The result of my query can fit in memory if I use a 12GB heap per Spark
>> executor. This makes the job quite inefficient, as past JDBC load job runs
>> fine with 4GB heap to do the main heavy lifting - JDBC is the main data
>> set, just metadata.
>>
>> I just ran the same JdbcIO read code on the Spark and Flink runners. Flink
>> did not blow up on memory. So it seems like this is a limitation of
>> the SparkRunner.
>>
>> On Fri, Oct 25, 2019 at 5:28 PM Ryan Skraba  wrote:
>>
>>> One more thing to try -- depending on your pipeline, you can disable
>>> the "auto-reshuffle" of JdbcIO.Read by setting
>>> withOutputParallelization(false)
>>>
>>> This is particularly useful if (1) you do aggressive and cheap
>>> filtering immediately after the read or (2) you do your own
>>> repartitioning action like GroupByKey after the read.
>>>
>>> Given your investigation into the heap, I doubt this will help!  I'll
>>> take a closer look at the DoFnOutputManager.  In the meantime, is
>>> there anything particularly about your job that might help
>>> investigate?
>>>
>>> All my best, Ryan
>>>
>>> On Fri, Oct 25, 2019 at 2:47 PM Jozef Vilcek 
>>> wrote:
>>> >
>>> > I agree I might have been too quick to claim that DoFn output needs to fit
>>> in memory. Actually I am not sure what the Beam model says on this matter, or
>>> what the output managers of particular runners do about it.
>>> >
>>> > But the SparkRunner definitely has an issue here. I did try setting a small
>>> `fetchSize` for JdbcIO as well as changing `storageLevel` to MEMORY_AND_DISK.
>>> All attempts fail with OOM.
>>> > When looking at the heap, most of it is used by the linked-list multimap
>>> of the DoFnOutputManager here:
>>> >
>>> https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234
>>> >
>>> >
>>>
>>


Re: JdbcIO read needs to fit in memory

2019-10-27 Thread Jozef Vilcek
The result of my query can fit in memory if I use a 12GB heap per Spark
executor. This makes the job quite inefficient, as past JDBC load job runs
fine with 4GB heap to do the main heavy lifting - JDBC is the main data
set, just metadata.

I just ran the same JdbcIO read code on the Spark and Flink runners. Flink
did not blow up on memory. So it seems like this is a limitation of
the SparkRunner.

On Fri, Oct 25, 2019 at 5:28 PM Ryan Skraba  wrote:

> One more thing to try -- depending on your pipeline, you can disable
> the "auto-reshuffle" of JdbcIO.Read by setting
> withOutputParallelization(false)
>
> This is particularly useful if (1) you do aggressive and cheap
> filtering immediately after the read or (2) you do your own
> repartitioning action like GroupByKey after the read.
>
> Given your investigation into the heap, I doubt this will help!  I'll
> take a closer look at the DoFnOutputManager.  In the meantime, is
> there anything particularly about your job that might help
> investigate?
>
> All my best, Ryan
>
> On Fri, Oct 25, 2019 at 2:47 PM Jozef Vilcek 
> wrote:
> >
> > I agree I might have been too quick to claim that DoFn output needs to fit in
> memory. Actually I am not sure what the Beam model says on this matter, or what
> the output managers of particular runners do about it.
> >
> > But the SparkRunner definitely has an issue here. I did try setting a small
> `fetchSize` for JdbcIO as well as changing `storageLevel` to MEMORY_AND_DISK.
> All attempts fail with OOM.
> > When looking at the heap, most of it is used by the linked-list multimap of
> the DoFnOutputManager here:
> >
> https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234
> >
> >
>


Re: JdbcIO read needs to fit in memory

2019-10-25 Thread Ryan Skraba
One more thing to try -- depending on your pipeline, you can disable
the "auto-reshuffle" of JdbcIO.Read by setting
withOutputParallelization(false)

This is particularly useful if (1) you do aggressive and cheap
filtering immediately after the read or (2) you do your own
repartitioning action like GroupByKey after the read.
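
A rough sketch of what that read configuration might look like (p is an existing
Pipeline; the connection details, query and mapper are placeholders, and
withOutputParallelization is assumed to be available in the Beam release in use,
e.g. 2.15):

    // Disable the automatic post-read reshuffle and lower the cursor fetch size.
    // All connection details below are placeholders.
    p.apply("ReadWithoutReshuffle", JdbcIO.<String>read()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(
                "org.postgresql.Driver", "jdbc:postgresql://db-host:5432/mydb"))
        .withQuery("SELECT payload FROM big_table")
        .withFetchSize(1000)                  // default is 50,000 rows per fetch
        .withOutputParallelization(false)     // skip the auto-reshuffle
        .withRowMapper(rs -> rs.getString("payload"))
        .withCoder(StringUtf8Coder.of()));

With the reshuffle disabled, the rows flow straight into the downstream
transforms instead of being repartitioned first, which is why the two cases
above (cheap filtering, or your own GroupByKey) make it worthwhile.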

Given your investigation into the heap, I doubt this will help!  I'll
take a closer look at the DoFnOutputManager.  In the meantime, is
there anything particular about your job that might help
investigate?

All my best, Ryan

On Fri, Oct 25, 2019 at 2:47 PM Jozef Vilcek  wrote:
>
> I agree I might have been too quick to claim that DoFn output needs to fit in
> memory. Actually I am not sure what the Beam model says on this matter, or what
> the output managers of particular runners do about it.
>
> But the SparkRunner definitely has an issue here. I did try setting a small
> `fetchSize` for JdbcIO as well as changing `storageLevel` to MEMORY_AND_DISK.
> All attempts fail with OOM.
> When looking at the heap, most of it is used by the linked-list multimap of
> the DoFnOutputManager here:
> https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234
>
>


Re: JdbcIO read needs to fit in memory

2019-10-25 Thread Jozef Vilcek
I agree I might have been too quick to claim that DoFn output needs to fit in
memory. Actually I am not sure what the Beam model says on this matter, or what
the output managers of particular runners do about it.

But the SparkRunner definitely has an issue here. I did try setting a small
`fetchSize` for JdbcIO as well as changing `storageLevel` to MEMORY_AND_DISK.
All attempts fail with OOM.
When looking at the heap, most of it is used by the linked-list multimap of
the DoFnOutputManager here:
https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234


Re: JdbcIO read needs to fit in memory

2019-10-24 Thread Alexey Romanenko
Jozef, do you have any NPE stacktrace to share?

> On 24 Oct 2019, at 15:26, Jozef Vilcek  wrote:
> 
> Hi,
> 
> I need to read a big-ish data set via JdbcIO. This forced me to bump up the
> memory for my executor (right now using the SparkRunner). It seems that JdbcIO
> requires all data to fit in memory as it uses a DoFn to unfold the query into
> a list of elements.
> 
> A BoundedSource would not face the need to fit the result in memory, but JdbcIO
> uses a DoFn. Also, in a recent discussion [1] it was suggested that
> BoundedSource should not be used as it is obsolete.
> 
> Has anyone faced this issue? What would be the best way to solve it? If the DoFn
> should be kept, then I can only think of splitting the query into ranges and
> trying to find the most fitting number of rows to read at once.
> 
> I appreciate any thoughts. 
> 
> [1] 
> https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Reading%20from%20RDB%2C%20ParDo%20or%20BoundedSource
>  
> 


Re: JdbcIO read needs to fit in memory

2019-10-24 Thread Ryan Skraba
Hello!

If I remember correctly -- the JdbcIO will use *one* DoFn instance to
read all of the rows, but that instance is not required to hold all of
the rows in memory.

The fetch size will, however, read 50K rows at a time by default and
those will all be held in memory in that single worker until they are
emitted.  You can adjust this setting with the withFetchSize(...)
method.

By default, the JdbcIO.Read transform adds a "reshuffle", which will
repartition the records among all of the nodes in the cluster.  This
means that all of the rows need to fit into the total available memory of
the cluster (not just that one node), especially if the RDD underneath
the PCollection is reused/persisted.  You can change the persistence
level to "MEMORY_AND_DISK" in this case if you want to spill data to
disk instead of failing your job:
https://github.com/apache/beam/blob/416f62bdd7fa092257921e4835a48094ebe1dda4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java#L56
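
A small sketch of switching that option in code (it can equally be passed as
--storageLevel=MEMORY_AND_DISK on the command line):

    // Let the SparkRunner persist cached RDDs/PCollections to disk as well as
    // memory, so a too-large dataset spills instead of failing with OOM.
    // Uses org.apache.beam.runners.spark.SparkPipelineOptions.
    SparkPipelineOptions sparkOptions =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class);
    sparkOptions.setStorageLevel("MEMORY_AND_DISK");
    Pipeline p = Pipeline.create(sparkOptions);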

I hope this helps!  Ryan




On Thu, Oct 24, 2019 at 4:26 PM Jean-Baptiste Onofré  wrote:
>
> Hi
>
> JdbcIO is basically a DoFn. So it could load all on a single executor 
> (there's no obvious way to split).
>
> Is that what you mean?
>
> Regards
> JB
>
> On 24 Oct 2019 at 15:26, Jozef Vilcek  wrote:
>
> Hi,
>
> I need to read a big-ish data set via JdbcIO. This forced me to bump up the
> memory for my executor (right now using the SparkRunner). It seems that JdbcIO
> requires all data to fit in memory as it uses a DoFn to unfold the query into
> a list of elements.
>
> A BoundedSource would not face the need to fit the result in memory, but JdbcIO
> uses a DoFn. Also, in a recent discussion [1] it was suggested that
> BoundedSource should not be used as it is obsolete.
>
> Has anyone faced this issue? What would be the best way to solve it? If the DoFn
> should be kept, then I can only think of splitting the query into ranges and
> trying to find the most fitting number of rows to read at once.
>
> I appreciate any thoughts.
>
> [1] 
> https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Reading%20from%20RDB%2C%20ParDo%20or%20BoundedSource
>
>


Re: JdbcIO read needs to fit in memory

2019-10-24 Thread Eugene Kirpichov
Sorry, I just realized I've made a mistake. BoundedSource in some runners
may not have the same "fits in memory" limitation as DoFns, so in that
sense you're right - if it was done as a BoundedSource, perhaps it would
work better in your case, even if it didn't read things in parallel.

On Thu, Oct 24, 2019 at 8:17 AM Eugene Kirpichov 
wrote:

> Hi Josef,
>
> JdbcIO per se does not require the result set to fit in memory. The issues
> come from the limitations of the context in which it runs:
> - It indeed uses a DoFn to emit results; a DoFn is in general allowed to
> emit an unbounded number of results that don't necessarily have to fit in
> memory, but some runners may have this requirement (e.g. Spark probably
> does, Dataflow doesn't, not sure about the others)
> - JdbcIO uses a database cursor provided by the underlying JDBC driver to
> read through the results. Again, depending on the particular JDBC driver,
> the cursor may or may not be able to stream the results without storing all
> of them in memory.
> - The biggest issue, though, is that there's no way to automatically split
> the execution of a JDBC query into several sub-queries whose results
> together are equivalent to the result of the original query. Because of
> this, it is not possible to implement JdbcIO in a way that it would
> *automatically* avoid scanning through the entire result set, because
> scanning through the entire result set sequentially is the only way JDBC
> drivers (and most databases) allow you to access query results. Even if we
> chose to use BoundedSource, we wouldn't be able to implement the split()
> method.
>
> If you need to read query results in parallel, or to circumvent memory
> limitations of a particular runner or JDBC driver, you can use
> JdbcIO.readAll(), and parameterize your query such that passing all the
> parameter values together adds up to the original query you wanted. Most
> likely it would be something like transforming "SELECT * FROM TABLE" to a
> family of queries "SELECT * FROM TABLE WHERE MY_PRIMARY_KEY BETWEEN ? AND
> ?" and passing primary key ranges adding up to the full range of the
> table's keys.
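
A rough sketch of that readAll() pattern (the key column, ranges, query and
mapper below are illustrative placeholders, not details from this thread; p is
an existing Pipeline):

    // Parallel JDBC read: each key range becomes one parameterized sub-query.
    // The ranges and column names are made up for illustration.
    PCollection<KV<Long, Long>> keyRanges = p.apply("KeyRanges", Create.of(
        KV.of(0L, 999_999L),
        KV.of(1_000_000L, 1_999_999L),
        KV.of(2_000_000L, 2_999_999L)));

    PCollection<String> rows = keyRanges.apply("ParallelJdbcRead",
        JdbcIO.<KV<Long, Long>, String>readAll()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                    "org.postgresql.Driver", "jdbc:postgresql://db-host:5432/mydb"))
            .withQuery("SELECT payload FROM big_table WHERE id BETWEEN ? AND ?")
            .withParameterSetter((range, stmt) -> {
              stmt.setLong(1, range.getKey());
              stmt.setLong(2, range.getValue());
            })
            .withRowMapper(rs -> rs.getString("payload"))
            .withCoder(StringUtf8Coder.of()));
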
>
> Note that whether this performs better will also depend on the database
> - e.g. if the database is already bottlenecked, then reading from it in
> parallel will not make things faster.
>
> On Thu, Oct 24, 2019 at 7:26 AM Jean-Baptiste Onofré 
> wrote:
>
>> Hi
>>
>> JdbcIO is basically a DoFn. So it could load all on a single executor
>> (there's no obvious way to split).
>>
>> Is that what you mean?
>>
>> Regards
>> JB
>>
>> On 24 Oct 2019 at 15:26, Jozef Vilcek  wrote:
>>
>> Hi,
>>
>> I need to read a big-ish data set via JdbcIO. This forced me to bump up the
>> memory for my executor (right now using the SparkRunner). It seems that JdbcIO
>> requires all data to fit in memory as it uses a DoFn to unfold the query into
>> a list of elements.
>>
>> A BoundedSource would not face the need to fit the result in memory, but JdbcIO
>> uses a DoFn. Also, in a recent discussion [1] it was suggested that
>> BoundedSource should not be used as it is obsolete.
>>
>> Has anyone faced this issue? What would be the best way to solve it? If the DoFn
>> should be kept, then I can only think of splitting the query into ranges and
>> trying to find the most fitting number of rows to read at once.
>>
>> I appreciate any thoughts.
>>
>> [1]
>> https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Reading%20from%20RDB%2C%20ParDo%20or%20BoundedSource
>>
>>
>>


Re: JdbcIO read needs to fit in memory

2019-10-24 Thread Jean-Baptiste Onofré
Hi

JdbcIO is basically a DoFn. So it could load all on a single executor (there's no obvious way to split).

Is that what you mean?

Regards
JB

On 24 Oct 2019 at 15:26, Jozef Vilcek  wrote:

> Hi,
>
> I need to read a big-ish data set via JdbcIO. This forced me to bump up the
> memory for my executor (right now using the SparkRunner). It seems that JdbcIO
> requires all data to fit in memory as it uses a DoFn to unfold the query into
> a list of elements.
>
> A BoundedSource would not face the need to fit the result in memory, but JdbcIO
> uses a DoFn. Also, in a recent discussion [1] it was suggested that
> BoundedSource should not be used as it is obsolete.
>
> Has anyone faced this issue? What would be the best way to solve it? If the DoFn
> should be kept, then I can only think of splitting the query into ranges and
> trying to find the most fitting number of rows to read at once.
>
> I appreciate any thoughts.
>
> [1] https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Reading%20from%20RDB%2C%20ParDo%20or%20BoundedSource


JdbcIO read needs to fit in memory

2019-10-24 Thread Jozef Vilcek
Hi,

I need to read a big-ish data set via JdbcIO. This forced me to bump up the
memory for my executor (right now using the SparkRunner). It seems that JdbcIO
requires all data to fit in memory as it uses a DoFn to unfold the query into
a list of elements.

A BoundedSource would not face the need to fit the result in memory, but JdbcIO
uses a DoFn. Also, in a recent discussion [1] it was suggested that
BoundedSource should not be used as it is obsolete.

Has anyone faced this issue? What would be the best way to solve it? If the DoFn
should be kept, then I can only think of splitting the query into ranges and
trying to find the most fitting number of rows to read at once.
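
For illustration only, that kind of range splitting over a numeric primary key
could be as simple as the sketch below (the bounds and step are made-up values);
the resulting pairs could then feed parameterized "WHERE id BETWEEN ? AND ?"
queries:

    // Illustrative helper: split [min, max] into fixed-size sub-ranges.
    // (Uses java.util.List/ArrayList and org.apache.beam.sdk.values.KV.)
    static List<KV<Long, Long>> splitKeyRange(long min, long max, long step) {
      List<KV<Long, Long>> ranges = new ArrayList<>();
      for (long start = min; start <= max; start += step) {
        ranges.add(KV.of(start, Math.min(start + step - 1, max)));
      }
      return ranges;
    }

    // e.g. splitKeyRange(0, 9_999_999, 500_000) produces 20 ranges of 500k keys each.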

I appreciate any thoughts.

[1]
https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Reading%20from%20RDB%2C%20ParDo%20or%20BoundedSource