I would like to propose a change to how many splits (sources) KafkaIO
creates. The code changes are relatively simple, but the proposal has a
couple of drawbacks I would like to discuss here.

KafkaIO currently takes the '*desiredNumWorkers
<https://github.com/apache/incubator-beam/blob/v0.3.0-incubating-RC1/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L642>*'
hint literally and returns exactly that many splits. If *desiredNumWorkers*
is 10 and the topic has 50 partitions, each Kafka source reads from 5
partitions.
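
For clarity, the current assignment is roughly equivalent to the sketch
below (simplified, not the actual KafkaIO code; 'assignToSplits' is just an
illustrative helper):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;

    // Simplified sketch of the current behavior: partitions are assigned
    // round-robin into exactly 'desiredNumWorkers' splits.
    static List<List<TopicPartition>> assignToSplits(
        List<TopicPartition> partitions, int desiredNumWorkers) {
      List<List<TopicPartition>> splits = new ArrayList<>();
      for (int i = 0; i < desiredNumWorkers; i++) {
        splits.add(new ArrayList<TopicPartition>());
      }
      for (int i = 0; i < partitions.size(); i++) {
        splits.get(i % desiredNumWorkers).add(partitions.get(i));
      }
      // 50 partitions with desiredNumWorkers = 10 => 5 partitions per split.
      return splits;
    }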

The primary disadvantage is that the runner-dependent 'desiredNumWorkers'
hint might not be accurate. In Dataflow, it is particularly low when we set
'maxNumWorkers' (BEAM-958 <https://issues.apache.org/jira/browse/BEAM-958>).
In addition, the number of partitions in Kafka is a very good indicator of
a topic's parallelism.

I would like to change KafkaIO to return one split for each of the
partitions.
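
With the proposed change, the same helper would reduce to something like
this (again a simplified sketch under the same assumptions, not the actual
KafkaIO code):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;

    // Sketch of the proposed behavior: ignore the 'desiredNumWorkers' hint
    // and return one single-partition split per Kafka partition.
    static List<List<TopicPartition>> assignToSplits(List<TopicPartition> partitions) {
      List<List<TopicPartition>> splits = new ArrayList<>();
      for (TopicPartition tp : partitions) {
        splits.add(Collections.singletonList(tp));
      }
      // 50 partitions => 50 splits, regardless of the hint.
      return splits;
    }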

Pros:

   - A partition is in fact the unit of parallelism in Kafka.
   - Does not depend on 'desiredNumWorkers'.
   - Little risk of ending up with an unreasonably large number of splits
   (unlike, say, a source that creates one split per file). The number of
   partitions tends to be on the order of the Kafka cluster size.

Cons (these mainly affect job updates):

   - Breaks updating an existing job
   <https://cloud.google.com/dataflow/pipelines/updating-a-pipeline> if it
   is updated to a newer version of KafkaIO. The new version changes the
   number of splits returned, which is not allowed during an update.
      - I think this is a reasonable breakage at this stage.
      - The vast majority of updates don't involve a version change.
      - We could add a workaround where the user can explicitly set the
      number of splits in KafkaIO (this might be required to handle changes
      in partitions as well; see below).
   - Makes it a bit more difficult to support a change in the number of
   Kafka partitions across an update.
      - This is not a feature in KafkaIO yet, so it is not a new breakage.
      - If we don't depend on 'desiredNumWorkers', there is no way for us
      to know how many splits we had before the update. This is actually a
      limitation of the UnboundedSource API; UnboundedSource needs multiple
      tweaks to support job updates better. In that sense I don't think
      this should be a blocker.
      - A workaround is to let the user explicitly set the number of splits
      (see the sketch after this list). E.g.:
          - When a job starts, say we have 70 partitions, and after some
          time we add 10 more partitions.
          - At runtime, each Kafka split notices the new partitions and
          they can be distributed among the existing 70 splits.
          - But when the job is updated, KafkaIO does not know that it had
          only 70 partitions earlier.
          - For this to work, the user could set the number of splits to 70
          explicitly.
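
From the user's perspective, that workaround could look roughly like the
fragment below. This is purely illustrative: 'withNumSplits' does not exist
in KafkaIO today, and the broker/topic names are placeholders.

    import java.util.Arrays;

    pipeline.apply(KafkaIO.read()
        .withBootstrapServers("broker-1:9092")
        .withTopics(Arrays.asList("my_topic"))
        // Hypothetical method: pin the split count to the original 70
        // partitions so that an updated job generates the same splits even
        // after new partitions are added to the topic.
        .withNumSplits(70));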


Thanks.
Raghu.
