If I understand correctly this will break
https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L857
in
KafkaIO.

So it's a KafkaIO limitation (for now ?) ?

On Tue, Sep 13, 2016 at 11:31 AM Amit Sela <amitsel...@gmail.com> wrote:

> Thanks Thomas, you did understand correct.
> Doing this, or assigning a running id, is basically the same, as long as
> generateInitialSplits implementation is deterministic (KafkaIO actually
> notes this).
>
> So what if partitions were added at runtime to one (or more) of the topics
> I'm consuming from ?
>
> On Tue, Sep 13, 2016 at 3:51 AM Thomas Groh <tg...@google.com.invalid>
> wrote:
>
>> I'm not sure if I've understood what the problem is - from what I can tell
>> it's about associating UnboundedSource splits with Checkpoints in order to
>> get consistent behavior from the sources. If I'm wrong, the following
>> isn't
>> really relevant to your problem - it's about the expected behavior of a
>> runner interacting with any split of a Source.
>>
>> In the absence of updates, the evaluation of a split of UnboundedSource
>> must to obey the general contract for UnboundedSource, which is that
>> createReader(PipelineOptions, CheckpointMarkT) will only ever be called
>> with a Checkpoint Mark that was generated by an UnboundedReader that was
>> created from the source - i.e., a Source creates Readers and is provided
>> only checkpoints from those readers it creates. Each Source instance
>> (split
>> and top-level) should be independent of all other instances. A split of a
>> Source should generally be indistinguishable from a top-level source (it
>> will just have slightly different configuration).
>>
>> Generally this means that Source splits have to have an associated
>> identifier, but these identifiers are arbitrary and not relevant to the
>> actual evaluation of the Source - so the runner gets to tag splits however
>> it pleases, so long as those tags don't allow splits to bleed into each
>> other.
>>
>> Could you instead store the Source paired with some (arbitrary and unique)
>> key and pull out the checkpoint using the key (or even just store the keys
>> and store the source with the checkpoint)? That way you always will keep
>> the same association between Source and Checkpoint. Flink does something
>> like this where they store the serialized source alongside the
>> CheckpointMark so they're never separated (
>>
>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L164
>> and
>>
>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L334
>> )
>>
>> On Mon, Sep 12, 2016 at 11:40 AM, Amit Sela <amitsel...@gmail.com> wrote:
>>
>> > If this issue doesn't make sense for "native" streaming systems, and
>> it's
>> > only a Spark issue (and my implementation of Read.Unbounded) - I could
>> keep
>> > doing what I do, use a running id.
>> > I was just wondering... ( hence the question mark in the title ;-) )
>> >
>> > On Mon, Sep 12, 2016 at 9:31 PM Amit Sela <amitsel...@gmail.com> wrote:
>> >
>> > > Not sure how it works in Dataflow or Flink, but I'm working on an
>> > > implementation for Spark using the (almost) only stateful operator it
>> > has -
>> > > "mapWithState" - and the State needs to correspond to a key.
>> > > Each micro-batch, the Sources recreate the readers and "look-up" the
>> > > latest checkpoint.
>> > >
>> > > On Mon, Sep 12, 2016 at 9:15 PM Raghu Angadi
>> <rang...@google.com.invalid
>> > >
>> > > wrote:
>> > >
>> > >> On Wed, Sep 7, 2016 at 7:13 AM, Amit Sela <amitsel...@gmail.com>
>> wrote:
>> > >>
>> > >> > @Raghu, hashing <topic, partition> is exactly what I mean, but I'm
>> > >> asking
>> > >> > if it should be abstracted in the Source.. Otherwise, runners will
>> > have
>> > >> to
>> > >> > *is instance of* on every Source, and write their own hash
>> > >> implementation.
>> > >> > Since splits contain the "splitted" Source, and it contains it's
>> own
>> > >> > CheckpointMark, and hashing would probably be tied to that
>> > >> CheckpointMark,
>> > >> > why not abstract it in the UnboundedSource ?
>> > >> >
>> > >>
>> > >> I don't quite follow how a runner should be concerned about hashing
>> used
>> > >> by
>> > >> a Source for its own splits. Can you give a concrete example? To me
>> it
>> > >> looks like source and checkpoint objects are completely opaque to the
>> > >> runners.
>> > >>
>> > >>
>> > >> > On Wed, Sep 7, 2016 at 3:02 AM Raghu Angadi
>> > <rang...@google.com.invalid
>> > >> >
>> > >> > wrote:
>> > >> >
>> > >> > > > If splits (UnboundedSources) had an identifier, this could be
>> > >> avoided,
>> > >> > > and checkpoints could be persisted accordingly.
>> > >> > >
>> > >> > > The order of the splits that a source returns is preserved. So
>> > during
>> > >> an
>> > >> > > update, you can expect 5th split gets invoked with the same
>> > checkpoint
>> > >> > mark
>> > >> > > that 5th split saved before upgrade. You can hash <topic,
>> partition>
>> > >> to
>> > >> > one
>> > >> > > of the indices.
>> > >> > >
>> > >> > > KafkaIO.java doe not support change in partitions.
>> > >> > >
>> > >> > > On Tue, Sep 6, 2016 at 4:04 PM, Eugene Kirpichov <
>> > >> > > kirpic...@google.com.invalid> wrote:
>> > >> > >
>> > >> > > > Oh! Okay, looks like this is a part of the code I was
>> unfamiliar
>> > >> with.
>> > >> > > I'd
>> > >> > > > like to know the answer too, in this case.
>> > >> > > > +Daniel Mills <mil...@google.com> can you comment ?
>> > >> > > >
>> > >> > > > On Tue, Sep 6, 2016 at 3:32 PM Amit Sela <amitsel...@gmail.com
>> >
>> > >> wrote:
>> > >> > > >
>> > >> > > > > That is correct, as long as non of the Kafka topics "grow"
>> > another
>> > >> > > > > partition (which it could).
>> > >> > > > > In that case, some bundle will have to start reading from
>> this
>> > >> > > partition
>> > >> > > > as
>> > >> > > > > well, and since all other bundles already have a "previous
>> > >> > checkpoint"
>> > >> > > it
>> > >> > > > > matters which checkpoint to relate to. I don't know how it's
>> > >> > > implemented
>> > >> > > > in
>> > >> > > > > Dataflow, but in Spark I'm testing using mapWithState to
>> store
>> > the
>> > >> > > > > checkpoints per split.
>> > >> > > > > It also seems that order does matter to the KafkIO:
>> > >> > > > >
>> > >> > > > > https://github.com/apache/incubator-beam/blob/master/
>> > >> > > > sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/
>> > >> > > > kafka/KafkaIO.java#L636
>> > >> > > > >
>> > >> > > > > On Wed, Sep 7, 2016 at 1:24 AM Eugene Kirpichov
>> > >> > > > > <kirpic...@google.com.invalid> wrote:
>> > >> > > > >
>> > >> > > > > > Hi Amit,
>> > >> > > > > > Could you explain more about why you're saying the order of
>> > >> splits
>> > >> > > > > matters?
>> > >> > > > > > AFAIK the semantics of Read.Unbounded is "read from all of
>> the
>> > >> > splits
>> > >> > > > in
>> > >> > > > > > parallel, checkpointing each of them independently", so
>> their
>> > >> order
>> > >> > > > > > shouldn't matter.
>> > >> > > > > >
>> > >> > > > > > On Tue, Sep 6, 2016 at 3:17 PM Amit Sela <
>> > amitsel...@gmail.com>
>> > >> > > wrote:
>> > >> > > > > >
>> > >> > > > > > > UnboundedSources generate initial splits, which are
>> > basically
>> > >> > > splits
>> > >> > > > of
>> > >> > > > > > > them selves - for example, if an UnboundedKafkaSource is
>> set
>> > >> to
>> > >> > > read
>> > >> > > > > from
>> > >> > > > > > > topic T1 which is made of 2 partitions P1 and P2, it will
>> > >> > > (optimally)
>> > >> > > > > > split
>> > >> > > > > > > into two UnboundedKafkaSource, one per partition.
>> > >> > > > > > > During the lifecycle of the "reader" bundles,
>> > CheckpointMarks
>> > >> are
>> > >> > > > used
>> > >> > > > > to
>> > >> > > > > > > checkpoint "last-read" and so readers may restart/resume.
>> > I'm
>> > >> > > > assuming
>> > >> > > > > > this
>> > >> > > > > > > is how newly created partitions will automatically be
>> added
>> > to
>> > >> > > > readers.
>> > >> > > > > > >
>> > >> > > > > > > The problem is that it's all fine while there is only one
>> > >> topic
>> > >> > > > (Kafka
>> > >> > > > > > > topic-partition pairs are ordered), but if reading from
>> more
>> > >> then
>> > >> > > one
>> > >> > > > > > topic
>> > >> > > > > > > this may break:
>> > >> > > > > > > T1,P1
>> > >> > > > > > > T1,P2
>> > >> > > > > > > T1,P3
>> > >> > > > > > > T2,P1
>> > >> > > > > > > The order is not maintained and T2,P1 is 4th now.
>> > >> > > > > > >
>> > >> > > > > > > If splits (UnboundedSources) had an identifier, this
>> could
>> > be
>> > >> > > > avoided,
>> > >> > > > > > and
>> > >> > > > > > > checkpoints could be persisted accordingly.
>> > >> > > > > > > For example, an UnboundedKafkaSource could use the hash
>> code
>> > >> of
>> > >> > > it's
>> > >> > > > > > > assigned topic-partition pairs.
>> > >> > > > > > >
>> > >> > > > > > > I don't know how relevant this is to other Sources, but I
>> > >> guess
>> > >> > it
>> > >> > > is
>> > >> > > > > as
>> > >> > > > > > > long as they may grow their partitions dynamically
>> (though I
>> > >> > might
>> > >> > > be
>> > >> > > > > > > completely wrong...) and I don't see much of a downside.
>> > >> > > > > > >
>> > >> > > > > > > Thoughts ?
>> > >> > > > > > >
>> > >> > > > > > > This is something that troubles me now while working on
>> > >> > > > Read.Unbounded,
>> > >> > > > > > and
>> > >> > > > > > > from a quick look I saw that the FlinkRunner expects
>> > "stable"
>> > >> > > > splitting
>> > >> > > > > > as
>> > >> > > > > > > well.. How does Dataflow handle this ?
>> > >> > > > > > >
>> > >> > > > > > > Thanks,
>> > >> > > > > > > Amit
>> > >> > > > > > >
>> > >> > > > > >
>> > >> > > > >
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> > >
>> >
>>
>

Reply via email to