Hi there,

We are struggling to get the JdbcIO connector to read a large table on
Spark.

In short, we wish to read a large table (several billion rows), transform
the rows, then write the transformed data to a new table.

We are aware that `JdbcIO.read()` does not parallelize. To work around
this, we attempted to split the table into offset ranges, generate one
`limit/offset` query per range, and use `JdbcIO.readAll()` instead.

The overall steps look something like this (sanitized for readability):

```
pipeline
  .apply("Read row count", JdbcIO.<Long>read()
    .withQuery("select count(*) from MYTABLE;")
    .withCoder(VarLongCoder.of())
    .withOtherOptions(...))
  // Outputs table offsets
  .apply("Generate queries", ParDo.of(new DoFn<Long, Long>() {...}))
  .apply("Read results", JdbcIO.<Long, Row>readAll()
    .withCoder(SchemaCoder.of(...))
    .withOutputParallelization(false)
    .withQuery("select * from MYTABLE offset ? limit MYLIMIT;")
    .withParameterSetter((element, statement) -> statement.setLong(1, element))
    .withOtherOptions(...))
  .apply("more steps", ...);
```
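
For reference, the logic inside the offset-generating `DoFn` amounts to roughly the following. This is a minimal plain-Java sketch rather than our actual code: the class name `OffsetRanges`, the method name `offsets`, and the `limit` parameter (standing in for `MYLIMIT`) are placeholders chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: computes the starting offset of every page
// needed to cover a table of rowCount rows, limit rows per page.
class OffsetRanges {
    static List<Long> offsets(long rowCount, long limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        List<Long> result = new ArrayList<>();
        // One offset per page: 0, limit, 2 * limit, ... while below rowCount.
        for (long offset = 0; offset < rowCount; offset += limit) {
            result.add(offset);
        }
        return result;
    }
}
```

For example, `OffsetRanges.offsets(10, 4)` yields the offsets 0, 4 and 8, each of which becomes the `?` parameter of one `limit/offset` query.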

The problem is that this does not seem to parallelize on the Spark runner.
Only a single worker seems to be doing all the work.

We have tried to break fusion using a variant of `JdbcIO.Reparallelize()`,
but this did not seem to make a difference.

Our goal is to avoid having all data from the query cached in memory
between the read and transform operations, since this causes OOM
exceptions. Having a single worker read from the database is okay, as long
as other workers can process the data as soon as it is read rather than
having to wait for all the data to be ready.

Any advice on how we should approach this?

Best Regards
Thomas Li Fredriksen
