I think this makes more sense then the existing form of a split that is
made of several Kafka partitions since, as mentioned, Kafka partitions are
in fact it's parallelism.

As for supporting a change in the number of partitions (mainly, added
partitions), I'll suggest something I brought up before, and might make
more sense now:
Hashing an UnboundedSource according to it's split's properties
(topic-partition in this case). This will allow to key the stream by the
source in a way that the reader's CheckpointMark is tied to the split, and
if a "new split" is created (a new partition added to a topic the pipeline
consumes) it's reader's state is non-existing (starting from
latest/earlies), while the rest (of the readers) will pick-up where they
I think this also avoids the need to "remember" the original number of


On Fri, Nov 11, 2016 at 4:22 AM Raghu Angadi <rang...@google.com.invalid>

> I would like to propose a change to how many splits (sources) KafkaIO
> creates. The code changes are relatively simple, but it has a couple of
> drawbacks I would to discuss here.
> KafkaIO currently takes '*desiredNumWorkers
> <
> https://github.com/apache/incubator-beam/blob/v0.3.0-incubating-RC1/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L642
> >*'
> hint literally and returns exactly that many splits. If *desiredNumWorkers*
> is 10, and the topic has 50 partitions, each Kafka source reads from 5
> partitions.
> The primary disadvantage is that runner dependent 'desiredNumWorkers' might
> not be accurate. In Dataflow, it is particularly low when we set
> 'maxNumWorkers' (BEAM-958 <https://issues.apache.org/jira/browse/BEAM-958
> >).
> In addition, number of partitions in Kafka is a really good indicator of
> its parallelism.
> I would like to change KafkaIO to return one split for each of the
> partitions.
> Pros:
>    - A partition is in fact the unit of parallelism in Kafka.
>    - Does not depend on 'desiredNumWorkers'.
>    - Little risk of having unreasonably large number of partitions (unlike
>    say a source with one split for file). Number of partitions tend to be
> on
>    the order of the Kafka cluster size.
> Cons: mainly affects job update:
>    - Breaks updating existing job
>    <https://cloud.google.com/dataflow/pipelines/updating-a-pipeline> if it
>    is updated to newer version of KafkaIO. New version changes number of
>    splits returned, which is not allowed during update.
>       - I think this is a reasonable breakage at this stage.
>       - Vast majority of updates don't involve version change
>       - We could add a work around where user can explicitly set number of
>       splits in KafkaIO (this might be required to handle change in
> partitions as
>       well, see below)
>    - Makes it a bit more difficult to support change in number of Kafka
>    partitions across an update.
>       - This is not a feature in KafkaIO yet. So not a new breakage.
>       - If we don't depend on 'desiredNumWorkers', there is no way for us
>       to know how many splits we had before the update. This is actually a
>       limitation of UnboundedSource API. UnboundedSource needs
> multiple teaks to
>       support job update better. In that sense I don't think this should
> be a
>       blocker.
>       - A work around is to let user explicitly set number of splits. E.g.
>          - when a job starts, say we had 70 partitions and after some time
>          we add 10 more partitions.
>          - At runtime, each Kafka split notices these and can distribute
>          new partitions among existing 70.
>          - But when the job is updated, KafkaIO does not know that it had
>          only 70 partitions earlier.
>          - For this to work, user could set number of splits to 70
>          explicitly.
> Thanks.
> Raghu.

Reply via email to