Hi,

Did you check the Spark DAG to see whether it forks branches after the "Generate queries" transform?

—
Alexey

> On 24 May 2021, at 20:32, Thomas Fredriksen(External) 
> <thomas.fredrik...@cognite.com> wrote:
> 
> Hi there,
> 
> We are struggling to get the JdbcIO connector to read a large table on Spark.
> 
> In short, we wish to read a large table (several billion rows), transform it, 
> then write the transformed data to a new table.
> 
> We are aware that `JdbcIO.read()` does not parallelize. To work around this, 
> we attempted to create ranges, generate `limit/offset` queries from them, and 
> use `JdbcIO.readAll()` instead.
> 
> The overall steps look something like this (sanitized for readability):
> 
> ```
> pipeline
>   .apply("Read row count", JdbcIO.<Long>read()
>     .withQuery("select count(*) from MYTABLE;")
>     .withCoder(VarLongCoder.of())
>     .withOtherOptions(...))
>   .apply("Generate queries", ParDo.of(new DoFn<Long, Long>() {...})) // Outputs table offsets
>   .apply("Read results", JdbcIO.<Long, Row>readAll()
>     .withCoder(SchemaCoder.of(...))
>     .withOutputParallelization(false)
>     .withQuery("select * from MYTABLE offset ? limit MYLIMIT;")
>     .withParameterSetter((element, statement) -> statement.setLong(1, element))
>     .withOtherOptions(...))
>   .apply("more steps", ...);
> ```
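> 
> For context, the "Generate queries" step is just a DoFn that fans the row count 
> out into one offset per page. A minimal sketch of what that looks like (the 
> `GenerateOffsetsFn` name and `PAGE_SIZE` constant are placeholders standing in 
> for our sanitized MYLIMIT):
> 
> ```
> import org.apache.beam.sdk.transforms.DoFn;
> 
> // Sketch only: PAGE_SIZE stands in for MYLIMIT above.
> class GenerateOffsetsFn extends DoFn<Long, Long> {
>   private static final long PAGE_SIZE = 100_000L;
> 
>   @ProcessElement
>   public void processElement(@Element Long rowCount, OutputReceiver<Long> out) {
>     // One offset per page; each offset parameterizes one
>     // "select * from MYTABLE offset ? limit MYLIMIT" query downstream.
>     for (long offset = 0; offset < rowCount; offset += PAGE_SIZE) {
>       out.output(offset);
>     }
>   }
> }
> ```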
> 
> The problem is that this does not seem to parallelize on the Spark runner. 
> Only a single worker seems to be doing all the work.
> 
> We have tried to break fusion using a variant of `JdbcIO.Reparallelize()`; 
> however, this did not seem to make a difference.
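> 
> For illustration, one way to place such a fusion break is a reshuffle between 
> the generated offsets and the read, along these lines (a sketch only; 
> `Reshuffle.viaRandomKey()` is shown here as a stand-in for the Reparallelize 
> variant we used):
> 
> ```
> // Redistribute the generated offsets across workers before the read,
> // so the readAll queries are not fused onto the same worker as the count.
>   .apply("Generate queries", ParDo.of(new GenerateOffsetsFn()))
>   .apply("Break fusion", Reshuffle.viaRandomKey())
>   .apply("Read results", JdbcIO.<Long, Row>readAll()
>     .withOtherOptions(...))
> ```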
> 
> Our goal is to avoid all the data from the query being cached in memory between 
> the read and transform operations, as this causes OOM exceptions. Having a 
> single worker read the database is okay as long as the other workers can 
> process the data as soon as it is read, rather than having to wait for all the 
> data to be ready.
> 
> Any advice on how to approach this would be much appreciated.
> 
> Best Regards
> Thomas Li Fredriksen
