On Fri, Oct 7, 2016 at 3:17 AM, Amit Sela <amitsel...@gmail.com> wrote:
> Filed https://issues.apache.org/jira/browse/BEAM-727.
>

Thanks. This is currently marked 'minor'. Please comment there if the
feature is important (either to you specifically or to users in general).

> As for initializing from workers, this is actually the opposite of what's
> discussed in https://issues.apache.org/jira/browse/BEAM-704.
> IMO Kafka metadata should be accessed once, before splitting, since this
> metadata eventually determines the splitting (and for other reasons
> mentioned in BEAM-704).
>

I commented on the JIRA. I'm not sure I quite follow the issue. The offset
is the most important state stored in the checkpoint.

> As for the "Driver/Launcher" program accessing Kafka, I guess it could run
> on a worker, right? I can only speak for Spark: when running on YARN it
> can run in "cluster mode", running the driver program in a YARN container
> as well.
>

That's neat. I don't think Dataflow has such an option.

Raghu.

>
> On Tue, Oct 4, 2016 at 10:39 PM Raghu Angadi <rang...@google.com.invalid>
> wrote:
>
> > On Wed, Sep 14, 2016 at 1:43 PM, Amit Sela <amitsel...@gmail.com> wrote:
> >
> > > >
> > > > For generateInitialSplits, the UnboundedSource API doesn't require
> > > > deterministic splitting (although it's recommended), and a
> > > > PipelineRunner should keep track of the initially generated splits.
> > > >
> > > If the splitting were consistent, in such a way that newly added
> > > partitions would be assigned a new "splitId" while existing ones would
> > > still be assigned the same (consistent) splitId, it could support
> > > newly added partitions, no?
> > >
> >
> > Yes, consistently assigning the partitions will let us do this. I wouldn't
> > hash, though: it would not distribute partitions evenly when the number of
> > partitions is low (say 2 or 10, which is a pretty common case). We can
> > assign consistently even with a round-robin assignment. The current
> > assignment would work, except when we are reading from multiple topics. We
> > can update it to handle multiple topics better (assign the partitions of
> > each topic independently).
> >
> > This strategy would still depend on a strong guarantee from the
> > generateInitialSplits() interface that 'desiredNumSplits' stays the same
> > across updates.
> >
> > Please file a JIRA for adding support for handling changes in Kafka
> > partitions.
> >
> > In fact, KafkaIO should probably not fetch partition info from inside
> > generateInitialSplits() at all (sometimes the Kafka cluster might not be
> > accessible from where the job is launched) and should instead do all the
> > initialization from the workers, even though it implies multiple fetches
> > of Kafka metadata.
> >
>
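The per-topic round-robin assignment discussed above can be sketched in Java as follows. This is a hypothetical illustration, not KafkaIO's actual code: the class name, the `assign` method, and the `TopicPartition` record (a stand-in for Kafka's class of the same name) are all assumptions. Partition p of each topic goes to split p % numSplits, so as long as the number of splits stays the same across updates, adding partitions to a topic never moves partitions that were already assigned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ConsistentPartitionAssignment {

  // Hypothetical stand-in for Kafka's TopicPartition; not the real Kafka class.
  record TopicPartition(String topic, int partition) {}

  /**
   * Hypothetical helper: assigns partitions to numSplits splits, round-robin
   * within each topic. Partition p of a topic always lands in split
   * p % numSplits, so an existing partition's split depends only on p and
   * numSplits. Adding partitions to any topic therefore never moves
   * partitions that were already assigned, provided numSplits is unchanged.
   */
  static List<List<TopicPartition>> assign(Map<String, Integer> partitionCounts, int numSplits) {
    if (numSplits <= 0) {
      throw new IllegalArgumentException("numSplits must be positive");
    }
    List<List<TopicPartition>> splits = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      splits.add(new ArrayList<>());
    }
    // Iterate topics in sorted order so the output order is deterministic.
    for (Map.Entry<String, Integer> e : new TreeMap<>(partitionCounts).entrySet()) {
      for (int p = 0; p < e.getValue(); p++) {
        // Each topic is assigned independently of the others.
        splits.get(p % numSplits).add(new TopicPartition(e.getKey(), p));
      }
    }
    return splits;
  }

  public static void main(String[] args) {
    // Topic "b" grows from 2 to 4 partitions; numSplits stays at 2.
    List<List<TopicPartition>> before = assign(Map.of("a", 3, "b", 2), 2);
    List<List<TopicPartition>> after = assign(Map.of("a", 3, "b", 4), 2);
    System.out.println("before: " + before);
    System.out.println("after:  " + after);
  }
}
```

By contrast, a single round-robin over one combined, sorted list of all topics' partitions (an assumption about how the multi-topic case is handled today) is not stable: with topics a (3 partitions) and b (2 partitions) and 2 splits, adding a fourth partition to a shifts b's partitions by one position, so both of them change splits. One trade-off of the per-topic scheme is that partition 0 of every topic lands in split 0, so with many small topics the low-numbered splits receive more partitions.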