On Fri, Oct 7, 2016 at 3:17 AM, Amit Sela <amitsel...@gmail.com> wrote:

> Filed https://issues.apache.org/jira/browse/BEAM-727.
>

Thanks. This is currently marked 'minor'. Please comment there if the
feature is important (either to you specifically, or users in general).


> As for initializing from workers, this is actually the opposite of what's
> discussed in https://issues.apache.org/jira/browse/BEAM-704.
> IMO Kafka metadata should be accessed once, before splitting, since this
> metadata eventually determines the splitting (and for other reasons
> mentioned in BEAM-704).
>

I commented on the jira. Not sure if I quite follow the issue. Offset is
the most important state stored in checkpoint.

As for the "Driver/Launcher" program accessing Kafka, I guess it could run
> on a worker, right ? I can only speak for Spark saying that when running on
> YARN it can run in "cluster-mode" - running the driver program in a YARN
> container as well.
>

That's neat. I don't think Dataflow has such an option.

Raghu.

>
> On Tue, Oct 4, 2016 at 10:39 PM Raghu Angadi <rang...@google.com.invalid>
> wrote:
>
> > On Wed, Sep 14, 2016 at 1:43 PM, Amit Sela <amitsel...@gmail.com> wrote:
> >
> > > >
> > > > For generateInitialSplits, the UnboundedSource API doesn't require
> > > > deterministic splitting (although it's recommended), and a
> > PipelineRunner
> > > > should keep track of the initially generated splits.
> > > >
> > > If the splitting were to be consistent, in such way that newly added
> > > partitions would be assigned with a new "splitId" while existing ones
> > would
> > > still be assigned with the same (consistent) splitId, it could support
> > > newly added partitions, no ?
> >
> >
> > Yes, consistently assigning the partitions will let us do this. I
> wouldn't
> > hash though, it would not distribute partitions evenly when the number of
> > partitions is low (say 2 or 10, which is pretty common case). We can
> assign
> > consistently even with a round-robin assignment. Current assignment would
> > work, except when we are reading from multiple topics. We can update it
> to
> > handle multiple topics better (assign each partitions for each topic
> > independently).
> >
> > This strategy would still depend on strong guarantee on
> > generateInitialSplits() interface where 'desiredNumSplits' stays same
> > across updates.
> >
> > Please file a jira for adding support for handling change in Kafka
> > partitions.
> >
> > In fact, KafkaIO should probably not fetch partitions info form inside
> > generateIntialSplits() at all (sometimes Kafka cluster might not be
> > accessible from where the job is launched from) and instead do all the
> > initialization from the workers, even though it implies multiple fetches
> of
> > Kafka metadata.
> >
>

Reply via email to