I agree with all of this, except:

> I think this also avoids the need to "remember" the original number of
> parallelism.


KafkaIO still needs to decide how many splits it needs to return in
generateInitialSplits().
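Below is a minimal plain-Java sketch of what generateInitialSplits() has to compute under the current policy and under the one-split-per-partition proposal quoted further down. The class and method names (SplitPolicy, assign, currentPolicy, proposedPolicy) and the optional explicit split count are hypothetical, not KafkaIO's actual code. Round-robin assignment and capping the split count at the partition count are also assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of two ways to choose Kafka splits; not KafkaIO's actual code. */
public class SplitPolicy {

    /** Assigns partitions 0..numPartitions-1 round-robin to numSplits splits. */
    static List<List<Integer>> assign(int numPartitions, int numSplits) {
        List<List<Integer>> splits = new ArrayList<>();
        for (int s = 0; s < numSplits; s++) {
            splits.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            splits.get(p % numSplits).add(p);
        }
        return splits;
    }

    /** Current behaviour: one split per desired worker (capped so no split is empty; assumed). */
    static List<List<Integer>> currentPolicy(int numPartitions, int desiredNumWorkers) {
        return assign(numPartitions, Math.min(numPartitions, desiredNumWorkers));
    }

    /** Proposed behaviour: one split per partition, unless the user fixes the count explicitly. */
    static List<List<Integer>> proposedPolicy(int numPartitions, int explicitNumSplits) {
        int numSplits = explicitNumSplits > 0 ? explicitNumSplits : numPartitions;
        return assign(numPartitions, numSplits);
    }

    public static void main(String[] args) {
        List<List<Integer>> current = currentPolicy(50, 10);
        List<List<Integer>> proposed = proposedPolicy(50, 0);
        // 10 splits of 5 partitions each vs. 50 splits of 1 partition each.
        System.out.println(current.size() + " splits, first reads " + current.get(0));
        System.out.println(proposed.size() + " splits, first reads " + proposed.get(0));
    }
}
```

Either way generateInitialSplits() returns a concrete number; the proposal only changes where that number comes from (the topic's partition count instead of the runner's hint).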

'Update' could be a Dataflow-specific concern. We could drop it for this
thread, though I think IO and runners do interact with each other and thus
should influence the API. But that might be slated for the future.

On Fri, Nov 11, 2016 at 6:09 AM, Amit Sela <amitsel...@gmail.com> wrote:

> +1
> I think this makes more sense than the existing form of a split that is
> made of several Kafka partitions since, as mentioned, Kafka partitions are
> in fact its parallelism.
>
> As for supporting a change in the number of partitions (mainly, added
> partitions), I'll suggest something I brought up before that might make
> more sense now:
> Hashing an UnboundedSource according to its split's properties
> (topic-partition in this case). This would allow keying the stream by the
> source in a way that the reader's CheckpointMark is tied to the split, and
> if a "new split" is created (a new partition added to a topic the pipeline
> consumes), its reader's state is non-existent (starting from
> latest/earliest), while the rest of the readers pick up where they
> left off.
> I think this also avoids the need to "remember" the original number of
> parallelism.
>
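Amit's suggestion can be sketched in plain Java as checkpoint state keyed by the split's identity (topic, partition) rather than by its position in a list of splits. SplitKey, checkpoints and startOffset are hypothetical names; the map is only a stand-in for whatever per-key state the runner actually keeps, and an offset stands in for the CheckpointMark. Starting a new split from "latest" is one of the two options Amit mentions (latest/earliest).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch: checkpoint state keyed by a split's identity (topic, partition). */
public class KeyedCheckpoints {

    /** Identity of a split; two splits for the same topic-partition are equal. */
    static final class SplitKey {
        final String topic;
        final int partition;

        SplitKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SplitKey)) {
                return false;
            }
            SplitKey other = (SplitKey) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** Next offset to read, per split, as saved in the last checkpoint. */
    static final Map<SplitKey, Long> checkpoints = new HashMap<>();

    /** Resume from the checkpoint if one exists; otherwise a new split starts at latestOffset. */
    static long startOffset(SplitKey key, long latestOffset) {
        return checkpoints.getOrDefault(key, latestOffset);
    }

    public static void main(String[] args) {
        checkpoints.put(new SplitKey("events", 0), 1200L);
        checkpoints.put(new SplitKey("events", 1), 980L);
        // Partition 2 was added after the last checkpoint, so it has no saved state.
        System.out.println(startOffset(new SplitKey("events", 0), 5000L));
        System.out.println(startOffset(new SplitKey("events", 2), 5000L));
    }
}
```

Because the lookup depends only on the topic-partition, nothing here needs to know how many splits existed before.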
> Thanks,
> Amit
>
> On Fri, Nov 11, 2016 at 4:22 AM Raghu Angadi <rang...@google.com.invalid>
> wrote:
>
> > I would like to propose a change to how many splits (sources) KafkaIO
> > creates. The code changes are relatively simple, but it has a couple of
> > drawbacks I would like to discuss here.
> >
> > KafkaIO currently takes the '*desiredNumWorkers*'
> > <https://github.com/apache/incubator-beam/blob/v0.3.0-incubating-RC1/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L642>
> > hint literally and returns exactly that many splits. If *desiredNumWorkers*
> > is 10 and the topic has 50 partitions, each Kafka source reads from 5
> > partitions.
> >
> > The primary disadvantage is that the runner-dependent 'desiredNumWorkers'
> > might not be accurate. In Dataflow, it is particularly low when we set
> > 'maxNumWorkers' (BEAM-958 <https://issues.apache.org/jira/browse/BEAM-958>).
> > In addition, the number of partitions in Kafka is a really good indicator
> > of its parallelism.
> >
> > I would like to change KafkaIO to return one split for each of the
> > partitions.
> >
> > Pros:
> >
> >    - A partition is in fact the unit of parallelism in Kafka.
> >    - Does not depend on 'desiredNumWorkers'.
> >    - Little risk of having an unreasonably large number of partitions
> >    (unlike, say, a source with one split per file). The number of
> >    partitions tends to be on the order of the Kafka cluster size.
> >
> > Cons: mainly affects job update:
> >
> >    - Breaks updating an existing job
> >    <https://cloud.google.com/dataflow/pipelines/updating-a-pipeline> if
> >    it is updated to a newer version of KafkaIO. The new version changes
> >    the number of splits returned, which is not allowed during an update.
> >       - I think this is a reasonable breakage at this stage.
> >       - The vast majority of updates don't involve a version change.
> >       - We could add a workaround where the user can explicitly set the
> >       number of splits in KafkaIO (this might be required to handle a
> >       change in partitions as well, see below).
> >    - Makes it a bit more difficult to support a change in the number of
> >    Kafka partitions across an update.
> >       - This is not a feature in KafkaIO yet, so it is not a new breakage.
> >       - If we don't depend on 'desiredNumWorkers', there is no way for us
> >       to know how many splits we had before the update. This is actually a
> >       limitation of the UnboundedSource API. UnboundedSource needs
> >       multiple tweaks to support job update better. In that sense, I don't
> >       think this should be a blocker.
> >       - A workaround is to let the user explicitly set the number of
> >       splits. E.g.:
> >          - Say a job starts with 70 partitions, and after some time we
> >          add 10 more partitions.
> >          - At runtime, each Kafka split notices these and can distribute
> >          the new partitions among the existing 70 splits.
> >          - But when the job is updated, KafkaIO does not know that it had
> >          only 70 partitions earlier.
> >          - For this to work, the user could set the number of splits to 70
> >          explicitly.
> >
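The 70-split workaround can be illustrated with a minimal sketch, assuming (the thread does not specify this) that partition p is owned by split p % numSplits. Because numSplits stays fixed at 70 across the update, partitions 0..69 keep their original split and the 10 new partitions (70..79) are spread over splits 0..9. FixedSplitAssignment, splitFor and partitionsOf are hypothetical names, not KafkaIO code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the explicit-split-count workaround; not KafkaIO code. */
public class FixedSplitAssignment {

    /** Split index that owns a partition, given a user-fixed number of splits (assumed scheme). */
    static int splitFor(int partition, int numSplits) {
        return partition % numSplits;
    }

    /** Partitions owned by one split when the topic has numPartitions partitions. */
    static List<Integer> partitionsOf(int split, int numPartitions, int numSplits) {
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (splitFor(p, numSplits) == split) {
                owned.add(p);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        int numSplits = 70; // set explicitly by the user, so it is the same before and after the update
        System.out.println(partitionsOf(3, 70, numSplits));  // before: 70 partitions
        System.out.println(partitionsOf(3, 80, numSplits));  // after 10 partitions are added
        System.out.println(partitionsOf(15, 80, numSplits));
    }
}
```

Split 3 goes from reading [3] to [3, 73], while split 15 still reads only [15], so every existing partition stays with the split whose checkpoint already covers it.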
> >
> > Thanks.
> > Raghu.
> >
>
