If I understand correctly this will break https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L857 in KafkaIO.
So is this a KafkaIO limitation (for now)?

On Tue, Sep 13, 2016 at 11:31 AM Amit Sela <amitsel...@gmail.com> wrote:

> Thanks Thomas, you understood correctly.
> Doing this, or assigning a running id, is basically the same, as long as
> the generateInitialSplits implementation is deterministic (KafkaIO
> actually notes this).
>
> So what if partitions were added at runtime to one (or more) of the topics
> I'm consuming from?
>
> On Tue, Sep 13, 2016 at 3:51 AM Thomas Groh <tg...@google.com.invalid>
> wrote:
>
>> I'm not sure if I've understood what the problem is - from what I can
>> tell it's about associating UnboundedSource splits with Checkpoints in
>> order to get consistent behavior from the sources. If I'm wrong, the
>> following isn't really relevant to your problem - it's about the
>> expected behavior of a runner interacting with any split of a Source.
>>
>> In the absence of updates, the evaluation of a split of UnboundedSource
>> must obey the general contract for UnboundedSource, which is that
>> createReader(PipelineOptions, CheckpointMarkT) will only ever be called
>> with a Checkpoint Mark that was generated by an UnboundedReader that was
>> created from the source - i.e., a Source creates Readers and is provided
>> only checkpoints from those readers it creates. Each Source instance
>> (split and top-level) should be independent of all other instances. A
>> split of a Source should generally be indistinguishable from a top-level
>> source (it will just have slightly different configuration).
>>
>> Generally this means that Source splits have to have an associated
>> identifier, but these identifiers are arbitrary and not relevant to the
>> actual evaluation of the Source - so the runner gets to tag splits
>> however it pleases, so long as those tags don't allow splits to bleed
>> into each other.
>>
>> Could you instead store the Source paired with some (arbitrary and unique)
>> key and pull out the checkpoint using the key (or even just store the keys
>> and store the source with the checkpoint)? That way you always will keep
>> the same association between Source and Checkpoint. Flink does something
>> like this where they store the serialized source alongside the
>> CheckpointMark so they're never separated (
>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L164
>> and
>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L334
>> )
>>
>> On Mon, Sep 12, 2016 at 11:40 AM, Amit Sela <amitsel...@gmail.com> wrote:
>>
>> > If this issue doesn't make sense for "native" streaming systems, and
>> > it's only a Spark issue (and my implementation of Read.Unbounded) - I
>> > could keep doing what I do, use a running id.
>> > I was just wondering... ( hence the question mark in the title ;-) )
>> >
>> > On Mon, Sep 12, 2016 at 9:31 PM Amit Sela <amitsel...@gmail.com> wrote:
>> >
>> > > Not sure how it works in Dataflow or Flink, but I'm working on an
>> > > implementation for Spark using the (almost) only stateful operator it
>> > > has - "mapWithState" - and the State needs to correspond to a key.
>> > > Each micro-batch, the Sources recreate the readers and "look-up" the
>> > > latest checkpoint.
>> > >
>> > > On Mon, Sep 12, 2016 at 9:15 PM Raghu Angadi <rang...@google.com.invalid>
>> > > wrote:
>> > >
>> > >> On Wed, Sep 7, 2016 at 7:13 AM, Amit Sela <amitsel...@gmail.com> wrote:
>> > >>
>> > >> > @Raghu, hashing <topic, partition> is exactly what I mean, but I'm
>> > >> > asking if it should be abstracted in the Source..
>> > >> > Otherwise, runners will have to do an *instanceof* check on every
>> > >> > Source and write their own hash implementation.
>> > >> > Since splits contain the "split" Source, and it contains its own
>> > >> > CheckpointMark, and hashing would probably be tied to that
>> > >> > CheckpointMark, why not abstract it in the UnboundedSource?
>> > >> >
>> > >>
>> > >> I don't quite follow how a runner should be concerned about hashing
>> > >> used by a Source for its own splits. Can you give a concrete example?
>> > >> To me it looks like source and checkpoint objects are completely
>> > >> opaque to the runners.
>> > >>
>> > >> > On Wed, Sep 7, 2016 at 3:02 AM Raghu Angadi <rang...@google.com.invalid>
>> > >> > wrote:
>> > >> >
>> > >> > > > If splits (UnboundedSources) had an identifier, this could be
>> > >> > > > avoided, and checkpoints could be persisted accordingly.
>> > >> > >
>> > >> > > The order of the splits that a source returns is preserved. So
>> > >> > > during an update, you can expect the 5th split to be invoked with
>> > >> > > the same checkpoint mark that the 5th split saved before the
>> > >> > > upgrade. You can hash <topic, partition> to one of the indices.
>> > >> > >
>> > >> > > KafkaIO.java does not support changes in partitions.
>> > >> > >
>> > >> > > On Tue, Sep 6, 2016 at 4:04 PM, Eugene Kirpichov
>> > >> > > <kirpic...@google.com.invalid> wrote:
>> > >> > >
>> > >> > > > Oh! Okay, looks like this is a part of the code I was unfamiliar
>> > >> > > > with. I'd like to know the answer too, in this case.
>> > >> > > > +Daniel Mills <mil...@google.com> can you comment?
>> > >> > > >
>> > >> > > > On Tue, Sep 6, 2016 at 3:32 PM Amit Sela <amitsel...@gmail.com>
>> > >> > > > wrote:
>> > >> > > >
>> > >> > > > > That is correct, as long as none of the Kafka topics "grow"
>> > >> > > > > another partition (which they could).
>> > >> > > > > In that case, some bundle will have to start reading from this
>> > >> > > > > partition as well, and since all other bundles already have a
>> > >> > > > > "previous checkpoint" it matters which checkpoint to relate to.
>> > >> > > > > I don't know how it's implemented in Dataflow, but in Spark I'm
>> > >> > > > > testing using mapWithState to store the checkpoints per split.
>> > >> > > > > It also seems that order does matter to KafkaIO:
>> > >> > > > >
>> > >> > > > > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L636
>> > >> > > > >
>> > >> > > > > On Wed, Sep 7, 2016 at 1:24 AM Eugene Kirpichov
>> > >> > > > > <kirpic...@google.com.invalid> wrote:
>> > >> > > > >
>> > >> > > > > > Hi Amit,
>> > >> > > > > > Could you explain more about why you're saying the order of
>> > >> > > > > > splits matters?
>> > >> > > > > > AFAIK the semantics of Read.Unbounded is "read from all of
>> > >> > > > > > the splits in parallel, checkpointing each of them
>> > >> > > > > > independently", so their order shouldn't matter.
>> > >> > > > > >
>> > >> > > > > > On Tue, Sep 6, 2016 at 3:17 PM Amit Sela <amitsel...@gmail.com>
>> > >> > > > > > wrote:
>> > >> > > > > >
>> > >> > > > > > > UnboundedSources generate initial splits, which are
>> > >> > > > > > > basically splits of themselves - for example, if an
>> > >> > > > > > > UnboundedKafkaSource is set to read from topic T1 which is
>> > >> > > > > > > made of 2 partitions P1 and P2, it will (optimally) split
>> > >> > > > > > > into two UnboundedKafkaSources, one per partition.
>> > >> > > > > > > During the lifecycle of the "reader" bundles,
>> > >> > > > > > > CheckpointMarks are used to checkpoint "last-read" and so
>> > >> > > > > > > readers may restart/resume. I'm assuming this is how newly
>> > >> > > > > > > created partitions will automatically be added to readers.
>> > >> > > > > > >
>> > >> > > > > > > The problem is that it's all fine while there is only one
>> > >> > > > > > > topic (Kafka topic-partition pairs are ordered), but if
>> > >> > > > > > > reading from more than one topic this may break:
>> > >> > > > > > > T1,P1
>> > >> > > > > > > T1,P2
>> > >> > > > > > > T1,P3
>> > >> > > > > > > T2,P1
>> > >> > > > > > > The order is not maintained and T2,P1 is 4th now.
>> > >> > > > > > >
>> > >> > > > > > > If splits (UnboundedSources) had an identifier, this could
>> > >> > > > > > > be avoided, and checkpoints could be persisted accordingly.
>> > >> > > > > > > For example, an UnboundedKafkaSource could use the hash
>> > >> > > > > > > code of its assigned topic-partition pairs.
>> > >> > > > > > >
>> > >> > > > > > > I don't know how relevant this is to other Sources, but I
>> > >> > > > > > > guess it is as long as they may grow their partitions
>> > >> > > > > > > dynamically (though I might be completely wrong...) and I
>> > >> > > > > > > don't see much of a downside.
>> > >> > > > > > >
>> > >> > > > > > > Thoughts ?
>> > >> > > > > > >
>> > >> > > > > > > This is something that troubles me now while working on
>> > >> > > > > > > Read.Unbounded, and from a quick look I saw that the
>> > >> > > > > > > FlinkRunner expects "stable" splitting as well.. How does
>> > >> > > > > > > Dataflow handle this ?
>> > >> > > > > > >
>> > >> > > > > > > Thanks,
>> > >> > > > > > > Amit
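
To make the ordering concern in Amit's original message concrete, here is a minimal Java sketch. It does not use Beam at all: TopicPartitionSplit, stableId() and CheckpointLookupSketch are hypothetical names introduced only for illustration, the offsets are made up, and the split order (all of T1, then all of T2) is an assumption. It compares looking up checkpoints by running index against looking them up by a per-split key after T1 grows from two partitions to three.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for one split of an unbounded Kafka source (not a Beam class). */
final class TopicPartitionSplit {
    final String topic;
    final int partition;

    TopicPartitionSplit(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    /** Assumed stable identifier; the UnboundedSource API discussed here has no such method. */
    String stableId() {
        return topic + ",P" + partition;
    }
}

final class CheckpointLookupSketch {

    /** Assumed deterministic split order: all partitions of T1, then all partitions of T2. */
    static List<TopicPartitionSplit> splits(int t1Partitions, int t2Partitions) {
        List<TopicPartitionSplit> result = new ArrayList<>();
        for (int p = 1; p <= t1Partitions; p++) {
            result.add(new TopicPartitionSplit("T1", p));
        }
        for (int p = 1; p <= t2Partitions; p++) {
            result.add(new TopicPartitionSplit("T2", p));
        }
        return result;
    }

    /** Stores offsets[i] under the position i of the split (the "running id" approach). */
    static Map<Integer, Long> saveByIndex(List<TopicPartitionSplit> splits, long[] offsets) {
        Map<Integer, Long> store = new HashMap<>();
        for (int i = 0; i < splits.size(); i++) {
            store.put(i, offsets[i]);
        }
        return store;
    }

    /** Stores offsets[i] under the stable id of the split at position i. */
    static Map<String, Long> saveById(List<TopicPartitionSplit> splits, long[] offsets) {
        Map<String, Long> store = new HashMap<>();
        for (int i = 0; i < splits.size(); i++) {
            store.put(splits.get(i).stableId(), offsets[i]);
        }
        return store;
    }

    public static void main(String[] args) {
        // Before: T1 has partitions P1 and P2, T2 has P1. Offsets are made-up values.
        List<TopicPartitionSplit> before = splits(2, 1);
        long[] offsets = {100L, 200L, 300L};
        Map<Integer, Long> byIndex = saveByIndex(before, offsets);
        Map<String, Long> byId = saveById(before, offsets);

        // After: T1 has grown a third partition, so T2,P1 moves from index 2 to index 3.
        List<TopicPartitionSplit> after = splits(3, 1);
        for (int i = 0; i < after.size(); i++) {
            String id = after.get(i).stableId();
            System.out.println(id + " byIndex=" + byIndex.get(i) + " byId=" + byId.get(id));
        }
    }
}
```

With the index-keyed store, T1,P3 picks up the offset that was saved for T2,P1, and T2,P1 finds nothing. With the id-keyed store, T1,P3 starts without a checkpoint and T2,P1 keeps its own. Whether such an id should live in the Source or be assigned by the runner is the open question in this thread.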