Hey Dong,

One more thought came to mind. Have you considered edge cases around topic deletion? I think that currently, if a topic is deleted and then re-created, the leader epoch starts over from the beginning. That seems like it could cause trouble for this solution. One thing that helps is that we have logic to remove committed offsets for deleted topics, but there may not be any guarantee on when that happens relative to when the metadata is updated on all brokers. It could even happen that the topic is deleted and re-created quickly enough that the consumer never "witnesses" the deletion.
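To make the scenario concrete, here is a minimal sketch in Python. It is only an illustration: `Cluster`, `create_topic`, `delete_topic`, `elect_leader` and `metadata_is_fresh_enough` are hypothetical names, not Kafka code, and the assumption that a re-created topic starts again at epoch 0 is my reading of the current behavior rather than something verified here.

```python
class Cluster:
    """Toy model: one leader epoch per topic (single partition)."""

    def __init__(self):
        self.leader_epoch = {}

    def create_topic(self, topic):
        # Assumption: a newly created topic always starts at epoch 0,
        # even if a topic with the same name existed before.
        self.leader_epoch[topic] = 0

    def delete_topic(self, topic):
        del self.leader_epoch[topic]

    def elect_leader(self, topic):
        # Every leader change bumps the epoch by one.
        self.leader_epoch[topic] += 1


def metadata_is_fresh_enough(metadata_epoch, committed_epoch):
    # The rule discussed in this thread: metadata is usable once its
    # leader epoch has caught up with the epoch stored with the offset.
    return metadata_epoch >= committed_epoch


cluster = Cluster()
cluster.create_topic("t")
for _ in range(5):
    cluster.elect_leader("t")

# The consumer commits an offset together with the current epoch.
committed_epoch = cluster.leader_epoch["t"]

# The topic is deleted and re-created before the committed offset is removed.
cluster.delete_topic("t")
cluster.create_topic("t")

# The consumer now sees epoch 0 and keeps waiting for an epoch it will not
# see until the new topic has gone through as many leader changes.
stuck = not metadata_is_fresh_enough(cluster.leader_epoch["t"], committed_epoch)
print(stuck)
```

In this toy model the stale committed epoch outlives the topic it belonged to, which is exactly the window I am worried about.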
Thanks,
Jason

On Tue, Dec 19, 2017 at 11:40 AM, Jason Gustafson <ja...@confluent.io> wrote:

> I think you're saying that depending on the bug, in the worst case, you
> may have to downgrade the client. I think that's fair. Note that one
> advantage of making this a fatal error is that we'll be more likely to hit
> unexpected edge cases in system tests.
>
> -Jason
>
> On Tue, Dec 19, 2017 at 11:26 AM, Dong Lin <lindon...@gmail.com> wrote:
>
>> Hey Jason,
>>
>> Yeah this may sound a bit confusing. Let me explain my thoughts.
>>
>> If there is no bug in the client library, after a consumer rebalance or
>> consumer restart, the consumer will fetch the previously committed offset
>> and keep fetching metadata until the leader epoch in the metadata >= the
>> leader epoch in the OffsetFetchResponse. Therefore, when the consumer
>> commits an offset later, the leader epoch in the OffsetCommitRequest
>> should be at least as large as the leader epoch from the previously
>> committed offset. Does this sound correct?
>>
>> Given the above understanding, it seems to suggest that the only
>> explanation for this exception is that there is a bug in the client
>> library. And due to this specific bug, I am not sure we can avoid this
>> error by simply restarting the consumer. And because this error is
>> non-retriable, the user may be forced to downgrade the client library.
>> Did I miss something here?
>>
>> Thanks,
>> Dong
>>
>>
>> On Tue, Dec 19, 2017 at 11:19 AM, Jason Gustafson <ja...@confluent.io> wrote:
>>
>> > Hey Dong,
>> >
>> > Thanks for the updates. Just one question:
>> >
>> > > When application receives
>> > > this exception, the only choice will be to revert Kafka client library to
>> > > an earlier version.
>> >
>> >
>> > Not sure I follow this. Wouldn't we just restart the consumer? That would
>> > cause it to fetch the previous committed offset and then fetch the correct
>> > metadata.
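The restart behavior described above (fetch the committed offset, then refresh metadata until its leader epoch has caught up) could look roughly like the following Python sketch. The function name, the `fetch_metadata_epoch` callable and the `max_attempts` bound are assumptions made for illustration; the real consumer would presumably refresh with backoff rather than in a tight loop.

```python
def wait_for_fresh_metadata(fetch_metadata_epoch, committed_epoch, max_attempts=10):
    """Refresh metadata until its leader epoch >= the committed leader epoch.

    fetch_metadata_epoch: hypothetical callable that performs one metadata
    refresh and returns the partition's leader epoch from the response.
    Returns the first acceptable epoch, or raises if none is seen in time.
    """
    for _ in range(max_attempts):
        epoch = fetch_metadata_epoch()
        if epoch >= committed_epoch:
            return epoch
    raise TimeoutError("metadata never caught up with the committed leader epoch")


# Simulated metadata responses: the first two come from a stale broker.
responses = iter([3, 3, 4, 6])
epoch = wait_for_fresh_metadata(lambda: next(responses), committed_epoch=4)
print(epoch)
```

With a committed epoch of -1 (the value proposed elsewhere in this thread for offsets that carry no epoch) the first response is always accepted, so the behavior matches what consumers do today.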
>> > >> > Thanks, >> > Jason >> > >> > On Tue, Dec 19, 2017 at 10:36 AM, Dong Lin <lindon...@gmail.com> wrote: >> > >> > > Hey Jason, >> > > >> > > Thanks for the comments. These make sense. I have updated the KIP to >> > > include a new error INVALID_LEADER_EPOCH. This will be a non-retriable >> > > error which may be thrown from consumer's API. When application >> receives >> > > this exception, the only choice will be to revert Kafka client >> library to >> > > an earlier version. >> > > >> > > Previously I think it may be better to simply log an error because I >> am >> > not >> > > sure it is a good idea to force user to downgrade Kafka client library >> > when >> > > the error itself, e.g. smaller leader epoch, may not be that fatal. >> One >> > the >> > > other hand it could be argued that we don't know what else can go >> wrong >> > in >> > > the buggy client library and it may be a good reason to force user to >> > > downgrade library. >> > > >> > > Thanks, >> > > Dong >> > > >> > > >> > > On Tue, Dec 19, 2017 at 9:06 AM, Jason Gustafson <ja...@confluent.io> >> > > wrote: >> > > >> > > > Hey Dong, >> > > > >> > > > >> > > > > I think it is a good idea to let coordinator do the additional >> sanity >> > > > check >> > > > > to ensure the leader epoch from OffsetCommitRequest never >> decreases. >> > > This >> > > > > can help us detect bug. The next question will be what should we >> do >> > if >> > > > > OffsetCommitRequest provides a smaller leader epoch. One possible >> > > > solution >> > > > > is to return a non-retriable error to consumer which will then be >> > > thrown >> > > > to >> > > > > user application. But I am not sure it is worth doing it given its >> > > impact >> > > > > on the user. Maybe it will be safer to simply have an error >> message >> > in >> > > > the >> > > > > server log and allow offset commit to succeed. What do you think? 
>> > > > >> > > > >> > > > I think the check would only have value if you return an error when >> it >> > > > fails. It seems primarily useful to detect buggy consumer logic, so >> a >> > > > non-retriable error makes sense to me. Clients which don't implement >> > this >> > > > capability can use the sentinel value and keep the current behavior. >> > > > >> > > > It seems that FetchResponse includes leader epoch via the path >> > > > > FetchResponse -> MemoryRecords -> MutableRecordBatch -> >> > > > DefaultRecordBatch >> > > > > -> partitionLeaderEpoch. Could this be an existing case where we >> > expose >> > > > the >> > > > > leader epoch to clients? >> > > > >> > > > >> > > > Right, in this case the client has no direct dependence on the >> field, >> > but >> > > > it could still be argued that it is exposed (I had actually >> considered >> > > > stuffing this field into an opaque blob of bytes in the message >> format >> > > > which the client wasn't allowed to touch, but it didn't happen in >> the >> > > end). >> > > > I'm not opposed to using the leader epoch field here, I was just >> > > mentioning >> > > > that it does tie clients a bit tighter to something which could be >> > > > considered a Kafka internal implementation detail. It makes the >> > protocol >> > > a >> > > > bit less intuitive as well since it is rather difficult to explain >> the >> > > edge >> > > > case it is protecting. That said, we've hit other scenarios where >> being >> > > > able to detect stale metadata in the client would be helpful, so I >> > think >> > > it >> > > > might be worth the tradeoff. >> > > > >> > > > -Jason >> > > > >> > > > On Mon, Dec 18, 2017 at 6:09 PM, Dong Lin <lindon...@gmail.com> >> wrote: >> > > > >> > > > > Hey Jason, >> > > > > >> > > > > Thanks much for reviewing the KIP. 
>> > > > > >> > > > > I think it is a good idea to let coordinator do the additional >> sanity >> > > > check >> > > > > to ensure the leader epoch from OffsetCommitRequest never >> decreases. >> > > This >> > > > > can help us detect bug. The next question will be what should we >> do >> > if >> > > > > OffsetCommitRequest provides a smaller leader epoch. One possible >> > > > solution >> > > > > is to return a non-retriable error to consumer which will then be >> > > thrown >> > > > to >> > > > > user application. But I am not sure it is worth doing it given its >> > > impact >> > > > > on the user. Maybe it will be safer to simply have an error >> message >> > in >> > > > the >> > > > > server log and allow offset commit to succeed. What do you think? >> > > > > >> > > > > It seems that FetchResponse includes leader epoch via the path >> > > > > FetchResponse -> MemoryRecords -> MutableRecordBatch -> >> > > > DefaultRecordBatch >> > > > > -> partitionLeaderEpoch. Could this be an existing case where we >> > expose >> > > > the >> > > > > leader epoch to clients? >> > > > > >> > > > > Thanks, >> > > > > Dong >> > > > > >> > > > > >> > > > > >> > > > > On Mon, Dec 18, 2017 at 3:27 PM, Jason Gustafson < >> ja...@confluent.io >> > > >> > > > > wrote: >> > > > > >> > > > > > Hi Dong, >> > > > > > >> > > > > > Thanks for the KIP. Good job identifying the problem. One minor >> > > > question >> > > > > I >> > > > > > had is whether the coordinator should enforce that the leader >> epoch >> > > > > > associated with an offset commit can only go forward for each >> > > > partition? >> > > > > > Currently it looks like we just depend on the client for this, >> but >> > > > since >> > > > > > we're caching the leader epoch anyway, it seems like a cheap >> safety >> > > > > > condition. To support old clients, you can always allow the >> commit >> > if >> > > > the >> > > > > > leader epoch is unknown. 
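A minimal sketch of that safety condition, in Python for brevity. `OffsetCache`, `InvalidLeaderEpoch` and `UNKNOWN_EPOCH` are hypothetical names, and the choice of -1 as the "unknown" value is an assumption of this sketch; it is not the coordinator's actual code.

```python
UNKNOWN_EPOCH = -1  # assumed sentinel for clients that do not send a leader epoch


class InvalidLeaderEpoch(Exception):
    """Stands in for a non-retriable error returned to the consumer."""


class OffsetCache:
    """Toy coordinator cache: key -> (offset, leader_epoch)."""

    def __init__(self):
        self._committed = {}

    def commit(self, key, offset, leader_epoch=UNKNOWN_EPOCH):
        previous = self._committed.get(key)
        # Only compare when both the cached and the new epoch are known,
        # so that old clients (unknown epoch) are always allowed to commit.
        if (previous is not None
                and leader_epoch != UNKNOWN_EPOCH
                and previous[1] != UNKNOWN_EPOCH
                and leader_epoch < previous[1]):
            raise InvalidLeaderEpoch(
                f"epoch {leader_epoch} is older than cached epoch {previous[1]}")
        # Design choice of this sketch: store whatever the client sent,
        # including the sentinel.
        self._committed[key] = (offset, leader_epoch)

    def committed(self, key):
        return self._committed[key]


cache = OffsetCache()
key = ("group", "topic", 0)
cache.commit(key, offset=10, leader_epoch=3)
cache.commit(key, offset=20, leader_epoch=3)   # same epoch: accepted
try:
    cache.commit(key, offset=5, leader_epoch=2)  # epoch went backwards
    rejected = False
except InvalidLeaderEpoch:
    rejected = True
print(rejected, cache.committed(key))
```

Whether a commit with an unknown epoch should overwrite a known cached epoch, as it does here, is an open choice that the sketch does not try to settle.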
>> > > > > > >> > > > > > I agree that we shouldn't expose the leader epoch in >> > > OffsetAndMetadata >> > > > in >> > > > > > the consumer API for what it's worth. As you have noted, it is >> more >> > > of >> > > > an >> > > > > > implementation detail. By the same argument, it's also a bit >> > > > unfortunate >> > > > > > that we have to expose it in the request API since that is >> nearly >> > as >> > > > > > binding in terms of how it limits future iterations. I could be >> > > wrong, >> > > > > but >> > > > > > this appears to be the first case where clients will depend on >> the >> > > > > concept >> > > > > > of leader epoch. Might not be a big deal considering how deeply >> > > > embedded >> > > > > > leader epochs already are in the inter-broker RPCs and the >> message >> > > > format >> > > > > > itself, but just wanted to mention the fact that good >> encapsulation >> > > > > applies >> > > > > > to the client request API as well. >> > > > > > >> > > > > > Thanks, >> > > > > > Jason >> > > > > > >> > > > > > On Mon, Dec 18, 2017 at 1:58 PM, Dong Lin <lindon...@gmail.com> >> > > wrote: >> > > > > > >> > > > > > > Hey Jun, >> > > > > > > >> > > > > > > Thanks much for your comments. These are very thoughtful >> ideas. >> > > > Please >> > > > > > see >> > > > > > > my comments below. >> > > > > > > >> > > > > > > On Thu, Dec 14, 2017 at 6:38 PM, Jun Rao <j...@confluent.io> >> > wrote: >> > > > > > > >> > > > > > > > Hi, Dong, >> > > > > > > > >> > > > > > > > Thanks for the update. A few more comments below. >> > > > > > > > >> > > > > > > > 10. It seems that we need to return the leader epoch in the >> > fetch >> > > > > > > response >> > > > > > > > as well When fetching data, we could be fetching data from a >> > > leader >> > > > > > epoch >> > > > > > > > older than what's returned in the metadata response. 
>> > > > > > > > So, we want to use the leader epoch associated with the offset being fetched for committing offsets.
>> > > > > > > >
>> > > > > > >
>> > > > > > > It seems that we may have two separate issues here. The first issue is that the consumer uses metadata that is older than the metadata it used before. The second issue is that the consumer uses metadata which is newer than the corresponding leader epoch in the leader broker. We know that the OffsetOutOfRangeException described in this KIP can be prevented by avoiding the first issue. On the other hand, it seems that the OffsetOutOfRangeException can still happen even if we avoid the second issue -- if the consumer uses an older version of metadata, the leader epoch in its metadata may equal the leader epoch in the broker even if the leader epoch in the broker is outdated.
>> > > > > > >
>> > > > > > > Given this understanding, I am not sure why we need to return the leader epoch in the fetch response. As long as the consumer's metadata is not going back in version, I think we are good. Did I miss something here?
>> > > > > > >
>> > > > > > >
>> > > > > > > >
>> > > > > > > > 11. Should we now extend OffsetAndMetadata used in the offset commit api in KafkaConsumer to include leader epoch? Similarly, should we return leader epoch in endOffsets(), beginningOffsets() and position()? We probably need to think about how to make the api backward compatible.
>> > > > > > > >
>> > > > > > >
>> > > > > > > After thinking through this carefully, I think we probably don't want to extend OffsetAndMetadata to include leader epoch because leader epoch is kind of an implementation detail which ideally should be hidden from the user. The consumer can include leader epoch in the OffsetCommitRequest after taking the offset from commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets). Similarly, the consumer can store the leader epoch from the OffsetFetchResponse and only provide the offset to the user via consumer.committed(topicPartition). This solution seems to work well and we don't have to make changes to the consumer's public API. Does this sound OK?
>> > > > > > >
>> > > > > > >
>> > > > > > > >
>> > > > > > > > 12. It seems that we now need to store leader epoch in the offset topic. Could you include the new schema for the value of the offset topic and add upgrade notes?
>> > > > > > >
>> > > > > > >
>> > > > > > > You are right. I have updated the KIP to specify the new schema for the value of the offset topic. Can you take another look?
>> > > > > > >
>> > > > > > > For existing messages in the offset topic, leader_epoch will be missing. We will use leader_epoch = -1 to indicate the missing leader_epoch. Then the consumer behavior will be the same as it is now because any leader_epoch in the MetadataResponse will be larger than the leader_epoch = -1 in the OffsetFetchResponse.
Thus we don't need specific procedure for >> > > > upgrades >> > > > > > due >> > > > > > > to this change in the offset topic schema. By "upgrade >> nodes", do >> > > you >> > > > > > mean >> > > > > > > the sentences we need to include in the upgrade.html in the PR >> > > later? >> > > > > > > >> > > > > > > >> > > > > > > > >> > > > > > > > Jun >> > > > > > > > >> > > > > > > > >> > > > > > > > On Tue, Dec 12, 2017 at 5:19 PM, Dong Lin < >> lindon...@gmail.com >> > > >> > > > > wrote: >> > > > > > > > >> > > > > > > > > Hey Jun, >> > > > > > > > > >> > > > > > > > > I see. Sounds good. Yeah it is probably simpler to leave >> this >> > > to >> > > > > > > another >> > > > > > > > > KIP in the future. >> > > > > > > > > >> > > > > > > > > Thanks for all the comments. Since there is no further >> > comment >> > > in >> > > > > the >> > > > > > > > > community, I will open the voting thread. >> > > > > > > > > >> > > > > > > > > Thanks, >> > > > > > > > > Dong >> > > > > > > > > >> > > > > > > > > On Mon, Dec 11, 2017 at 5:37 PM, Jun Rao < >> j...@confluent.io> >> > > > wrote: >> > > > > > > > > >> > > > > > > > > > Hi, Dong, >> > > > > > > > > > >> > > > > > > > > > The case that I am thinking is network partitioning. >> > Suppose >> > > > one >> > > > > > > > deploys >> > > > > > > > > a >> > > > > > > > > > stretched cluster across multiple AZs in the same >> region. >> > If >> > > > the >> > > > > > > > machines >> > > > > > > > > > in one AZ can't communicate to brokers in other AZs due >> to >> > a >> > > > > > network >> > > > > > > > > issue, >> > > > > > > > > > the brokers in that AZ won't get any new metadata. >> > > > > > > > > > >> > > > > > > > > > We can potentially solve this problem by requiring some >> > kind >> > > of >> > > > > > > regular >> > > > > > > > > > heartbeats between the controller and the broker. This >> may >> > > need >> > > > > > some >> > > > > > > > more >> > > > > > > > > > thoughts. 
So, it's probably fine to leave this to >> another >> > KIP >> > > > in >> > > > > > the >> > > > > > > > > > future. >> > > > > > > > > > >> > > > > > > > > > Thanks, >> > > > > > > > > > >> > > > > > > > > > Jun >> > > > > > > > > > >> > > > > > > > > > On Mon, Dec 11, 2017 at 2:55 PM, Dong Lin < >> > > lindon...@gmail.com >> > > > > >> > > > > > > wrote: >> > > > > > > > > > >> > > > > > > > > > > Hey Jun, >> > > > > > > > > > > >> > > > > > > > > > > Thanks for the comment. I am open to improve this KIP >> to >> > > > > address >> > > > > > > more >> > > > > > > > > > > problems. I probably need more help in understanding >> what >> > > is >> > > > > the >> > > > > > > > > current >> > > > > > > > > > > problem with consumer using outdated metadata and >> whether >> > > it >> > > > is >> > > > > > > > easier >> > > > > > > > > to >> > > > > > > > > > > address it together with this KIP. >> > > > > > > > > > > >> > > > > > > > > > > I agree that a consumer can potentially talk to old >> > leader >> > > > for >> > > > > a >> > > > > > > long >> > > > > > > > > > time >> > > > > > > > > > > even after this KIP. But after this KIP, the consumer >> > > > probably >> > > > > > > should >> > > > > > > > > not >> > > > > > > > > > > get OffetOutofRangeException and therefore will not >> cause >> > > > > offset >> > > > > > > > rewind >> > > > > > > > > > > issue. So the only problem is that consumer will not >> be >> > > able >> > > > to >> > > > > > > fetch >> > > > > > > > > > data >> > > > > > > > > > > until it has updated metadata. It seems that this >> > situation >> > > > can >> > > > > > > only >> > > > > > > > > > happen >> > > > > > > > > > > if the broker is too slow in processing >> > LeaderAndIsrRequest >> > > > > since >> > > > > > > > > > otherwise >> > > > > > > > > > > the consumer will be forced to update metadata due to >> > > > > > > > > > > NotLeaderForPartitionException. 
So the problem we are >> > > having >> > > > > > here >> > > > > > > is >> > > > > > > > > > that >> > > > > > > > > > > consumer will not be able to fetch data if some >> broker is >> > > too >> > > > > > slow >> > > > > > > in >> > > > > > > > > > > processing LeaderAndIsrRequest. >> > > > > > > > > > > >> > > > > > > > > > > Because Kafka propagates LeaderAndIsrRequest >> > asynchronously >> > > > to >> > > > > > all >> > > > > > > > > > brokers >> > > > > > > > > > > in the cluster, there will always be a period of time >> > when >> > > > > > consumer >> > > > > > > > can >> > > > > > > > > > not >> > > > > > > > > > > fetch data for the partition during the leadership >> > change. >> > > > Thus >> > > > > > it >> > > > > > > > > seems >> > > > > > > > > > > more like a broker-side performance issue instead of >> > > > > client-side >> > > > > > > > > > > correctness issue. My gut feel is that it is not >> causing >> > a >> > > > > much a >> > > > > > > > > problem >> > > > > > > > > > > as the problem to be fixed in this KIP. And if we >> were to >> > > > > address >> > > > > > > it, >> > > > > > > > > we >> > > > > > > > > > > probably need to make change in the broker side, e.g. >> > with >> > > > > > > > prioritized >> > > > > > > > > > > queue for controller-related requests, which may be >> kind >> > of >> > > > > > > > orthogonal >> > > > > > > > > to >> > > > > > > > > > > this KIP. I am not very sure it will be easier to >> address >> > > it >> > > > > with >> > > > > > > the >> > > > > > > > > > > change in this KIP. Do you have any recommendation? >> > > > > > > > > > > >> > > > > > > > > > > Thanks, >> > > > > > > > > > > Dong >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > On Mon, Dec 11, 2017 at 1:51 PM, Jun Rao < >> > j...@confluent.io >> > > > >> > > > > > wrote: >> > > > > > > > > > > >> > > > > > > > > > > > Hi, Dong, >> > > > > > > > > > > > >> > > > > > > > > > > > Thanks for the reply. 
>> > > > > > > > > > > > >> > > > > > > > > > > > My suggestion of forcing the metadata refresh from >> the >> > > > > > controller >> > > > > > > > may >> > > > > > > > > > not >> > > > > > > > > > > > work in general since the cached controller could be >> > > > outdated >> > > > > > > too. >> > > > > > > > > The >> > > > > > > > > > > > general problem is that if a consumer's metadata is >> > > > outdated, >> > > > > > it >> > > > > > > > may >> > > > > > > > > > get >> > > > > > > > > > > > stuck with the old leader for a long time. We can >> > address >> > > > the >> > > > > > > issue >> > > > > > > > > of >> > > > > > > > > > > > detecting outdated metadata in a separate KIP in the >> > > future >> > > > > if >> > > > > > > you >> > > > > > > > > > didn't >> > > > > > > > > > > > intend to address it in this KIP. >> > > > > > > > > > > > >> > > > > > > > > > > > Thanks, >> > > > > > > > > > > > >> > > > > > > > > > > > Jun >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > On Sat, Dec 9, 2017 at 10:12 PM, Dong Lin < >> > > > > lindon...@gmail.com >> > > > > > > >> > > > > > > > > wrote: >> > > > > > > > > > > > >> > > > > > > > > > > > > Hey Jun, >> > > > > > > > > > > > > >> > > > > > > > > > > > > Thanks much for your comments. Given that client >> > needs >> > > to >> > > > > > > > > > de-serialize >> > > > > > > > > > > > the >> > > > > > > > > > > > > metadata anyway, the extra overhead of checking >> the >> > > > > > > per-partition >> > > > > > > > > > > version >> > > > > > > > > > > > > for every partition should not be a big concern. >> Thus >> > > it >> > > > > > makes >> > > > > > > > > sense >> > > > > > > > > > to >> > > > > > > > > > > > use >> > > > > > > > > > > > > leader epoch as the per-partition version instead >> of >> > > > > > creating a >> > > > > > > > > > global >> > > > > > > > > > > > > metadata version. I will update the KIP to do >> that. 
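As a rough illustration of using the leader epoch as a per-partition version, a client-side check could look like the Python sketch below. The function name and the dict-based representation of cached and received metadata are assumptions for the example only; partitions missing from the cache are treated as new and compared against -1.

```python
def accept_metadata(cached, response):
    """Return True if no partition's leader epoch moved backwards.

    cached, response: dicts mapping (topic, partition) -> leader epoch.
    A partition that is not in the cache is compared against -1, so any
    epoch reported for it is accepted.
    """
    return all(epoch >= cached.get(tp, -1) for tp, epoch in response.items())


cached = {("t", 0): 3, ("t", 1): 7}

# One partition advanced and a new partition appeared: accept.
newer = {("t", 0): 4, ("t", 1): 7, ("t", 2): 0}
# One partition went back in time: reject and refresh again.
older = {("t", 0): 2, ("t", 1): 7}

print(accept_metadata(cached, newer), accept_metadata(cached, older))
```

The check is one comparison per partition in the response, which is the extra overhead being weighed against a single global metadata version.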
>> > > > > > > > > > > > > >> > > > > > > > > > > > > Regarding the detection of outdated metadata, I >> think >> > > it >> > > > is >> > > > > > > > > possible >> > > > > > > > > > to >> > > > > > > > > > > > > ensure that client gets latest metadata by >> fetching >> > > from >> > > > > > > > > controller. >> > > > > > > > > > > Note >> > > > > > > > > > > > > that this requires extra logic in the controller >> such >> > > > that >> > > > > > > > > controller >> > > > > > > > > > > > > updates metadata directly in memory without >> requiring >> > > > > > > > > > > > > UpdateMetadataRequest. But I am not sure the main >> > > > > motivation >> > > > > > of >> > > > > > > > > this >> > > > > > > > > > at >> > > > > > > > > > > > > this moment. But this makes controller more like a >> > > > > bottleneck >> > > > > > > in >> > > > > > > > > the >> > > > > > > > > > > > > cluster which we probably want to avoid. >> > > > > > > > > > > > > >> > > > > > > > > > > > > I think we can probably keep the current way of >> > > ensuring >> > > > > > > metadata >> > > > > > > > > > > > > freshness. Currently client will be forced to >> refresh >> > > > > > metadata >> > > > > > > if >> > > > > > > > > > > broker >> > > > > > > > > > > > > returns error (e.g. NotLeaderForPartition) due to >> > > > outdated >> > > > > > > > metadata >> > > > > > > > > > or >> > > > > > > > > > > if >> > > > > > > > > > > > > the metadata does not contain the partition that >> the >> > > > client >> > > > > > > > needs. >> > > > > > > > > In >> > > > > > > > > > > the >> > > > > > > > > > > > > future, as you previously suggested, we can >> include >> > > > > > > per-partition >> > > > > > > > > > > > > leaderEpoch in the FetchRequest/ProduceRequest >> such >> > > that >> > > > > > broker >> > > > > > > > can >> > > > > > > > > > > > return >> > > > > > > > > > > > > error if the epoch is smaller than cached epoch in >> > the >> > > > > > broker. 
>> > > > > > > > > > > > > Given that this adds more complexity to Kafka, I think we can probably think about that later, when we have a specific use case or problem to solve with up-to-date metadata. Does this sound OK?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > Dong
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Fri, Dec 8, 2017 at 3:53 PM, Jun Rao <j...@confluent.io> wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Hi, Dong,
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Thanks for the reply. A few more points below.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > For dealing with how to prevent a consumer from switching from a new leader to an old leader, your suggestion of refreshing metadata on consumer restart until it sees a metadata version >= the one associated with the offset works too, as long as we guarantee that the cached metadata versions on the brokers only go up.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > The second discussion point is on whether the metadata versioning should be per partition or global.
For the partition level >> > > > > > versioning, >> > > > > > > > you >> > > > > > > > > > were >> > > > > > > > > > > > > > concerned about the performance. Given that >> > metadata >> > > > > > updates >> > > > > > > > are >> > > > > > > > > > > rare, >> > > > > > > > > > > > I >> > > > > > > > > > > > > am >> > > > > > > > > > > > > > not sure if it's a big concern though. Doing a >> > > million >> > > > if >> > > > > > > tests >> > > > > > > > > is >> > > > > > > > > > > > > probably >> > > > > > > > > > > > > > going to take less than 1ms. Another thing is >> that >> > > the >> > > > > > > metadata >> > > > > > > > > > > version >> > > > > > > > > > > > > > seems to need to survive controller failover. In >> > your >> > > > > > current >> > > > > > > > > > > > approach, a >> > > > > > > > > > > > > > consumer may not be able to wait on the right >> > version >> > > > of >> > > > > > the >> > > > > > > > > > metadata >> > > > > > > > > > > > > after >> > > > > > > > > > > > > > the consumer restart since the metadata version >> may >> > > > have >> > > > > > been >> > > > > > > > > > > recycled >> > > > > > > > > > > > on >> > > > > > > > > > > > > > the server side due to a controller failover >> while >> > > the >> > > > > > > consumer >> > > > > > > > > is >> > > > > > > > > > > > down. >> > > > > > > > > > > > > > The partition level leaderEpoch survives >> controller >> > > > > failure >> > > > > > > and >> > > > > > > > > > won't >> > > > > > > > > > > > > have >> > > > > > > > > > > > > > this issue. >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > Lastly, neither your proposal nor mine addresses >> > the >> > > > > issue >> > > > > > > how >> > > > > > > > to >> > > > > > > > > > > > > guarantee >> > > > > > > > > > > > > > a consumer to detect that is metadata is >> outdated. 
>> > > > > > Currently, >> > > > > > > > the >> > > > > > > > > > > > > consumer >> > > > > > > > > > > > > > is not guaranteed to fetch metadata from every >> > broker >> > > > > > within >> > > > > > > > some >> > > > > > > > > > > > bounded >> > > > > > > > > > > > > > period of time. Maybe this is out of the scope >> of >> > > your >> > > > > KIP. >> > > > > > > But >> > > > > > > > > one >> > > > > > > > > > > > idea >> > > > > > > > > > > > > is >> > > > > > > > > > > > > > force the consumer to refresh metadata from the >> > > > > controller >> > > > > > > > > > > > periodically. >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > Jun >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > On Thu, Dec 7, 2017 at 11:25 AM, Dong Lin < >> > > > > > > lindon...@gmail.com >> > > > > > > > > >> > > > > > > > > > > wrote: >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > > Hey Jun, >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > Thanks much for the comments. Great point >> > > > particularly >> > > > > > > > > regarding >> > > > > > > > > > > > (3). I >> > > > > > > > > > > > > > > haven't thought about this before. >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > It seems that there are two possible ways >> where >> > the >> > > > > > version >> > > > > > > > > > number >> > > > > > > > > > > > can >> > > > > > > > > > > > > be >> > > > > > > > > > > > > > > used. One solution is for client to check the >> > > version >> > > > > > > number >> > > > > > > > at >> > > > > > > > > > the >> > > > > > > > > > > > > time >> > > > > > > > > > > > > > it >> > > > > > > > > > > > > > > receives MetadataResponse. 
And if the version >> > > number >> > > > in >> > > > > > the >> > > > > > > > > > > > > > > MetadataResponse is smaller than the version >> > number >> > > > in >> > > > > > the >> > > > > > > > > > client's >> > > > > > > > > > > > > > cache, >> > > > > > > > > > > > > > > the client will be forced to fetch metadata >> > again. >> > > > > > Another >> > > > > > > > > > > solution, >> > > > > > > > > > > > > as >> > > > > > > > > > > > > > > you have suggested, is for broker to check the >> > > > version >> > > > > > > number >> > > > > > > > > at >> > > > > > > > > > > the >> > > > > > > > > > > > > time >> > > > > > > > > > > > > > > it receives a request from client. The broker >> > will >> > > > > reject >> > > > > > > the >> > > > > > > > > > > request >> > > > > > > > > > > > > if >> > > > > > > > > > > > > > > the version is smaller than the version in >> > broker's >> > > > > > cache. >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > I am not very sure that the second solution >> can >> > > > address >> > > > > > the >> > > > > > > > > > problem >> > > > > > > > > > > > > here. >> > > > > > > > > > > > > > > In the scenario described in the JIRA ticket, >> > > > broker's >> > > > > > > cache >> > > > > > > > > may >> > > > > > > > > > be >> > > > > > > > > > > > > > > outdated because it has not processed the >> > > > > > > LeaderAndIsrRequest >> > > > > > > > > > from >> > > > > > > > > > > > the >> > > > > > > > > > > > > > > controller. Thus it may still process client's >> > > > request >> > > > > > even >> > > > > > > > if >> > > > > > > > > > the >> > > > > > > > > > > > > > version >> > > > > > > > > > > > > > > in client's request is actually outdated. Does >> > this >> > > > > make >> > > > > > > > sense? 
>> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > IMO, it seems that we can address problem (3) >> by >> > > > saving >> > > > > > the >> > > > > > > > > > > metadata >> > > > > > > > > > > > > > > version together with the offset. After >> consumer >> > > > > starts, >> > > > > > it >> > > > > > > > > will >> > > > > > > > > > > keep >> > > > > > > > > > > > > > > fetching metadata until the metadata version >> >= >> > the >> > > > > > version >> > > > > > > > > saved >> > > > > > > > > > > > with >> > > > > > > > > > > > > > the >> > > > > > > > > > > > > > > offset of this partition. >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > Regarding problems (1) and (2): Currently we >> use >> > > the >> > > > > > > version >> > > > > > > > > > number >> > > > > > > > > > > > in >> > > > > > > > > > > > > > the >> > > > > > > > > > > > > > > MetadataResponse to ensure that the metadata >> does >> > > not >> > > > > go >> > > > > > > back >> > > > > > > > > in >> > > > > > > > > > > > time. >> > > > > > > > > > > > > > > There are two alternative solutions to address >> > > > problems >> > > > > > (1) >> > > > > > > > and >> > > > > > > > > > > (2). >> > > > > > > > > > > > > One >> > > > > > > > > > > > > > > solution is for client to enumerate all >> > partitions >> > > in >> > > > > the >> > > > > > > > > > > > > > MetadataResponse, >> > > > > > > > > > > > > > > compare their epoch with those in the cached >> > > > metadata, >> > > > > > and >> > > > > > > > > > rejects >> > > > > > > > > > > > the >> > > > > > > > > > > > > > > MetadataResponse iff any leader epoch is >> smaller. >> > > The >> > > > > > main >> > > > > > > > > > concern >> > > > > > > > > > > is >> > > > > > > > > > > > > > that >> > > > > > > > > > > > > > > MetadataResponse currently cached information >> of >> > > all >> > > > > > > > partitions >> > > > > > > > > > in >> > > > > > > > > > > > the >> > > > > > > > > > > > > > > entire cluster. 
It may slow down client's performance if we were to do it. The other
solution is for client to enumerate partitions for only topics registered
in the org.apache.kafka.clients.Metadata, which will be an empty set for
producer and the set of subscribed partitions for consumer. But this
degrades to all topics if consumer subscribes to topics in the cluster by
pattern.

Note that client will only be forced to update metadata if the version in
the MetadataResponse is smaller than the version in the cached metadata.
In general it should not be a problem. It can be a problem only if some
broker is particularly slower than other brokers in processing
UpdateMetadataRequest.
When this is the case, it means that the broker is also particularly
slower in processing LeaderAndIsrRequest, which can cause problems anyway
because some partition will probably have no leader during this period. I
am not sure problems (1) and (2) cause more problems than what we already
have.

Thanks,
Dong

On Wed, Dec 6, 2017 at 6:42 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Great finding on the issue. It's a real problem. A few comments about the
KIP. (1) I am not sure about updating controller_metadata_epoch on every
UpdateMetadataRequest. Currently, the controller can send
UpdateMetadataRequest when there is no actual metadata change. Doing this
may require unnecessary metadata refresh on the client.
(2) controller_metadata_epoch is global across all topics. This means
that a client may be forced to update its metadata even when the metadata
for the topics that it cares about hasn't changed. (3) It doesn't seem
that the KIP handles the corner case when a consumer is restarted. Say a
consumer reads from the new leader, commits the offset and then is
restarted. On restart, the consumer gets outdated metadata and fetches
from the old leader. Then, the consumer will get into the offset out of
range issue.

Given the above, I am thinking of the following approach. We actually
already have metadata versioning at the partition level. Each leader has
a leader epoch which is monotonically increasing.
We can potentially propagate leader epoch back in the metadata response
and the clients can cache that. This solves the issue of (1) and (2). To
solve (3), when saving an offset, we could save both an offset and the
corresponding leader epoch. When fetching the data, the consumer provides
both the offset and the leader epoch. A leader will only serve the
request if its leader epoch is equal to or greater than the leader epoch
from the consumer. To achieve this, we need to change the fetch request
protocol and the offset commit api, which requires some more thoughts.

Thanks,

Jun

On Wed, Dec 6, 2017 at 10:57 AM, Dong Lin <lindon...@gmail.com> wrote:

Bump up the thread.
It will be great to have more comments on whether we should do it or
whether there is a better way to address the motivation of this KIP.

On Mon, Dec 4, 2017 at 3:09 PM, Dong Lin <lindon...@gmail.com> wrote:

I don't have an interesting rejected alternative solution to put in the
KIP. If there is a good alternative solution from anyone in this thread,
I am happy to discuss this and update the KIP accordingly.

Thanks,
Dong

On Mon, Dec 4, 2017 at 1:12 PM, Ted Yu <yuzhih...@gmail.com> wrote:

It is clearer now.

I noticed that the Rejected Alternatives section is empty.
Have you considered any alternatives?
Cheers

On Mon, Dec 4, 2017 at 1:07 PM, Dong Lin <lindon...@gmail.com> wrote:

Ted, thanks for catching this. I have updated the sentence to make it
readable.

Thanks,
Dong

On Sat, Dec 2, 2017 at 3:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:

bq. It the controller_epoch of the incoming MetadataResponse, or if the
controller_epoch is the same but the controller_metadata_epoch

Can you update the above sentence so that the intention is clearer?
Thanks

On Fri, Dec 1, 2017 at 6:33 PM, Dong Lin <lindon...@gmail.com> wrote:

Hi all,

I have created KIP-232: Detect outdated metadata by adding
ControllerMetadataEpoch field:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+by+adding+ControllerMetadataEpoch+field

The KIP proposes to add fields in MetadataResponse and
UpdateMetadataRequest so that client can reject outdated metadata and
avoid unnecessary OffsetOutOfRangeException.
Otherwise there is currently a race condition that can cause consumer to
reset offset, which negatively affects the consumer's availability.

Feedback and suggestions are welcome!

Regards,
Dong