Hi Jason,

The proposed API seems reasonable to me too. Could you please also update the wiki page (https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation) with a section, say "Workflow", on how the proposed API will be used together with the others for:
1. consumer callers handling a LogTruncationException.
2. consumer internals handling a retriable UnknownLeaderEpochException.

Guozhang

On Tue, Jul 17, 2018 at 10:23 AM, Anna Povzner <a...@confluent.io> wrote:

> Hi Jason,
>
> I also like your proposal and agree that KafkaConsumer#seekToCommitted() is more intuitive as a way to initialize both the consumer's position and its fetch state.
>
> My understanding is that KafkaConsumer#seekToCommitted() is purely for clients who store their offsets externally, right? And we are still going to add KafkaConsumer#findOffsets() in this KIP as we discussed, so that the client can handle LogTruncationException?
>
> Thanks,
>
> Anna
>
> On Thu, Jul 12, 2018 at 3:57 PM Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jason,
> >
> > It is a great summary. The solution sounds good. I might have minor comments regarding the method name, but we can discuss those minor points later, after we reach consensus on the high-level API.
> >
> > Thanks,
> > Dong
> >
> > On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson <ja...@confluent.io> wrote:
> >
> > > Hey Anna and Dong,
> > >
> > > Thanks a lot for the great discussion. I've been hanging back a bit because honestly the best option hasn't seemed clear. I agree with Anna's general observation that there is a distinction between the position of the consumer and its fetch state up to that position. If you think about it, a committed offset actually represents both of these. The metadata is used to initialize the state of the consumer application and the offset initializes the position. Additionally, we are extending the offset commit in this KIP to also include the last epoch fetched by the consumer, which is used to initialize the internal fetch state. Of course if you do an arbitrary `seek` and immediately commit offsets, then there won't be a last epoch to commit.
> > > This seems intuitive since there is no fetch state in this case. We only commit fetch state when we have it.
> > >
> > > So if we think about a committed offset as initializing both the consumer's position and its fetch state, then the gap in the API is evidently that we don't have a way to initialize the consumer to a committed offset. We do it implicitly of course for offsets stored in Kafka, but since external storage is a use case we support, then we should have an explicit API as well. Perhaps something like this:
> > >
> > > seekToCommitted(TopicPartition, OffsetAndMetadata)
> > >
> > > In this KIP, we are proposing to allow the `OffsetAndMetadata` object to include the leader epoch, so I think this would have the same effect as Anna's suggested `seekToRecord`. But perhaps it is a more natural fit given the current API? Furthermore, if we find a need for additional metadata in the offset commit API in the future, then we will just need to modify the `OffsetAndMetadata` object and we will not need a new `seek` API.
> > >
> > > With this approach, I think then we can leave the `position` API as it is. The position of the consumer is still just the next expected fetch offset. If a user needs to record additional state based on previous fetch progress, then they would use the result of the previous fetch to obtain it. This makes the dependence on fetch progress explicit. I think we could make this a little more convenient with a helper in the `ConsumerRecords` object, but I think that's more of a nice-to-have.
> > >
> > > Thoughts?
> > >
> > > By the way, I have been iterating a little bit on the replica side of this KIP. My initial proposal in fact did not have strong enough fencing to protect all of the edge cases.
> > > I believe the current proposal fixes the problems, but I am still verifying the model.
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Anna,
> > > >
> > > > Thanks much for the explanation. Approach 1 also sounds good to me. I think findOffsets() is useful for users who don't use automatic offset reset policy.
> > > >
> > > > Just one more question. Since users who store offsets externally need to provide leaderEpoch to findOffsets(...), do we need an extra API for user to get both offset and leaderEpoch, e.g. recordPosition()?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner <a...@confluent.io> wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > What I called “not covering all use cases” is what you call best-effort (not guaranteeing some corner cases). I think we are on the same page here.
> > > > >
> > > > > I wanted to be clear in the API whether the consumer seeks to a position (offset) or to a record (offset, leader epoch). The only use-case of seeking to a record is seeking to a committed offset for a user who stores committed offsets externally. (Unless users find some other reason to seek to a record.) I thought it was possible to provide this functionality with findOffset(offset, leader epoch) followed by a seek(offset). However, you are right that this will not handle the race condition where non-divergent offset found by findOffset() could change again before the consumer does the first fetch.
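
A minimal Java sketch of the find-then-seek sequence described above, with a retry that covers the race where the log diverges again before the first fetch. `SeekableLog`, `TruncationDetected`, and `fetchWithRecovery` are hypothetical names used only for illustration; they are not part of the KafkaConsumer API or of the KIP.

```java
/** Hypothetical stand-in for the LogTruncationException discussed in this thread. */
final class TruncationDetected extends RuntimeException {
    final long offset;
    final int leaderEpoch;

    TruncationDetected(long offset, int leaderEpoch) {
        super("log truncated; last fetch state was offset=" + offset + ", epoch=" + leaderEpoch);
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }
}

/** Hypothetical single-partition view of the three consumer calls under discussion. */
interface SeekableLog {
    long findOffset(long offset, int leaderEpoch); // closest non-divergent offset
    void seek(long offset);                        // sets the position only
    long fetch();                                  // returns the fetched offset, or throws TruncationDetected
}

final class TruncationRecovery {
    /**
     * Fetches once; on truncation, repeats findOffset() + seek() before retrying.
     * Looping covers the case where the log diverges again between seek() and the next fetch.
     */
    static long fetchWithRecovery(SeekableLog log, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        TruncationDetected last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return log.fetch();
            } catch (TruncationDetected e) {
                last = e;
                log.seek(log.findOffset(e.offset, e.leaderEpoch));
            }
        }
        throw last; // still diverging after maxAttempts tries
    }
}
```

The loop only works if every fetch carries enough state (offset plus leader epoch) for the broker to notice a new divergence, which is the point the thread goes on to debate.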
> > > > >
> > > > > Regarding position(): if we add a position that returns (offset, leader epoch), this is specifically a position after a record that was actually consumed, or the position of a committed record. In which case, I still think it’s cleaner to get a record position of a consumed message from a new helper method in ConsumerRecords() or from committed offsets.
> > > > >
> > > > > I think all the use-cases could then be covered with:
> > > > >
> > > > > (Approach 1)
> > > > >
> > > > > seekToRecord(offset, leaderEpoch): this will just initialize/set the consumer state;
> > > > >
> > > > > findOffsets(offset, leaderEpoch) returns {offset, leaderEpoch}
> > > > >
> > > > > If we agree that the race condition is also a corner case, then I think we can cover use-cases with:
> > > > >
> > > > > (Approach 2)
> > > > >
> > > > > findOffsets(offset, leaderEpoch) returns offset; we still want leader epoch as a parameter for the users who store their committed offsets externally.
> > > > >
> > > > > I am actually now leaning more to approach 1, since it is more explicit, and maybe there are more use cases for it.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Anna
> > > > >
> > > > > On Tue, Jul 10, 2018 at 3:47 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > >
> > > > > > Hey Anna,
> > > > > >
> > > > > > Thanks for the comment. To answer your question, it seems that we can cover all cases in this KIP. As stated in the "Consumer Handling" section, the KIP-101 based approach will be used to derive the truncation offset from the 2-tuple (offset, leaderEpoch). This approach is best effort and it is inaccurate only in very rare scenarios (as described in KIP-279).
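
A rough Java model of deriving a truncation offset from the 2-tuple (offset, leaderEpoch), as I understand the KIP-101 style lookup: the leader reports the end offset of the largest epoch it knows at or below the requested one, and the result is the smaller of that end offset and the offset asked about. `LeaderLogModel` and its methods are hypothetical illustration names, not broker code, and the fallback to an earlier epoch is an assumption based on the broker responses in the example quoted further down this thread.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a leader's log: leader epoch -> first offset written in that epoch. */
final class LeaderLogModel {
    private final TreeMap<Integer, Long> startOffsetByEpoch = new TreeMap<>();
    private final long logEndOffset;

    LeaderLogModel(long logEndOffset) {
        this.logEndOffset = logEndOffset;
    }

    LeaderLogModel epoch(int leaderEpoch, long startOffset) {
        startOffsetByEpoch.put(leaderEpoch, startOffset);
        return this;
    }

    /** End offset of the largest known epoch <= requestedEpoch (assumed fallback behaviour). */
    long endOffsetFor(int requestedEpoch) {
        Map.Entry<Integer, Long> known = startOffsetByEpoch.floorEntry(requestedEpoch);
        if (known == null) {
            throw new IllegalStateException("no epoch at or below " + requestedEpoch);
        }
        // An epoch ends where the next one starts; the latest epoch ends at the log end offset.
        Map.Entry<Integer, Long> next = startOffsetByEpoch.higherEntry(known.getKey());
        return next == null ? logEndOffset : next.getValue();
    }

    /** Best-effort truncation offset for a consumer positioned at (offset, leaderEpoch). */
    long truncationOffset(long offset, int leaderEpoch) {
        return Math.min(offset, endOffsetFor(leaderEpoch));
    }
}
```

With broker C's log from that example (epochs 0, 1, 2 starting at offsets 0, 1, 2, log end offset 3), `truncationOffset(3, 1)` gives 2; with broker A's log (epochs 0 and 3 starting at offsets 0 and 1, log end offset 2), `truncationOffset(2, 1)` gives 1.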
> > > > > >
> > > > > > By using seek(offset, leaderEpoch), consumer will still be able to follow this best-effort approach to detect log truncation and determine the truncation offset. On the other hand, if we use seek(offset), consumer will not detect log truncation in some cases which weakens the guarantee of this KIP. Does this make sense?
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > > On Tue, Jul 10, 2018 at 3:14 PM, Anna Povzner <a...@confluent.io> wrote:
> > > > > >
> > > > > > > Sorry, I hit "send" before finishing. Continuing...
> > > > > > >
> > > > > > > 2) Hiding most of the consumer's log truncation handling logic, with minimal exposure in the KafkaConsumer API. I was proposing this path.
> > > > > > >
> > > > > > > Before answering your specific questions… I want to answer your comment “In general, maybe we should discuss the final solution that covers all cases?”. With the current KIP, we don’t cover all cases of the consumer detecting log truncation, because the KIP proposes a leader epoch cache in the consumer that does not persist across restarts. Plus, we only store the last committed offset (either internally, or users can store it externally). This has a limitation that the consumer will not always be able to find the point of truncation, just because we have a limited history (just one data point).
> > > > > > >
> > > > > > > So, maybe we should first agree on whether we accept that storing the last committed offset/leader epoch has a limitation that the consumer will not be able to detect log truncation in all cases?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Anna
> > > > > > >
> > > > > > > On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner <a...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Hi Dong,
> > > > > > > >
> > > > > > > > Thanks for the follow up! I finally have a much clearer understanding of where you are coming from.
> > > > > > > >
> > > > > > > > You are right. The success of findOffsets()/finding a point of non-divergence depends on whether we have enough entries in the consumer's leader epoch cache. However, I think this is a fundamental limitation of having a leader epoch cache that does not persist across consumer restarts.
> > > > > > > >
> > > > > > > > If we consider the general case where consumer may or may not have this cache, then I see two paths:
> > > > > > > > 1) Letting the user track the leader epoch history externally, and have more exposure to leader epoch and finding point of non-divergence in KafkaConsumer API. I understand this is the case you were talking about.
> > > > > > > >
> > > > > > > > On Tue, Jul 10, 2018 at 12:16 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > >
> > > > > > > >> Hey Anna,
> > > > > > > >>
> > > > > > > >> Thanks much for your detailed explanation and example! It does help me understand the difference between our understanding.
> > > > > > > >>
> > > > > > > >> So it seems that the solution based on findOffsets() currently focuses mainly on the scenario that consumer has cached leaderEpoch -> offset mapping whereas I was thinking about the general case where consumer may or may not have this cache. I guess that is why we have different understanding here. I have some comments below.
> > > > > > > >>
> > > > > > > >> 3) The proposed solution using findOffsets(offset, leaderEpoch) followed by seek(offset) works if consumer has the cached leaderEpoch -> offset mapping. But if we assume consumer has this cache, do we need to have leaderEpoch in the findOffsets(...)? Intuitively, the findOffsets(offset) can also derive the leaderEpoch using offset just like the proposed solution does with seek(offset).
> > > > > > > >>
> > > > > > > >> 4) If consumer does not have cached leaderEpoch -> offset mapping, which is the case if consumer is restarted on a new machine, then it is not clear what leaderEpoch would be included in the FetchRequest if consumer does seek(offset). This is the case that motivates the first question of the previous email. In general, maybe we should discuss the final solution that covers all cases?
> > > > > > > >>
> > > > > > > >> 5) The second question in my previous email is related to the following paragraph:
> > > > > > > >>
> > > > > > > >> "... In some cases, offsets returned from position() could be actual consumed messages by this consumer identified by {offset, leader epoch}. In other cases, position() returns offset that was not actually consumed. Suppose, the user calls position() for the last offset...".
> > > > > > > >>
> > > > > > > >> I guess my point is that, if user calls position() for the last offset and uses that offset in seek(...), then user can probably just call Consumer#seekToEnd() without calling position() and seek(...). Similarly user can call Consumer#seekToBeginning() to seek to the earliest position without calling position() and seek(...). Thus position() only needs to return the actual consumed messages identified by {offset, leader epoch}. Does this make sense?
> > > > > > > >>
> > > > > > > >> Thanks,
> > > > > > > >> Dong
> > > > > > > >>
> > > > > > > >> On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner <a...@confluent.io> wrote:
> > > > > > > >>
> > > > > > > >> > Hi Dong,
> > > > > > > >> >
> > > > > > > >> > Thanks for considering my suggestions.
> > > > > > > >> >
> > > > > > > >> > Based on your comments, I realized that my suggestion was not complete with regard to KafkaConsumer API vs. consumer-broker protocol.
> > > > > > > >> > While I propose to keep KafkaConsumer#seek() unchanged and take offset only, the underlying consumer will send the next FetchRequest() to broker with offset and leaderEpoch if it is known (based on leader epoch cache in consumer). Note that this is different from the current KIP, which suggests to always send unknown leader epoch after seek(). This way, if the consumer and a broker agreed on the point of non-divergence, which is some {offset, leaderEpoch} pair, the new leader which causes another truncation (even further back) will be able to detect new divergence and restart the process of finding the new point of non-divergence. So, to answer your question, if the truncation happens just after the user calls KafkaConsumer#findOffsets(offset, leaderEpoch) followed by seek(offset), the user will not seek to the wrong position without knowing that truncation has happened, because the consumer will get another truncation error, and seek again.
> > > > > > > >> >
> > > > > > > >> > I am afraid I did not understand your second question. Let me summarize my suggestions again, and then give an example to hopefully make my suggestions more clear. Also, the last part of my example shows how the use-case in your first question will work.
> > > > > > > >> > If it does not answer your second question, would you mind clarifying? I am also focusing on the case of a consumer having enough entries in the cache. The case of restarting from committed offset either stored externally or internally will probably need to be discussed more.
> > > > > > > >> >
> > > > > > > >> > Let me summarize my suggestion again:
> > > > > > > >> >
> > > > > > > >> > 1) KafkaConsumer#seek() and KafkaConsumer#position() remain unchanged.
> > > > > > > >> >
> > > > > > > >> > 2) New KafkaConsumer#findOffsets() takes {offset, leaderEpoch} pair per topic partition and returns offset per topic partition.
> > > > > > > >> >
> > > > > > > >> > 3) FetchRequest() to broker after KafkaConsumer#seek() will contain the offset set by seek and leaderEpoch that corresponds to the offset based on leader epoch cache in the consumer.
> > > > > > > >> >
> > > > > > > >> > The rest of this e-mail is a long and contrived example with several log truncations and unclean leader elections to illustrate the API and your first use-case. Suppose we have three brokers. Initially, Brokers A, B, and C each have one message at offset 0 with leader epoch 0. Then, Broker A goes down for some time. Broker B becomes a leader with epoch 1, and writes messages to offsets 1 and 2.
> > > > > > > >> > Broker C fetches offset 1, but before fetching offset 2, becomes a leader with leader epoch 2 and writes a message at offset 2. Here is the state of brokers at this point:
> > > > > > > >> >
> > > > > > > >> > Broker A:
> > > > > > > >> >   offset 0, epoch 0 <- leader
> > > > > > > >> >   goes down…
> > > > > > > >> >
> > > > > > > >> > Broker B:
> > > > > > > >> >   offset 0, epoch 0
> > > > > > > >> >   offset 1, epoch 1 <- leader
> > > > > > > >> >   offset 2, epoch 1
> > > > > > > >> >
> > > > > > > >> > Broker C:
> > > > > > > >> >   offset 0, epoch 0
> > > > > > > >> >   offset 1, epoch 1
> > > > > > > >> >   offset 2, epoch 2 <- leader
> > > > > > > >> >
> > > > > > > >> > Before Broker C becomes a leader with leader epoch 2, the consumer consumed the following messages from broker A and broker B:
> > > > > > > >> >
> > > > > > > >> > {offset=0, leaderEpoch=0}, {offset=1, leaderEpoch=1}, {offset=2, leaderEpoch=1}.
> > > > > > > >> >
> > > > > > > >> > Consumer’s leader epoch cache at this point contains the following entries:
> > > > > > > >> >
> > > > > > > >> > (leaderEpoch=0, startOffset=0)
> > > > > > > >> >
> > > > > > > >> > (leaderEpoch=1, startOffset=1)
> > > > > > > >> >
> > > > > > > >> > endOffset = 3
> > > > > > > >> >
> > > > > > > >> > Then, broker B becomes the follower of broker C, truncates and starts fetching from offset 2.
> > > > > > > >> >
> > > > > > > >> > Consumer sends fetchRequest(offset=3, leaderEpoch=1) and gets LOG_TRUNCATION error from broker C.
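
The consumer-side leader epoch cache in this example can be pictured with a small Java sketch. `ConsumerEpochCache` and its methods are hypothetical names, not the KIP's implementation, and the sketch assumes the epoch sent with a fetch is the epoch of the record just before the fetch offset, which is what fetchRequest(offset=3, leaderEpoch=1) above suggests.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative cache: first offset consumed in each leader epoch -> that epoch. */
final class ConsumerEpochCache {
    static final int UNKNOWN_EPOCH = -1;

    private final TreeMap<Long, Integer> epochByStartOffset = new TreeMap<>();

    /** Call for each consumed record, in offset order; stores only epoch boundaries. */
    void onRecordConsumed(long offset, int leaderEpoch) {
        Map.Entry<Long, Integer> latest = epochByStartOffset.lastEntry();
        if (latest == null || latest.getValue() != leaderEpoch) {
            epochByStartOffset.put(offset, leaderEpoch);
        }
    }

    /** Epoch of the record just before fetchOffset, or UNKNOWN_EPOCH if nothing is cached. */
    int epochForFetch(long fetchOffset) {
        Map.Entry<Long, Integer> entry = epochByStartOffset.floorEntry(fetchOffset - 1);
        return entry == null ? UNKNOWN_EPOCH : entry.getValue();
    }

    /** Number of epoch boundaries currently cached. */
    int size() {
        return epochByStartOffset.size();
    }
}
```

After consuming {0, epoch 0}, {1, epoch 1}, and {2, epoch 1}, the cache holds the two entries listed above, and `epochForFetch(3)` returns 1. Because the cache lives only in memory, a restarted consumer starts with `UNKNOWN_EPOCH` for every offset.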
> > > > > > > >> >
> > > > > > > >> > In response, the client calls KafkaConsumer#findOffsets(offset=3, leaderEpoch=1). The underlying consumer sends OffsetsForLeaderEpoch(leaderEpoch=1), broker C responds with {leaderEpoch=1, endOffset=2}. So, KafkaConsumer#findOffsets(offset=3, leaderEpoch=1) returns offset=2.
> > > > > > > >> >
> > > > > > > >> > In response, consumer calls KafkaConsumer#seek(offset=2) followed by poll(), which results in FetchRequest(offset=2, leaderEpoch=1) to broker C.
> > > > > > > >> >
> > > > > > > >> > I will continue with this example with the goal to answer your first question about truncation just after findOffsets() followed by seek():
> > > > > > > >> >
> > > > > > > >> > Suppose, brokers B and C go down, and broker A comes up and becomes a leader with leader epoch 3, and writes a message to offset 1. Suppose, this happens before the consumer gets response from broker C to the previous fetch request: FetchRequest(offset=2, leaderEpoch=1).
> > > > > > > >> >
> > > > > > > >> > Consumer re-sends FetchRequest(offset=2, leaderEpoch=1) to broker A, which returns LOG_TRUNCATION error, because broker A has leader epoch 3 > leader epoch in FetchRequest with starting offset = 1 < offset 2 in FetchRequest().
> > > > > > > >> >
> > > > > > > >> > In response, the user calls KafkaConsumer#findOffsets(offset=2, leaderEpoch=1).
> > > > > > > >> > The underlying consumer sends OffsetsForLeaderEpoch(leaderEpoch=1), broker A responds with {leaderEpoch=0, endOffset=1}; the underlying consumer finds leaderEpoch = 0 in its cache with end offset == 1, which results in KafkaConsumer#findOffsets(offset=2, leaderEpoch=1) returning offset = 1.
> > > > > > > >> >
> > > > > > > >> > In response, the user calls KafkaConsumer#seek(offset=1) followed by poll(), which results in FetchRequest(offset=1, leaderEpoch=0) to broker A, which responds with message at offset 1, leader epoch 3.
> > > > > > > >> >
> > > > > > > >> > I will think some more about consumers restarting from committed offsets, and send a follow up.
> > > > > > > >> >
> > > > > > > >> > Thanks,
> > > > > > > >> >
> > > > > > > >> > Anna
> > > > > > > >> >
> > > > > > > >> > On Sat, Jul 7, 2018 at 1:36 AM Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > >> >
> > > > > > > >> > > Hey Anna,
> > > > > > > >> > >
> > > > > > > >> > > Thanks much for the thoughtful reply. It makes sense to differentiate between "seeking to a message" and "seeking to a position". I have two questions here:
> > > > > > > >> > >
> > > > > > > >> > > - For "seeking to a message" use-case, with the proposed approach user needs to call findOffset(offset, leaderEpoch) followed by seek(offset).
> > > > > > > >> > > If message truncation and message append happen immediately after findOffset(offset, leaderEpoch) but before seek(offset), it seems that user will seek to the wrong message without knowing the truncation has happened. Would this be a problem?
> > > > > > > >> > >
> > > > > > > >> > > - For "seeking to a position" use-case, it seems that there can be two positions, i.e. earliest and latest. So these two cases can be fulfilled by Consumer.seekToBeginning() and Consumer.seekToEnd(). Then it seems that user will only need to call position() and seek() for "seeking to a message" use-case?
> > > > > > > >> > >
> > > > > > > >> > > Thanks,
> > > > > > > >> > > Dong
> > > > > > > >> > >
> > > > > > > >> > > On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner <a...@confluent.io> wrote:
> > > > > > > >> > >
> > > > > > > >> > > > Hi Jason and Dong,
> > > > > > > >> > > >
> > > > > > > >> > > > I’ve been thinking about your suggestions and discussion regarding position(), seek(), and new proposed API.
> > > > > > > >> > > >
> > > > > > > >> > > > Here is my thought process why we should keep position() and seek() API unchanged.
> > > > > > > >> > > >
> > > > > > > >> > > > I think we should separate {offset, leader epoch} that uniquely identifies a message from an offset that is a position.
> > > > > > > >> > > > In some cases, offsets returned from position() could be actual consumed messages by this consumer identified by {offset, leader epoch}. In other cases, position() returns offset that was not actually consumed. Suppose, the user calls position() for the last offset. Suppose we return {offset, leader epoch} of the message currently in the log. Then, the message gets truncated before consumer’s first poll(). It does not make sense for poll() to fail in this case, because the log truncation did not actually happen from the consumer perspective. On the other hand, as the KIP proposes, it makes sense for the committed() method to return {offset, leader epoch} because those offsets represent actual consumed messages.
> > > > > > > >> > > >
> > > > > > > >> > > > The same argument applies to the seek() method: we are not seeking to a message, we are seeking to a position.
> > > > > > > >> > > >
> > > > > > > >> > > > I like the proposal to add KafkaConsumer#findOffsets() API.
> > > > > > > >> > > > I am assuming something like:
> > > > > > > >> > > >
> > > > > > > >> > > > Map<TopicPartition, Long> findOffsets(Map<TopicPartition, OffsetAndEpoch> offsetsToSearch)
> > > > > > > >> > > >
> > > > > > > >> > > > Similar to seek() and position(), I think findOffsets() should return offset without leader epoch, because what we want is the offset that we think is closest to the not divergent message from the given consumed message. Until the consumer actually fetches the message, we should not let the consumer store the leader epoch for a message it did not consume.
> > > > > > > >> > > >
> > > > > > > >> > > > So, the workflow will be:
> > > > > > > >> > > >
> > > > > > > >> > > > 1) The user gets LogTruncationException with {offset, leader epoch of the previous message} (whatever we send with new FetchRecords request).
> > > > > > > >> > > >
> > > > > > > >> > > > 2) offset = findOffsets(tp -> {offset, leader epoch})
> > > > > > > >> > > >
> > > > > > > >> > > > 3) seek(offset)
> > > > > > > >> > > >
> > > > > > > >> > > > For the use-case where the users store committed offsets externally:
> > > > > > > >> > > >
> > > > > > > >> > > > 1) Such users would have to track the leader epoch together with an offset. Otherwise, there is no way to detect later what leader epoch was associated with the message.
> > > > > > > >> > > > I think it’s reasonable to ask that from users if they want to detect log truncation. Otherwise, they will get the current behavior.
> > > > > > > >> > > >
> > > > > > > >> > > > If the users currently get an offset to be stored using position(), I see two possibilities. First, they save the offset returned from position() that they call before poll(). In that case, it would not be correct to store {offset, leader epoch} if we would have changed position() to return {offset, leader epoch} since actual fetched message could be different (from the example I described earlier). So, it would be more correct to call position() after poll(). However, the user already gets ConsumerRecords at this point, from which the user can extract {offset, leader epoch} of the last message.
> > > > > > > >> > > >
> > > > > > > >> > > > So, I like the idea of adding a helper method to ConsumerRecords, as Jason proposed, something like:
> > > > > > > >> > > >
> > > > > > > >> > > > public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where OffsetAndEpoch is a data struct holding {offset, leader epoch}.
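
One way to picture the proposed helper, as a self-contained Java sketch. `FetchedRecord` and `FetchedBatch` are hypothetical stand-ins for ConsumerRecord and ConsumerRecords so that the example runs without the Kafka client; the `OffsetAndEpoch` shape and the `Optional` return type are assumptions, not what the KIP specifies.

```java
import java.util.List;
import java.util.Optional;

/** Data holder for {offset, leader epoch}, as described above (shape assumed). */
final class OffsetAndEpoch {
    final long offset;
    final int leaderEpoch;

    OffsetAndEpoch(long offset, int leaderEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }
}

/** Hypothetical stand-in for a consumed record of a single partition. */
final class FetchedRecord {
    final long offset;
    final int leaderEpoch;
    final String value;

    FetchedRecord(long offset, int leaderEpoch, String value) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
        this.value = value;
    }
}

/** Hypothetical stand-in for the result of one poll() on a single partition. */
final class FetchedBatch {
    private final List<FetchedRecord> records;

    FetchedBatch(List<FetchedRecord> records) {
        this.records = records;
    }

    /** {offset, leader epoch} of the last record in the batch, or empty if nothing was fetched. */
    Optional<OffsetAndEpoch> lastOffsetWithLeaderEpoch() {
        if (records.isEmpty()) {
            return Optional.empty();
        }
        FetchedRecord last = records.get(records.size() - 1);
        return Optional.of(new OffsetAndEpoch(last.offset, last.leaderEpoch));
    }
}
```

A caller that stores offsets externally would save both fields of the returned pair after each poll, so that it has a leader epoch to hand back later when it needs to detect truncation.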

In this case, we would advise the user to follow the workflow: poll(), get {offset, leader epoch} from ConsumerRecords#lastOffsetWithLeaderEpoch(), save offset and leader epoch, process records.

2) When the user needs to seek to the last committed offset, they call the new findOffsets(saved offset, leader epoch), and then seek(offset).

What do you think?

Thanks,

Anna

On Tue, Jul 3, 2018 at 4:06 PM Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

Thanks much for your thoughtful explanation.

Yes, the solution using findOffsets(offset, leaderEpoch) also works. The advantage of this solution is that it adds only one API instead of two APIs. The concern is that its usage seems a bit more clumsy for advanced users. More specifically, advanced users who store offsets externally will always need to call findOffsets() before calling seek(offset) during consumer initialization.
And those advanced users will need to manually keep track of the leaderEpoch of the last ConsumerRecord.

The other solution, which may be more user-friendly for advanced users, is to add two APIs, `void seek(offset, leaderEpoch)` and `(offset, epoch) = offsetEpochs(topicPartition)`.

I kind of prefer the second solution because it is easier to use for advanced users. If we need to expose leaderEpoch anyway to safely identify a message, it may be conceptually simpler to expose it directly in seek(...) rather than requiring one more translation using findOffsets(...). But I am also OK with the first solution if other developers also favor that one :)

Thanks,
Dong

On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson <ja...@confluent.io> wrote:

Hi Dong,

Thanks, I've been thinking about your suggestions a bit. It is challenging to make this work given the current APIs.
One of the difficulties is that we don't have an API to find the leader epoch for a given offset at the moment. So if the user does a seek to offset 5, then we'll need a new API to find the corresponding epoch in order to fulfill the new position() API. Potentially we could modify ListOffsets to enable finding the leader epoch, but I am not sure it is worthwhile. Perhaps it is reasonable for advanced usage to expect that the epoch information, if needed, will be extracted from the records directly? It might make sense to expose a helper in `ConsumerRecords` to make this a little easier though.

Alternatively, if we think it is important to have this information exposed directly, we could create batch APIs to solve the naming problem.
For example:

Map<TopicPartition, OffsetAndEpoch> positions();
void seek(Map<TopicPartition, OffsetAndEpoch> positions);

However, I'm actually leaning toward leaving the seek() and position() APIs unchanged. Instead, we can add a new API to search for offset by timestamp or by offset/leader epoch. Let's say we call it `findOffsets`. If the user hits a log truncation error, they can use this API to find the closest offset and then do a seek(). At the same time, we deprecate the `offsetsForTimes` APIs. We now have two use cases which require finding offsets, so I think we should make this API general and leave the door open for future extensions.

By the way, I'm unclear about the desire to move part of this functionality to AdminClient. Guozhang suggested this previously, but I think it only makes sense for cross-cutting capabilities such as topic creation.
If we have an API which is primarily useful to consumers, then I think that's where it should be exposed. The AdminClient also has its own API integrity and should not become a dumping ground for advanced use cases. I'll update the KIP with the `findOffsets` API suggested above and we can see if it does a good enough job of keeping the API simple for common cases.

Thanks,
Jason

On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

Regarding seek(...), it seems that we want an API for the user to initialize the consumer with (offset, leaderEpoch), and that API should allow throwing PartitionTruncationException. Suppose we agree on this, then seekToNearest() is not sufficient because it will always swallow PartitionTruncationException. Here we have two options.
The first option is to add an API offsetsForLeaderEpochs() to translate (leaderEpoch, offset) to offset. The second option is to add seek(offset, leaderEpoch). It seems that the second option may be simpler because it makes it clear that (offset, leaderEpoch) will be used to identify the consumer's position in a partition. And the user only needs to handle PartitionTruncationException from poll(). In comparison, the first option seems a bit harder to use because the user also has to handle the PartitionTruncationException if offsetsForLeaderEpochs() returns a different offset from the user-provided offset. What do you think?

If we decide to add the API seek(offset, leaderEpoch), then we can decide whether and how to add an API to translate (offset, leaderEpoch) to offset.
It seems that this API will be needed by advanced users who don't want auto offset reset (so that they can be notified) but still want to reset the offset to the closest one. For those users it probably makes sense to only have the API in AdminClient. offsetsForTimes() seems like a common API that will be needed by users of the consumer in general, so it may be more reasonable for it to stay in the consumer API. I don't have a strong opinion on whether offsetsForTimes() should be replaced by an API in AdminClient.

Though (offset, leaderEpoch) is needed to uniquely identify a message in general, it is only needed for advanced users who have turned on unclean leader election, need to use seek(..), and don't want auto offset reset. Most other users probably just want to enable auto offset reset and store offsets in Kafka.
Thus we might want to keep the existing offset-only APIs (e.g. seek() and position()) for most users while adding new APIs for advanced users. And yes, it seems that we need a new name for position().

Though I think we need new APIs to carry the new information (e.g. leaderEpoch), I am not very sure what that should look like. One possible option is those APIs in KIP-232. Another option is something like this:

`````
class OffsetEpochs {
  long offset;
  int leaderEpoch;
  int partitionEpoch;   // This may be needed later as discussed in KIP-232
  ... // Hopefully these are all we need to identify message in Kafka. But
      // if we need more then we can add new fields in this class.
}

OffsetEpochs offsetEpochs(TopicPartition);

void seek(TopicPartition, OffsetEpochs);
``````

Thanks,
Dong

On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Dong,

Thanks for the feedback. The first three points are easy:

1. Yes, we should be consistent.
2. Yes, I will add this.
3. Yes, I think we should document the changes to the committed offset schema. I meant to do this, but it slipped my mind.

The latter questions are tougher. One option I was considering is to have only `offsetsForLeaderEpochs` exposed from the consumer and to drop the new seek() API. That seems more consistent with the current use of `offsetsForTimes` (we don't have a separate `seekToTimestamp` API).
An alternative might be to take a page from the AdminClient API and add a new method to generalize offset lookup. For example, we could have `lookupOffsets(LookupOptions)`. We could then deprecate `offsetsForTimes` and this would open the door for future extensions without needing new APIs.

The case of position() is a little more annoying. It would have been better had we let this return an object so that it is easier to extend. This is the only reason I didn't add the API to the KIP. Maybe we should bite the bullet and fix this now? Unfortunately we'll have to come up with a new name. Maybe `currentPosition`?

Thoughts?

-Jason

On Fri, Jun 29, 2018 at 10:40 AM, Dong Lin <lindon...@gmail.com> wrote:

Regarding points 4) and 5) above, the motivation for the alternative APIs is that, if we decide that leaderEpoch is as important as offset in identifying a message, then it may be reasonable to always specify it wherever offset is currently required in the consumer API to identify a message, e.g. position(), seek(). For example, since we allow the user to retrieve the offset using position() instead of asking the user to keep track of the offset of the latest ConsumerRecord, maybe it would be more consistent for the user to also retrieve leaderEpoch using position()?
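
For reference, the kind of lookup that `offsetsForLeaderEpochs` or `findOffsets` would perform can be sketched as follows. This is an illustration under stated assumptions, not the KIP's implementation: it assumes the leader keeps a map from each leader epoch to the first offset written in that epoch, and that the answer for a requested epoch is the end offset of the largest epoch not greater than it. The names EpochLookupSketch, endOffsetFor and findOffset are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

final class EpochLookupSketch {
    // Assumed leader-side state: leader epoch -> first offset written in that epoch.
    private final TreeMap<Integer, Long> epochStartOffsets;
    // Assumed log end offset of the current leader (exclusive).
    private final long logEndOffset;

    EpochLookupSketch(Map<Integer, Long> epochStartOffsets, long logEndOffset) {
        this.epochStartOffsets = new TreeMap<>(epochStartOffsets);
        this.logEndOffset = logEndOffset;
    }

    // End offset (exclusive) of the largest epoch <= leaderEpoch: the start
    // offset of the next higher epoch, or the log end offset if there is none.
    long endOffsetFor(int leaderEpoch) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(leaderEpoch);
        return next == null ? logEndOffset : next.getValue();
    }

    // Closest non-divergent position for a consumer whose next offset to fetch
    // is `offset` and whose last consumed record had `leaderEpoch`.
    long findOffset(long offset, int leaderEpoch) {
        return Math.min(offset, endOffsetFor(leaderEpoch));
    }

    public static void main(String[] args) {
        Map<Integer, Long> history = new TreeMap<>();
        history.put(2, 0L);
        history.put(3, 10L);
        history.put(5, 15L);
        EpochLookupSketch leader = new EpochLookupSketch(history, 30L);

        // Epoch 3 ended at offset 15 on this leader, so position 20 diverged.
        System.out.println(leader.findOffset(20L, 3)); // prints 15
        // Position 12 is still inside epoch 3, so it is returned unchanged.
        System.out.println(leader.findOffset(12L, 3)); // prints 12
    }
}
```

When the returned offset differs from the one passed in, the caller knows the log was truncated and can decide whether to seek() to the returned offset or handle the divergence some other way.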

On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

Thanks for the update. It looks pretty good. Just some minor comments below:

1) The KIP adds new error code "LOG_TRUNCATION" and new exception TruncatedPartitionException. Can we make the name more consistent, e.g. LogTruncationException?

2) Do we need to add UnknownLeaderEpochException as part of API change?

3) Not sure if the offset topic schema is also public API. If so, maybe we should also include the schema change in the API?

4) For users who store offset externally, currently they get offset using position(..), store the offset externally, and use seek(..)
to initialize the consumer next time. After this KIP they will need to store and use the leaderEpoch together with the offset. Should we also update the API so that user can also get leaderEpoch from position(...)? Not sure if it is OK to ask user to track the latest leaderEpoch of ConsumerRecord by themselves.

5) Also for users who store offset externally, they need to call seek(..) with leaderEpoch to initialize consumer. With current KIP users need to call seekToNearest(), whose name suggests that the final position may be different from what was requested.
However, if users want to avoid auto offset reset and be notified explicitly when there is log truncation, then seekToNearest() probably does not help here. Would it make sense to replace seekToNearest() with seek(offset, leaderEpoch) + AdminClient.offsetsForLeaderEpochs(...)?

Thanks,
Dong

On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Guozhang,

That's fair. In fact, perhaps we do not need this API at all. We already have the new seek() in this KIP which can do the lookup based on epoch for this use case.
I guess we should probably call it seekToNearest() though to make it clear that the final position may be different from what was requested.

Thanks,
Jason

On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Jason,

I think it is less worthwhile to add KafkaConsumer#offsetsForLeaderEpochs, since probably only very advanced users are aware of the leaderEpoch, and hence ever care to use it anyways.
It is more like an admin client operation than a consumer client operation: if the motivation is to facilitate a customized reset policy, maybe adding it as AdminClient#offsetsForLeaderEpochs is better, as it is not an aggressive assumption that such advanced users are willing to use some admin client to get further information?

Guozhang

On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson <ja...@confluent.io> wrote:

Thanks for the feedback. I've updated the KIP. Specifically I removed the "closest" reset option and the proposal to reset by timestamp when the precise truncation point cannot be determined.
Instead, I proposed that we always reset using the nearest epoch when a reset policy is defined (either "earliest" or "latest"). Does that sound reasonable?

One thing I am still debating is whether it would be better to have a separate API to find the closest offset using the leader epoch. In the current KIP, I suggested to piggyback this information on an exception, but I'm beginning to think it would be better not to hide the lookup.
It is awkward to implement since it means delaying the exception and the API may actually be useful when customizing reset logic if no auto reset policy

--
-- Guozhang