Hi, did you check the Spark DAG to see whether it forks branches after the "Generate queries" transform?
— Alexey

> On 24 May 2021, at 20:32, Thomas Fredriksen(External) <thomas.fredrik...@cognite.com> wrote:
>
> Hi there,
>
> We are struggling to get the JdbcIO connector to read a large table on Spark.
>
> In short, we wish to read a large table (several billion rows), transform the data, then write the transformed data to a new table.
>
> We are aware that `JdbcIO.read()` does not parallelize. To work around this, we attempted to generate ranges, turn them into `limit/offset` queries, and use `JdbcIO.readAll()` instead.
>
> The overall steps look something like this (sanitized for readability):
>
> ```
> pipeline
>     .apply("Read row count", JdbcIO.read()
>         .withQuery("select count(*) from MYTABLE;")
>         .withCoder(VarLongCoder.of())
>         .withOtherOptions(...))
>     .apply("Generate queries", ParDo.of(new DoFn<Long, Long>() {...})) // Outputs table offsets
>     .apply("Read results", JdbcIO.<Long, Row>readAll()
>         .withCoder(SchemaCoder.of(...))
>         .withOutputParallelization(false)
>         .withQuery("select * from MYTABLE offset ? limit MYLIMIT;")
>         .withParameterSetter((element, statement) -> statement.setLong(1, element))
>         .withOtherOptions(...))
>     .apply("more steps", ...);
> ```
>
> The problem is that this does not seem to parallelize on the Spark runner. Only a single worker seems to be doing all the work.
>
> We have tried to break fusion using a variant of `JdbcIO.Reparallelize()`; however, this did not seem to make a difference.
>
> Our goal is to avoid all the data from the query being cached in memory between the read and transform operations, since this causes OOM exceptions. Having a single worker read the database is acceptable as long as the other workers can process the data as soon as it is read, rather than having to wait for all of it to be ready.
>
> Any advice on how we should approach this would be much appreciated.
>
> Best regards,
> Thomas Li Fredriksen
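
(For illustration only: below is a minimal, self-contained sketch of the fusion-breaking idea discussed above, using Beam's `Reshuffle.viaRandomKey()` as a stand-in for the `JdbcIO.Reparallelize()` variant mentioned in the mail. The page size, `DataSourceConfiguration` values, row mappers, and the body of the offset-generating `DoFn` are hypothetical placeholders, not taken from the original pipeline.)

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

public class OffsetReadSketch {

  // Hypothetical page size; the original mail calls this MYLIMIT.
  private static final long PAGE_SIZE = 100_000L;

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Hypothetical connection settings, standing in for the sanitized
    // .withOtherOptions(...) calls in the original pipeline.
    JdbcIO.DataSourceConfiguration config =
        JdbcIO.DataSourceConfiguration.create(
            "org.postgresql.Driver", "jdbc:postgresql://host/db");

    PCollection<Long> offsets = pipeline
        .apply("Read row count", JdbcIO.<Long>read()
            .withDataSourceConfiguration(config)
            .withQuery("select count(*) from MYTABLE;")
            .withRowMapper(rs -> rs.getLong(1))
            .withCoder(VarLongCoder.of()))
        .apply("Generate queries", ParDo.of(new DoFn<Long, Long>() {
          // Illustrative version of the DoFn that was elided in the mail:
          // emit one offset per page of PAGE_SIZE rows.
          @ProcessElement
          public void processElement(@Element Long rowCount, OutputReceiver<Long> out) {
            for (long offset = 0; offset < rowCount; offset += PAGE_SIZE) {
              out.output(offset);
            }
          }
        }));

    offsets
        // Force a materialization boundary so the offsets are redistributed
        // and the reads below are not fused onto the same single worker.
        .apply("Break fusion", Reshuffle.viaRandomKey())
        .apply("Read results", JdbcIO.<Long, String>readAll()
            .withDataSourceConfiguration(config)
            .withQuery("select * from MYTABLE offset ? limit " + PAGE_SIZE + ";")
            .withParameterSetter((element, statement) -> statement.setLong(1, element))
            .withRowMapper(rs -> rs.getString(1)) // placeholder row mapper
            .withCoder(StringUtf8Coder.of())
            .withOutputParallelization(false));
        // ... downstream transforms as in the original pipeline.

    pipeline.run();
  }
}
```

The point of the `Reshuffle` step is to force the runner to materialize and redistribute the generated offsets, so the `readAll()` stage can be scheduled across workers instead of being fused into the single-worker count/generate stage; whether this resolves the behaviour seen on the Spark runner would need to be confirmed against the actual pipeline.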