Why don't we generate initial splits after the pipeline has been created and the runner is processing it?
This would allow a runner to look at the old state of the pipeline, see how many splits there were, and provide a hint as to how many splits it wants. It would also bring unbounded sources in line with how bounded sources work, where the splitting is performed once the pipeline has started. (For concreteness, I've included a rough sketch of the one-split-per-partition idea below the quoted thread.)

On Fri, Nov 11, 2016 at 8:09 AM, Amit Sela <amitsel...@gmail.com> wrote:

> +1
> I think this makes more sense than the existing form of a split that is made of several Kafka partitions since, as mentioned, Kafka partitions are in fact its parallelism.
>
> As for supporting a change in the number of partitions (mainly, added partitions), I'll suggest something I brought up before, which might make more sense now:
> Hashing an UnboundedSource according to its split's properties (topic-partition in this case). This would allow keying the stream by the source in a way that ties the reader's CheckpointMark to the split, so that if a "new split" is created (a new partition added to a topic the pipeline consumes) its reader's state is non-existent (starting from latest/earliest), while the rest of the readers pick up where they left off.
> I think this also avoids the need to "remember" the original degree of parallelism.
>
> Thanks,
> Amit
>
> On Fri, Nov 11, 2016 at 4:22 AM Raghu Angadi <rang...@google.com.invalid> wrote:
>
> > I would like to propose a change to how many splits (sources) KafkaIO creates. The code changes are relatively simple, but it has a couple of drawbacks I would like to discuss here.
> >
> > KafkaIO currently takes the '*desiredNumWorkers* <https://github.com/apache/incubator-beam/blob/v0.3.0-incubating-RC1/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L642>' hint literally and returns exactly that many splits. If *desiredNumWorkers* is 10, and the topic has 50 partitions, each Kafka source reads from 5 partitions.
> >
> > The primary disadvantage is that the runner-dependent 'desiredNumWorkers' might not be accurate. In Dataflow, it is particularly low when we set 'maxNumWorkers' (BEAM-958 <https://issues.apache.org/jira/browse/BEAM-958>).
> > In addition, the number of partitions in Kafka is a really good indicator of its parallelism.
> >
> > I would like to change KafkaIO to return one split for each of the partitions.
> >
> > Pros:
> >
> >    - A partition is in fact the unit of parallelism in Kafka.
> >    - Does not depend on 'desiredNumWorkers'.
> >    - Little risk of having an unreasonably large number of partitions (unlike, say, a source with one split per file). The number of partitions tends to be on the order of the Kafka cluster size.
> >
> > Cons: mainly affects job update:
> >
> >    - Breaks updating an existing job <https://cloud.google.com/dataflow/pipelines/updating-a-pipeline> if it is updated to a newer version of KafkaIO. The new version changes the number of splits returned, which is not allowed during update.
> >       - I think this is a reasonable breakage at this stage.
> >       - The vast majority of updates don't involve a version change.
> >       - We could add a workaround where the user can explicitly set the number of splits in KafkaIO (this might be required to handle a change in partitions as well, see below).
> >    - Makes it a bit more difficult to support a change in the number of Kafka partitions across an update.
> >       - This is not a feature in KafkaIO yet, so it is not a new breakage.
> >       - If we don't depend on 'desiredNumWorkers', there is no way for us to know how many splits we had before the update. This is actually a limitation of the UnboundedSource API; UnboundedSource needs multiple tweaks to support job update better. In that sense I don't think this should be a blocker.
> >       - A workaround is to let the user explicitly set the number of splits. E.g.:
> >          - When a job starts, say we have 70 partitions, and after some time we add 10 more partitions.
> >          - At runtime, each Kafka split notices these and can distribute the new partitions among the existing 70.
> >          - But when the job is updated, KafkaIO does not know that it had only 70 partitions earlier.
> >          - For this to work, the user could set the number of splits to 70 explicitly.
> >
> > Thanks.
> > Raghu.
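Here is the sketch mentioned above: a rough, untested illustration of the one-split-per-partition idea, not the actual KafkaIO code. The class and method names (PartitionPerSplitSketch, splitsByPartition) are made up, and it assumes the standard Kafka consumer client (Consumer.partitionsFor); something like UnboundedSource.generateInitialSplits() would call this instead of dividing partitions by the 'desiredNumSplits' hint.

    // Illustrative only -- not the actual KafkaIO implementation.
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    class PartitionPerSplitSketch {

      // Builds one single-partition assignment per split, ignoring the
      // runner's desiredNumSplits hint entirely: Kafka's own partition
      // count decides how many sources the runner sees.
      static List<List<TopicPartition>> splitsByPartition(
          Consumer<?, ?> consumer, List<String> topics) {
        List<List<TopicPartition>> splits = new ArrayList<>();
        for (String topic : topics) {
          for (PartitionInfo info : consumer.partitionsFor(topic)) {
            List<TopicPartition> single = new ArrayList<>();
            single.add(new TopicPartition(info.topic(), info.partition()));
            splits.add(single);
          }
        }
        return splits;
      }
    }

A stable key for Amit's hashing idea could then be derived from the same split properties (e.g. topic plus partition number), so a reader's CheckpointMark stays tied to its partition even if new partitions are added later.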