+1. I think this makes more sense than the existing approach of a split made up of several Kafka partitions since, as mentioned, partitions are in fact Kafka's unit of parallelism.
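A minimal Java sketch of the two splitting strategies being compared. It assumes partitions are dealt round-robin across splits in the current scheme; the real assignment inside KafkaIO may differ. The class `SplitStrategies`, its methods, and the "topic-N" partition names are hypothetical and are not part of KafkaIO's API. A partition is modeled as a plain string.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitStrategies {

  // Current behavior (as described in the proposal): take desiredNumWorkers
  // literally and spread partitions over exactly that many splits.
  // Round-robin assignment is an assumption made for this sketch.
  static List<List<String>> splitByDesiredWorkers(List<String> partitions, int desiredNumWorkers) {
    if (desiredNumWorkers < 1) {
      throw new IllegalArgumentException("desiredNumWorkers must be at least 1");
    }
    int numSplits = Math.min(desiredNumWorkers, partitions.size());
    List<List<String>> splits = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      splits.add(new ArrayList<>());
    }
    for (int i = 0; i < partitions.size(); i++) {
      splits.get(i % numSplits).add(partitions.get(i));
    }
    return splits;
  }

  // Proposed behavior: one split per partition, independent of desiredNumWorkers.
  static List<List<String>> splitPerPartition(List<String> partitions) {
    List<List<String>> splits = new ArrayList<>();
    for (String p : partitions) {
      splits.add(List.of(p));
    }
    return splits;
  }

  public static void main(String[] args) {
    List<String> partitions = new ArrayList<>();
    for (int i = 0; i < 50; i++) {
      partitions.add("topic-" + i);
    }
    List<List<String>> current = splitByDesiredWorkers(partitions, 10);
    List<List<String>> proposed = splitPerPartition(partitions);
    System.out.println(current.size() + " splits of " + current.get(0).size() + " partitions");
    System.out.println(proposed.size() + " splits of " + proposed.get(0).size() + " partition");
  }
}
```

With 50 partitions and a `desiredNumWorkers` of 10, `main` prints `10 splits of 5 partitions` and then `50 splits of 1 partition`, matching the example in the quoted proposal.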
As for supporting a change in the number of partitions (mainly, added partitions), I'll suggest something I brought up before that might make more sense now: hashing an UnboundedSource according to its split's properties (topic-partition in this case). This would allow keying the stream by the source so that the reader's CheckpointMark is tied to the split. If a "new split" is created (a new partition is added to a topic the pipeline consumes), its reader has no existing state (and starts from latest/earliest), while the rest of the readers pick up where they left off. I think this also avoids the need to "remember" the original degree of parallelism.

Thanks,
Amit

On Fri, Nov 11, 2016 at 4:22 AM Raghu Angadi <rang...@google.com.invalid> wrote:

> I would like to propose a change to how many splits (sources) KafkaIO
> creates. The code changes are relatively simple, but it has a couple of
> drawbacks I would like to discuss here.
>
> KafkaIO currently takes the '*desiredNumWorkers*'
> <https://github.com/apache/incubator-beam/blob/v0.3.0-incubating-RC1/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L642>
> hint literally and returns exactly that many splits. If *desiredNumWorkers*
> is 10 and the topic has 50 partitions, each Kafka source reads from 5
> partitions.
>
> The primary disadvantage is that the runner-dependent 'desiredNumWorkers'
> might not be accurate. In Dataflow, it is particularly low when we set
> 'maxNumWorkers' (BEAM-958 <https://issues.apache.org/jira/browse/BEAM-958>).
> In addition, the number of partitions in Kafka is a really good indicator
> of its parallelism.
>
> I would like to change KafkaIO to return one split for each of the
> partitions.
>
> Pros:
>
> - A partition is in fact the unit of parallelism in Kafka.
> - Does not depend on 'desiredNumWorkers'.
> - Little risk of having an unreasonably large number of partitions (unlike,
>   say, a source with one split per file). The number of partitions tends
>   to be on the order of the Kafka cluster size.
>
> Cons: mainly affects job update:
>
> - Breaks updating an existing job
>   <https://cloud.google.com/dataflow/pipelines/updating-a-pipeline> if it
>   is updated to a newer version of KafkaIO. The new version changes the
>   number of splits returned, which is not allowed during update.
>    - I think this is a reasonable breakage at this stage.
>    - The vast majority of updates don't involve a version change.
>    - We could add a workaround where the user can explicitly set the
>      number of splits in KafkaIO (this might be required to handle a
>      change in partitions as well, see below).
> - Makes it a bit more difficult to support a change in the number of
>   Kafka partitions across an update.
>    - This is not a feature in KafkaIO yet, so it is not a new breakage.
>    - If we don't depend on 'desiredNumWorkers', there is no way for us to
>      know how many splits we had before the update. This is actually a
>      limitation of the UnboundedSource API. UnboundedSource needs multiple
>      tweaks to support job update better. In that sense I don't think this
>      should be a blocker.
>    - A workaround is to let the user explicitly set the number of splits,
>      e.g.:
>       - When a job starts, say we had 70 partitions, and after some time
>         we add 10 more partitions.
>       - At runtime, each Kafka split notices these and can distribute the
>         new partitions among the existing 70.
>       - But when the job is updated, KafkaIO does not know that it had
>         only 70 partitions earlier.
>       - For this to work, the user could set the number of splits to 70
>         explicitly.
>
> Thanks.
> Raghu.
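The two ways of handling added partitions discussed in this thread can be sketched side by side: Raghu's workaround of a user-fixed split count, and Amit's idea of keying reader state by topic-partition so that checkpoints follow partitions rather than split indices. This is a minimal Java 16+ sketch under stated assumptions, not Beam or KafkaIO code: `TopicPartitionKey`, `assignToFixedSplits`, `startOffset`, the `p % numSplits` assignment, and the use of `-1` as a "start from latest" sentinel are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AddedPartitions {

  // Hypothetical key for a Kafka topic-partition. A record gets value-based
  // equals/hashCode, so it can key a map of per-partition checkpoint state.
  record TopicPartitionKey(String topic, int partition) {}

  // Fixed-split workaround: the user sets numSplits, and partition p goes to
  // split (p % numSplits). Added partitions are spread over the existing
  // splits, so the number of splits stays the same across an update.
  static List<List<Integer>> assignToFixedSplits(int numPartitions, int numSplits) {
    if (numSplits < 1) {
      throw new IllegalArgumentException("numSplits must be at least 1");
    }
    List<List<Integer>> splits = new ArrayList<>();
    for (int s = 0; s < numSplits; s++) {
      splits.add(new ArrayList<>());
    }
    for (int p = 0; p < numPartitions; p++) {
      splits.get(p % numSplits).add(p);
    }
    return splits;
  }

  // Keyed-state idea: a partition with a saved checkpoint resumes from its
  // saved offset; a newly added partition has no state and starts from
  // defaultOffset (standing in for "latest" or "earliest").
  static long startOffset(Map<TopicPartitionKey, Long> checkpoints, TopicPartitionKey tp,
      long defaultOffset) {
    return checkpoints.getOrDefault(tp, defaultOffset);
  }

  public static void main(String[] args) {
    // 70 partitions at job start plus 10 added later: 80 partitions, still 70 splits.
    List<List<Integer>> splits = assignToFixedSplits(80, 70);
    System.out.println(splits.size() + " splits; split 0 reads " + splits.get(0));

    Map<TopicPartitionKey, Long> checkpoints = new HashMap<>();
    checkpoints.put(new TopicPartitionKey("events", 0), 1234L);
    long latestSentinel = -1L; // hypothetical marker meaning "start from latest"
    System.out.println(startOffset(checkpoints, new TopicPartitionKey("events", 0), latestSentinel));
    System.out.println(startOffset(checkpoints, new TopicPartitionKey("events", 75), latestSentinel));
  }
}
```

Running `main` prints `70 splits; split 0 reads [0, 70]`, then `1234` for the partition that has a checkpoint, then `-1` for the newly added partition 75. The fixed-split approach keeps the split count stable across an update but requires the user to remember it; keying state by topic-partition avoids that, at the cost of the split count growing when partitions are added.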