Hi Dong,

> I think your approach will allow users to distinguish between the metadata before and after the topic deletion. I also agree that this can potentially be useful to users. I am just not very sure whether we already have a good use-case to make the additional complexity worthwhile. It seems that this feature is kind of independent of the main problem of this KIP. Could we add this as future work?

Do you think it's fair if we bump the topic epoch on deletion and leave propagation of the epoch for deleted topics for future work? I don't think this adds much complexity, and it makes the behavior consistent: every topic mutation results in an epoch bump.

Thanks,
Jason

On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Ismael,

> I guess we actually need users to see this field so that they can store this value in the external store together with the offset. We just prefer the value to be opaque to discourage most users from interpreting it. One more advantage of using such an opaque field is that we can evolve the information (or schema) of this value without changing the consumer API in the future.

> I also think it is probably OK for users to be able to interpret this value, particularly for advanced users.

> Thanks,
> Dong

> On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <ism...@juma.me.uk> wrote:

> > On Fri, Jan 5, 2018 at 7:15 PM, Jason Gustafson <ja...@confluent.io> wrote:

> > > class OffsetAndMetadata {
> > >     long offset;
> > >     byte[] offsetMetadata;
> > >     String metadata;
> > > }

> > > Admittedly, the naming is a bit annoying, but we can probably come up with something better. Internally the byte array would have a version. If in the future we have anything else we need to add, we can update the version and we wouldn't need any new APIs.

> > We can also add fields to a class in a compatible way. So, it seems to me that the main advantage of the byte array is that it's opaque to the user. Is that correct? If so, we could also add any opaque metadata in a subclass so that users don't even see it (unless they cast it, but then they're on their own).

> > Ismael

> > > The corresponding seek() and position() APIs might look something like this:

> > > void seek(TopicPartition partition, long offset, byte[] offsetMetadata);
> > > byte[] positionMetadata(TopicPartition partition);

> > > What do you think?

> > > Thanks,
> > > Jason

> > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote:

> > > > Hey Jun, Jason,

> > > > Thanks much for all the feedback. I have updated the KIP based on the latest discussion. Can you help check whether it looks good?

> > > > Thanks,
> > > > Dong

> > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindon...@gmail.com> wrote:

> > > > > Hey Jun,

> > > > > Hmm... thinking about this more, I am not sure that the proposed API is sufficient. For users that store offsets externally, we probably need an extra API to return the leader_epoch and partition_epoch for all partitions that consumers are consuming. I suppose these users currently use position() to get the offset. Thus we probably need a new method positionWithEpoch(..) to return <offset, partition_epoch, leader_epoch>. Does this sound reasonable?
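
A minimal sketch of how a consumer that stores offsets externally might use the opaque-blob API discussed above. The three-argument seek() and positionMetadata() are the proposal from this thread, not existing methods, so they are modeled here as a hypothetical interface; the external store is likewise a placeholder.

    import org.apache.kafka.common.TopicPartition;

    // Hypothetical surface of the proposed API; these methods do not exist in the
    // released consumer and only mirror the signatures quoted in this thread.
    interface EpochAwareConsumer {
        long position(TopicPartition partition);                    // existing API
        byte[] positionMetadata(TopicPartition partition);          // proposed: opaque epoch blob
        void seek(TopicPartition partition, long offset, byte[] offsetMetadata); // proposed
    }

    // Placeholder for whatever external system holds the offsets (e.g. a database).
    interface ExternalOffsetStore {
        void save(TopicPartition tp, long offset, byte[] offsetMetadata);
        long loadOffset(TopicPartition tp);
        byte[] loadOffsetMetadata(TopicPartition tp);
    }

    class ExternalOffsetFlow {
        // Checkpoint: persist the position together with its opaque metadata blob.
        static void checkpoint(EpochAwareConsumer consumer, ExternalOffsetStore store,
                               TopicPartition tp) {
            store.save(tp, consumer.position(tp), consumer.positionMetadata(tp));
        }

        // Restore: hand both values back so the consumer can validate its metadata
        // before it starts fetching again.
        static void restore(EpochAwareConsumer consumer, ExternalOffsetStore store,
                            TopicPartition tp) {
            consumer.seek(tp, store.loadOffset(tp), store.loadOffsetMetadata(tp));
        }
    }
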
> > > > > Thanks,
> > > > > Dong

> > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io> wrote:

> > > > > > Hi, Dong,

> > > > > > Yes, that's what I am thinking. OffsetEpoch will be composed of (partition_epoch, leader_epoch).

> > > > > > Thanks,
> > > > > > Jun

> > > > > > On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:

> > > > > > > Hey Jun,

> > > > > > > Thanks much. I like the new API that you proposed. I am not sure what you exactly mean by offset_epoch. I suppose that we can use the pair of (partition_epoch, leader_epoch) as the offset_epoch, right?

> > > > > > > Thanks,
> > > > > > > Dong

> > > > > > > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io> wrote:

> > > > > > > > Hi, Dong,

> > > > > > > > Got it. The API that you proposed works. The question is whether that's the API that we want to have in the long term. My concern is that while the API change is simple, the new API seems harder to explain and use. For example, a consumer storing offsets externally now needs to call waitForMetadataUpdate() after calling seek().

> > > > > > > > An alternative approach is to make the following compatible API changes in Consumer.
> > > > > > > > * Add an additional OffsetEpoch field in OffsetAndMetadata. (no need to change the commitSync() API)
> > > > > > > > * Add a new API seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch). We can potentially deprecate the old API seek(TopicPartition partition, long offset) in the future.

> > > > > > > > The alternative approach has a similar amount of API changes as yours but has the following benefits.
> > > > > > > > 1. The API works in a similar way as how offset management works now and is probably what we want in the long term.
> > > > > > > > 2. It can reset offsets better when there is data loss due to unclean leader election or correlated replica failure.
> > > > > > > > 3. It can reset offsets better when a topic is recreated.

> > > > > > > > Thanks,
> > > > > > > > Jun

> > > > > > > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com> wrote:

> > > > > > > > > Hey Jun,

> > > > > > > > > Yeah I agree that ideally we don't want an ever-growing global metadata version. I just think it may be more desirable to keep the consumer API simple.

> > > > > > > > > In my current proposal, the metadata version returned in the fetch response will be stored together with the offset. More specifically, the metadata_epoch in the new offset topic schema will be the largest metadata_epoch from all the MetadataResponse and FetchResponse ever received by this consumer.
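
To make the alternative concrete, here is a rough sketch of the OffsetEpoch value proposed above, assuming it is simply the (partition_epoch, leader_epoch) pair; the ordering shown (partition epoch first, then leader epoch) is only an illustration of how such values could be compared, not something spelled out in the thread.

    // Sketch of the OffsetEpoch value discussed above: a (partition_epoch, leader_epoch)
    // pair carried alongside the committed offset. Not an existing Kafka class.
    final class OffsetEpoch implements Comparable<OffsetEpoch> {
        final int partitionEpoch; // bumped when the partition is (re)created
        final int leaderEpoch;    // bumped on each leader change

        OffsetEpoch(int partitionEpoch, int leaderEpoch) {
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }

        // Illustrative ordering: a different partition epoch means the topic was
        // recreated, so it dominates; otherwise compare leader epochs.
        @Override
        public int compareTo(OffsetEpoch other) {
            int cmp = Integer.compare(partitionEpoch, other.partitionEpoch);
            return cmp != 0 ? cmp : Integer.compare(leaderEpoch, other.leaderEpoch);
        }
    }
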
> > > > > > > > > We probably don't have to change the consumer API for commitSync(Map<TopicPartition, OffsetAndMetadata>). If a user calls commitSync(...) to commit offset 10 for a given partition, for most use-cases this consumer instance should have consumed the message with offset 9 from this partition, in which case the consumer can remember and use the metadata_epoch from the corresponding FetchResponse when committing the offset. If a user calls commitSync(..) to commit offset 10 for a given partition without having consumed the message with offset 9 using this consumer instance, this is probably an advanced use-case. In this case the advanced user can retrieve the metadata_epoch using the newly added metadataEpoch() API after it fetches the message with offset 9 (probably from another consumer instance) and encode this metadata_epoch in the string OffsetAndMetadata.metadata. Do you think this solution would work?

> > > > > > > > > By "not sure that I fully understand your latest suggestion", are you referring to the solution related to unclean leader election using leader_epoch in my previous email?

> > > > > > > > > Thanks,
> > > > > > > > > Dong

> > > > > > > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:

> > > > > > > > > > Hi, Dong,

> > > > > > > > > > Not sure that I fully understand your latest suggestion. Returning an ever-growing global metadata version itself is not ideal, but is fine. My question is whether the metadata version returned in the fetch response needs to be stored with the offset together if offsets are stored externally. If so, we also have to change the consumer API for commitSync() and need to worry about compatibility. If we don't store the metadata version together with the offset, on a consumer restart it's not clear how we can ensure the metadata in the consumer is high enough, since there is no metadata version to compare with.

> > > > > > > > > > Thanks,
> > > > > > > > > > Jun

> > > > > > > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <lindon...@gmail.com> wrote:

> > > > > > > > > > > Hey Jun,

> > > > > > > > > > > Thanks much for the explanation.

> > > > > > > > > > > I understand the advantage of partition_epoch over metadata_epoch.
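
A rough sketch of the string-encoding workaround described above (committing the metadata_epoch inside OffsetAndMetadata.metadata), using only the existing commitSync() and OffsetAndMetadata(offset, metadata) APIs. The epoch value itself would come from the proposed metadataEpoch() call, which does not exist today, so it is passed in as a plain argument here.

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class EpochInMetadataString {
        // Commit an externally obtained offset, carrying the metadata_epoch in the
        // free-form metadata string of the existing API.
        static void commitWithEpoch(Consumer<?, ?> consumer, TopicPartition tp,
                                    long offset, long metadataEpoch) {
            String metadata = "metadata_epoch=" + metadataEpoch;
            Map<TopicPartition, OffsetAndMetadata> commit =
                Collections.singletonMap(tp, new OffsetAndMetadata(offset, metadata));
            consumer.commitSync(commit);
        }

        // Recover the epoch from an OffsetAndMetadata written by commitWithEpoch().
        static long parseEpoch(OffsetAndMetadata committed) {
            String value = committed.metadata().replace("metadata_epoch=", "");
            return Long.parseLong(value);
        }
    }
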
> > > > > > > > > > > My current concern is that the use of leader_epoch and partition_epoch requires considerable changes to the consumer's public API to take care of the case where the user stores offsets externally. For example, consumer.commitSync(..) would have to take a map whose value is <offset, metadata, leader epoch, partition epoch>. consumer.seek(...) would also need leader_epoch and partition_epoch as parameters. Technically we can probably still make it work in a backward compatible manner after careful design and discussion. But these changes can make the consumer's interface unnecessarily complex for users who do not store offsets externally.

> > > > > > > > > > > After thinking more about it, we can address all the problems discussed by using only the metadata_epoch, without introducing leader_epoch or partition_epoch. The current KIP describes the changes to the consumer API and how the new API can be used if the user stores offsets externally. In order to address the scenario you described earlier, we can include metadata_epoch in the FetchResponse and the LeaderAndIsrRequest. The consumer remembers the largest metadata_epoch from all the FetchResponses it has received. The metadata_epoch committed with the offset, either within or outside Kafka, should be the largest metadata_epoch across all FetchResponses and MetadataResponses ever received by this consumer.

> > > > > > > > > > > The drawback of using only the metadata_epoch is that we cannot always do the smart offset reset in case of unclean leader election, which you mentioned earlier. But in most cases, unclean leader election probably happens when the consumer is not rebalancing/restarting. In these cases, either the consumer is not directly affected by the unclean leader election since it is not consuming from the end of the log, or the consumer can derive the leader_epoch from the most recent message received before it sees an OffsetOutOfRangeException. So I am not sure it is worth adding the leader_epoch to the consumer API to address the remaining corner case. What do you think?
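
A minimal sketch of the client-side bookkeeping this implies: keep a running maximum of the metadata_epoch carried in fetch and metadata responses, and commit that value alongside the offset. In the proposal this logic would live inside the consumer; the standalone class here is only for illustration.

    // Illustrative tracker for the "largest metadata_epoch seen so far" rule above.
    final class MetadataEpochTracker {
        private long maxMetadataEpoch = -1L;

        // Called for every FetchResponse or MetadataResponse the client receives.
        synchronized void onResponse(long metadataEpoch) {
            if (metadataEpoch > maxMetadataEpoch) {
                maxMetadataEpoch = metadataEpoch;
            }
        }

        // Value to commit together with the offset, inside or outside Kafka.
        synchronized long epochForCommit() {
            return maxMetadataEpoch;
        }
    }
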
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Dong

> > > > > > > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io> wrote:

> > > > > > > > > > > > Hi, Dong,

> > > > > > > > > > > > Thanks for the reply.

> > > > > > > > > > > > To solve the topic recreation issue, we could use either a global metadata version or a partition-level epoch. But either one will be a new concept, right? To me, the latter seems more natural. It also makes it easier to detect if a consumer's offset is still valid after a topic is recreated. As you pointed out, we don't need to store the partition epoch in the message. The following is what I am thinking. When a partition is created, we can assign a partition epoch from an ever-increasing global counter and store it in /brokers/topics/[topic]/partitions/[partitionId] in ZK. The partition epoch is propagated to every broker. The consumer will be tracking a tuple of <offset, leader epoch, partition epoch> for offsets. If a topic is recreated, it's possible that a consumer's offset and leader epoch still match those in the broker, but the partition epoch won't. In this case, we can potentially still treat the consumer's offset as out of range and reset the offset based on the offset reset policy in the consumer. This seems harder to do with a global metadata version.

> > > > > > > > > > > > Jun

> > > > > > > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <lindon...@gmail.com> wrote:

> > > > > > > > > > > > > Hey Jun,

> > > > > > > > > > > > > This is a very good example. After thinking through this in detail, I agree that we need to commit the offset with the leader epoch in order to address this example.

> > > > > > > > > > > > > I think the remaining question is how to address the scenario where the topic is deleted and re-created. One possible solution is to commit the offset with both the leader epoch and the metadata version.
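
A small sketch of the <offset, leader epoch, partition epoch> tuple described above and the recreation check it enables; the partition epoch the broker reports is passed in as a plain argument, since no public API exposes it today.

    // Sketch of the per-partition state a consumer would track under the proposal above.
    final class CommittedPosition {
        final long offset;
        final int leaderEpoch;
        final int partitionEpoch; // assigned from a global counter when the partition is created

        CommittedPosition(long offset, int leaderEpoch, int partitionEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
            this.partitionEpoch = partitionEpoch;
        }

        // If the broker now reports a different partition epoch, the topic was recreated,
        // so the stored offset is treated as out of range and the reset policy applies.
        boolean isValidAgainst(int brokerPartitionEpoch) {
            return this.partitionEpoch == brokerPartitionEpoch;
        }
    }
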
> > > > > > > > > > > > > The logic and the implementation of this solution do not require a new concept (e.g. partition epoch) and do not require any change to the message format or leader epoch. It also allows us to order the metadata in a straightforward manner, which may be useful in the future. So it may be a better solution than generating a random partition epoch every time we create a partition. Does this sound reasonable?

> > > > > > > > > > > > > Previously, one concern with using the metadata version was that the consumer would be forced to refresh metadata even if the metadata version is increased due to topics that the consumer is not interested in. Now I realize that this is probably not a problem. Currently the client will refresh metadata either due to an InvalidMetadataException in the response from the broker or due to metadata expiry. The addition of the metadata version should not increase the overhead of metadata refresh caused by InvalidMetadataException. If the client refreshes metadata due to expiry and it receives metadata whose version is lower than the current metadata version, we can reject the metadata but still reset the metadata age, which essentially keeps the existing behavior in the client.

> > > > > > > > > > > > > Thanks much,
> > > > > > > > > > > > > Dong
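
As an illustration of the refresh rule in the last paragraph: metadata carrying a lower version than what the client already holds is rejected, but the metadata age is still reset so expiry-driven refreshes keep their existing cadence. The class and field names are invented for this sketch and do not correspond to the actual client internals.

    // Illustrative client-side handling of a metadata response under the proposal above.
    final class ClientMetadataState {
        private long currentVersion = -1L;
        private long lastRefreshMs = 0L;

        // Returns true if the new metadata was accepted.
        synchronized boolean update(long responseVersion, long nowMs) {
            // Always reset the age so expiry-based refresh keeps its existing cadence.
            lastRefreshMs = nowMs;
            if (responseVersion < currentVersion) {
                return false; // stale metadata: reject it and keep what we have
            }
            currentVersion = responseVersion;
            return true;
        }
    }
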