Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-07 Thread Jason Gustafson
Hey Dong, I see what you mean. This would have been clearer if the committed offset was the offset of the last consumed record. I don't feel too strongly about it either. Perhaps we can use the more concise name and just rely on the documentation to explain its usage. It should be rare that users

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-07 Thread Dong Lin
Hey Jason, Thanks for the reply. Regarding 3), I am thinking that both "Offset" and "LastLeaderEpoch" in the OffsetCommitRequest are associated with the last consumed messages. Value of "Offset" is not necessarily the offset of the next message due to log compaction. Since we are naming "Offset"

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-07 Thread Jason Gustafson
Hi Dong, Thanks for the comments. 1) Yes, makes sense. 2) This is an interesting point. The suggestion made more sense in the initial version of the KIP, but I think you are right that we should use the same fencing semantics we use for the Fetch and OffsetForLeaderEpoch APIs. Just like a

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-07 Thread Jason Gustafson
Hey Jun, 57. It's a fair point. I could go either way, but I'm slightly inclined to just document the new API for now. We'll still support seeking to an offset with corresponding epoch information, so deprecating the old seek() seems like overkill. 60. The phrasing was a little confusing. Does

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-07 Thread Dong Lin
Hey Jason, Thanks for the update. I have some comments below: 1) Since FencedLeaderEpochException indicates that the metadata in the client is outdated, should it extend InvalidMetadataException? 2) It is mentioned that "To fix the problem with KIP-232, we will add the leader epoch the

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-06 Thread Jun Rao
Hi, Jason, Thanks for the reply. They all make sense. Just a couple more minor comments. 57. I was thinking that it will be useful to encourage people to use the new seek() API to get better semantics. Deprecating the old seek() API is one way. I guess we could also just document it for now.

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-06 Thread Jason Gustafson
Hi Jun, I spent a little more time looking at the usage in WorkerSinkTask. I think actually the initialization of the positions in the assignment callback is not strictly necessary. We keep a map of the current consumed offsets which is updated as we consume the data. As far as I can tell, we

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-06 Thread Jason Gustafson
Hey Jun, Thanks for the review. Responses below: 50. Yes, that is right. I clarified this in the KIP. 51. Yes, updated the KIP to mention. 52. Yeah, this was a reference to a previous iteration. I've fixed it. 53. I changed the API to use an `Optional` for the leader epoch and added a note

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-03 Thread Jun Rao
Hi, Jason, Thanks for the updated KIP. Well thought-through. Just a few minor comments below. 50. For seek(TopicPartition partition, OffsetAndMetadata offset), I guess under the covers, it will make an OffsetsForLeaderEpoch request to determine if the seeked offset is still valid before fetching? If

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-03 Thread Jason Gustafson
Hey All, I think I've addressed all pending review. If there is no additional feedback, I'll plan to start a vote thread next week. Thanks, Jason On Tue, Jul 31, 2018 at 9:46 AM, Dong Lin wrote: > Hey Jason, > > Thanks for your reply. I will comment below. > > Regarding 1, we probably can not

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-31 Thread Dong Lin
Hey Jason, Thanks for your reply. I will comment below. Regarding 1, we probably can not simply rename both to `LeaderEpoch` because we already have a LeaderEpoch field in OffsetsForLeaderEpoch. Regarding 5, I am not strong on this. I agree with the two benefits of having two error codes: 1)

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-30 Thread Jason Gustafson
Hey Dong, Thanks for the detailed review. Responses below: 1/2: Thanks for noticing the inconsistency. Would it be reasonable to simply call it LeaderEpoch for both APIs? 3: I agree it should be a map. I will update. 4: Fair point. I think we should always be able to identify an offset. Let's

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-27 Thread Dong Lin
Hey Jason, Thanks for the update! I agree with the current proposal overall. I have some minor comments related to naming etc. 1) I am not strong and will just leave it here for discussion. Would it be better to rename "CurrentLeaderEpoch" to "ExpectedLeaderEpoch" for the new field in the

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-27 Thread Jason Gustafson
Thanks Anna, you are right on both points. I updated the KIP. -Jason On Thu, Jul 26, 2018 at 2:08 PM, Anna Povzner wrote: > Hi Jason, > > Thanks for the update. I agree with the current proposal. > > Two minor comments: > 1) In “API Changes” section, first paragraph says that “users can catch

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-26 Thread Anna Povzner
Hi Jason, Thanks for the update. I agree with the current proposal. Two minor comments: 1) In “API Changes” section, first paragraph says that “users can catch the more specific exception type and use the new `seekToNearest()` API defined below.”. Since LogTruncationException “will include the

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-25 Thread Jason Gustafson
Hi All, I have made some updates to the KIP. As many of you know, a side project of mine has been specifying the Kafka replication protocol in TLA. You can check out the code here if you are interested: https://github.com/hachikuji/kafka-specification. In addition to uncovering a couple unknown

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-20 Thread Guozhang Wang
Hi Jason, The proposed API seems reasonable to me too. Could you please also update the wiki page ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation) with a section say "workflow" on how the proposed API will be co-used with others

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-17 Thread Anna Povzner
Hi Jason, I also like your proposal and agree that KafkaConsumer#seekToCommitted() is more intuitive as a way to initialize both the consumer's position and its fetch state. My understanding is that KafkaConsumer#seekToCommitted() is purely for clients who store their offsets externally, right? And

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-12 Thread Dong Lin
Hey Jason, It is a great summary. The solution sounds good. I might have minor comments regarding the method name. But we can discuss those minor points later, after we reach consensus on the high-level API. Thanks, Dong On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson wrote: > Hey Anna and

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-12 Thread Jason Gustafson
Hey Anna and Dong, Thanks a lot for the great discussion. I've been hanging back a bit because honestly the best option hasn't seemed clear. I agree with Anna's general observation that there is a distinction between the position of the consumer and its fetch state up to that position. If you

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-11 Thread Dong Lin
Hey Anna, Thanks much for the explanation. Approach 1 also sounds good to me. I think findOffsets() is useful for users who don't use automatic offset reset policy. Just one more question. Since users who store offsets externally need to provide leaderEpoch to findOffsets(...), do we need an

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-11 Thread Anna Povzner
Hi Dong, What I called “not covering all use cases” is what you call best-effort (not guaranteeing some corner cases). I think we are on the same page here. I wanted to be clear in the API whether the consumer seeks to a position (offset) or to a record (offset, leader epoch). The only

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-10 Thread Dong Lin
Hey Anna, Thanks for the comment. To answer your question, it seems that we can cover all cases in this KIP. As stated in the "Consumer Handling" section, the KIP-101-based approach will be used to derive the truncation offset from the 2-tuple (offset, leaderEpoch). This approach is best effort and it is
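
As a rough illustration of deriving a truncation offset from the 2-tuple (offset, leaderEpoch), the following is a minimal sketch, not the Kafka client or broker implementation. It assumes the leader keeps a map from each leader epoch to the first offset written in that epoch, and that the end offset of an epoch is the start offset of the next larger epoch (or the log end offset if there is none). The class `EpochTruncationSketch` and its methods are hypothetical names introduced only for this example, and the case where the requested epoch is newer than anything the leader knows is deliberately simplified.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical model for illustration only; none of these names come from Kafka.
final class EpochTruncationSketch {
    // Leader-side view (assumed): leader epoch -> first offset written in that epoch.
    private final TreeMap<Integer, Long> epochStartOffsets = new TreeMap<>();
    private final long logEndOffset;

    EpochTruncationSketch(Map<Integer, Long> epochStartOffsets, long logEndOffset) {
        this.epochStartOffsets.putAll(epochStartOffsets);
        this.logEndOffset = logEndOffset;
    }

    // End offset for an epoch: the start offset of the next larger epoch,
    // or the log end offset when no larger epoch exists (a simplification).
    long endOffsetFor(int epoch) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(epoch);
        return next != null ? next.getValue() : logEndOffset;
    }

    // If the consumer's position lies beyond the end offset of the epoch it last
    // consumed from, the log diverged; return the offset to fall back to.
    OptionalLong truncationOffset(long position, int lastConsumedEpoch) {
        long end = endOffsetFor(lastConsumedEpoch);
        return end < position ? OptionalLong.of(end) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Leader's log: epoch 0 starts at offset 0, epoch 2 starts at offset 10, log ends at 15.
        EpochTruncationSketch leader =
            new EpochTruncationSketch(Map.of(0, 0L, 2, 10L), 15L);
        // The consumer read up to offset 12 from a leader in epoch 1 that was later lost.
        System.out.println(leader.truncationOffset(12L, 1));
        // The consumer read up to offset 12 in epoch 2: no divergence detected.
        System.out.println(leader.truncationOffset(12L, 2));
    }
}
```

In this model the check is only as good as the epoch the consumer supplies: with a stale or missing epoch the result is a best-effort fallback position rather than a guaranteed point of non-divergence.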

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-10 Thread Anna Povzner
Sorry, I hit "send" before finishing. Continuing... 2) Hiding most of the consumer handling log truncation logic with minimal exposure in KafkaConsumer API. I was proposing this path. Before answering your specific questions… I want to respond to your comment “In general, maybe we should

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-10 Thread Anna Povzner
Hi Dong, Thanks for the follow-up! I finally have a much clearer understanding of where you are coming from. You are right. The success of findOffsets()/finding a point of non-divergence depends on whether we have enough entries in the consumer's leader epoch cache. However, I think this is a

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-10 Thread Dong Lin
Hey Anna, Thanks much for your detailed explanation and example! It does help me understand the difference between our understanding. So it seems that the solution based on findOffsets() currently focuses mainly on the scenario that consumer has cached leaderEpoch -> offset mapping whereas I was

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-09 Thread Anna Povzner
Hi Dong, Thanks for considering my suggestions. Based on your comments, I realized that my suggestion was not complete with regard to KafkaConsumer API vs. consumer-broker protocol. While I propose to keep KafkaConsumer#seek() unchanged and take offset only, the underlying consumer will send

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-07 Thread Dong Lin
Hey Anna, Thanks much for the thoughtful reply. It makes sense to differentiate between "seeking to a message" and "seeking to a position". I have two questions here: - For the "seeking to a message" use-case, with the proposed approach the user needs to call findOffset(offset, leaderEpoch) followed by

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-06 Thread Jun Rao
Hi, Jason, Thanks for the KIP. Looks good overall. Just a few minor comments below. 1. "As the consumer is fetching from a partition, it will keep a small cache of the recent epochs that were fetched for each partition. " Do we need to cache more than one leader epoch? Also, during consumer

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-04 Thread Anna Povzner
Hi Jason and Dong, I’ve been thinking about your suggestions and discussion regarding position(), seek(), and new proposed API. Here is my thought process why we should keep position() and seek() API unchanged. I think we should separate {offset, leader epoch} that uniquely identifies a

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-03 Thread Dong Lin
Hey Jason, Thanks much for your thoughtful explanation. Yes, the solution using findOffsets(offset, leaderEpoch) also works. The advantage of this solution is that it adds only one API instead of two. The concern is that its usage seems a bit more clumsy for advanced users. More specifically,

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-02 Thread Jason Gustafson
Hi Dong, Thanks, I've been thinking about your suggestions a bit. It is challenging to make this work given the current APIs. One of the difficulties is that we don't have an API to find the leader epoch for a given offset at the moment. So if the user does a seek to offset 5, then we'll need a

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-30 Thread Dong Lin
Hey Jason, Regarding seek(...), it seems that we want an API for user to initialize consumer with (offset, leaderEpoch) and that API should allow throwing PartitionTruncationException. Suppose we agree on this, then seekToNearest() is not sufficient because it will always swallow

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-29 Thread Jason Gustafson
Hey Dong, Thanks for the feedback. The first three points are easy: 1. Yes, we should be consistent. 2. Yes, I will add this. 3. Yes, I think we should document the changes to the committed offset schema. I meant to do this, but it slipped my mind. The latter questions are tougher. One option I

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-29 Thread Dong Lin
Regarding points 4) and 5) above, motivation for the alternative APIs is that, if we decide that leaderEpoch is equally important as offset in identifying a message, then it may be reasonable to always specify it wherever offset is currently required in the consumer API to identify a message, e.g.

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-29 Thread Dong Lin
Hey Jason, Thanks for the update. It looks pretty good. Just some minor comments below: 1) The KIP adds new error code "LOG_TRUNCATION" and new exception TruncatedPartitionException. Can we make the name more consistent, e.g. LogTruncationException? 2) Do we need to add

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Guozhang Wang
Sounds good to me. On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson wrote: > Hey Guozhang, > > That's fair. In fact, perhaps we do not need this API at all. We already > have the new seek() in this KIP which can do the lookup based on epoch for > this use case. I guess we should probably call

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Jason Gustafson
Hey Guozhang, That's fair. In fact, perhaps we do not need this API at all. We already have the new seek() in this KIP which can do the lookup based on epoch for this use case. I guess we should probably call it seekToNearest() though to make it clear that the final position may be different from

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Guozhang Wang
Hi Jason, I think it is less worthwhile to add KafkaConsumer#offsetsForLeaderEpochs, since probably only very advanced users are aware of the leaderEpoch, and hence ever care to use it anyways. It is more like an admin client operation than a consumer client operation: if the motivation is to

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Jason Gustafson
Thanks for the feedback. I've updated the KIP. Specifically I removed the "closest" reset option and the proposal to reset by timestamp when the precise truncation point cannot be determined. Instead, I proposed that we always reset using the nearest epoch when a reset policy is defined (either

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Jason Gustafson
@Dong Those are fair points. Both approaches require some fuzziness to reset the offset in these pathological scenarios and we cannot guarantee at-least-once delivery either way unless we have the full history of leader epochs that were consumed. The KIP-101 logic may actually be more accurate

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Guozhang Wang
Jason, thanks for the KIP. A few comments: * I think Dong's question about whether to use the timestamp-based approach vs. start-offset-of-first-larger-epoch is valid; more specifically, with the timestamp-based approach we may still be resetting to an offset falling into the truncated interval, and

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-26 Thread Dong Lin
Hey Jason, Thanks for the explanation. Please correct me if this is wrong. The "unknown truncation offset" scenario happens when consumer does not have the full leaderEpoch -> offset mapping. In this case we can still use the KIP-101-based approach to truncate offset to "start offset of the

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-26 Thread Jason Gustafson
The other thing I forgot to mention is that resetting the offset using the leader epoch is only available with the latest message format. By supporting reset by timestamp, users on the v1 format can still get some benefit from this KIP. -Jason On Tue, Jun 26, 2018 at 11:47 AM, Jason Gustafson

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-26 Thread Jason Gustafson
Hey Dong, Thanks for the comments. - The KIP says that, with auto.offset.reset="closest", timestamp is used to > find offset if truncation offset is unknown. It seems that if consumer > knows the timestamp of the last message, then the consumer should also know > the (offset, leaderEpoch) of the

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-26 Thread Dong Lin
Hey Jason, Thanks for the KIP! It is pretty useful. At high level the new set of reset policies may be a bit complicated and confusing to users. I am wondering whether we can simplify it. A few questions below: - The KIP says that, with auto.offset.reset="closest", timestamp is used to find

[DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-25 Thread Jason Gustafson
Hey All, I wrote up a KIP to handle one more edge case in the replication protocol and to support better handling of truncation in the consumer when unclean leader election is enabled. Let me know what you think.