Hello,

(my apologies for the long email, but this requires context)

As part of the move from Source-based IOs to DoFn-based ones, a pattern
emerged thanks to the composable nature of DoFn. The idea is to have a
different kind of composable read, where we take a PCollection of different
sorts of intermediate specifications, e.g. tables, queries, etc. For example:

JdbcIO:
ReadAll<ParameterT, OutputT> extends
PTransform<PCollection<ParameterT>, PCollection<OutputT>>

RedisIO:
ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>

HBaseIO:
ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>

These patterns enabled richer use cases, such as running multiple queries in
the same pipeline, querying based on key patterns, or querying multiple tables
at the same time. However, they came with some maintenance issues:

- We ended up having to add parameters for the missing information to the
  ReadAll transforms, which meant duplicating lots of with* methods and
  error-prone code from the Read transforms into the ReadAll transforms.

- When you require new parameters you have to expand the input of the
  intermediary specification into something that resembles the full `Read`
  definition. For example, imagine you want to read from multiple tables or
  servers as part of the same pipeline, but this was not in the intermediate
  specification: you end up adding those extra methods (duplicating more code)
  just to get close to the full Read spec.

- If new parameters are added to the Read transform we have to add them
  systematically to the ReadAll transform too so they are taken into account.
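To make the duplication concrete, here is a minimal, hypothetical sketch (plain Java, not actual Beam code; the class and method names are invented for illustration) of how configuration methods on Read leak into an old-style ReadAll:

```java
// Hypothetical builder-style specs illustrating the duplication problem:
// every configuration method added to Read has to be mirrored by hand
// on the old-style ReadAll.
class Read {
    String table;
    String filter;
    Read withTable(String table) { this.table = table; return this; }
    Read withFilter(String filter) { this.filter = filter; return this; }
}

class ReadAll {
    // Duplicated from Read and kept in sync manually; the table itself
    // arrives as an input element, but every other Read parameter
    // (filter, credentials, timeouts, ...) must be re-declared here.
    String filter;
    ReadAll withFilter(String filter) { this.filter = filter; return this; }
}
```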

Due to these issues I recently made a change to test a new approach that is
simpler, more complete and more maintainable. The code became:

HBaseIO:
ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>

With this approach users automatically benefit from improvements to the
parameters of the normal Read, because they have the full set of Read
parameters available. But of course there are some minor caveats:

1. You need to push some information into the normal Read, for example
   partition boundary information or Restriction information (in the SDF
   case). Notice that this consistent ReadAll approach produces a simple
   pattern that ends up being almost reusable between IOs (e.g. the non-SDF
   case):

  public static class ReadAll extends
      PTransform<PCollection<Read>, PCollection<SolrDocument>> {
    @Override
    public PCollection<SolrDocument> expand(PCollection<Read> input) {
      return input
          .apply("Split", ParDo.of(new SplitFn()))
          .apply("Reshuffle", Reshuffle.viaRandomKey())
          .apply("Read", ParDo.of(new ReadFn()));
    }
  }

2. If you use generic types for the ReadAll results you must provide the
   Coders used in its definition and require consistent types from the data
   sources. In practice this means adding extra withCoder method(s) on
   ReadAll, but not the full set of spec methods.
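Abstracting away the Beam types, the Split → Reshuffle → Read flow above is essentially a flatMap over the Read specs followed by a map that executes each of them. Here is a self-contained toy sketch of that per-element flow (plain Java, no Beam dependency; all names and the splitting scheme are hypothetical, chosen only to mirror the structure of the snippet above):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ReadAllFlow {
    // Hypothetical stand-in for a full Read spec: because the element
    // type is the spec itself, it carries every Read parameter.
    static class Read {
        final String table;
        final int partitions;
        Read(String table, int partitions) {
            this.table = table;
            this.partitions = partitions;
        }
    }

    // "Split": one Read becomes several smaller Reads, e.g. one per partition.
    static List<Read> split(Read r) {
        List<Read> out = new ArrayList<>();
        for (int i = 0; i < r.partitions; i++) {
            out.add(new Read(r.table + "#" + i, 1));
        }
        return out;
    }

    // The whole "ReadAll": split each spec, redistribute (a plain shuffle
    // stands in for Reshuffle.viaRandomKey()), then execute each Read.
    static List<String> readAll(List<Read> specs, Function<Read, String> readFn) {
        List<Read> splits = specs.stream()
                .flatMap(r -> split(r).stream())
                .collect(Collectors.toList());
        Collections.shuffle(splits); // breaks fusion / rebalances in real Beam
        return splits.stream().map(readFn).collect(Collectors.toList());
    }
}
```

The point of the sketch is only that the per-element logic never needs extra configuration methods: everything the ReadFn needs travels inside each Read element.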


At the moment HBaseIO and SolrIO already follow this ReadAll pattern, and
RedisIO and CassandraIO already have WIP PRs to do so. I wanted to bring this
subject to the mailing list to hear your opinions, and to see whether there is
any sort of issue we might be missing with this idea.

Also, I would like to see if we have consensus to consistently use the
terminology of ReadAll transforms based on Read, and the readAll() method, for
new IOs (at this point changing this in the only remaining inconsistent place,
JdbcIO, might not be a good idea, but apart from that we should be ok).

I mention this because the recent SDF-based PR on KafkaIO is doing something
similar to the old pattern while being called ReadAll, and it may be worth
being consistent for the benefit of users.

Regards,
Ismaël
